1pub mod exec;
13pub mod ssh;
14pub mod tree;
15
16use std::collections::{HashMap, HashSet};
17use std::os::unix::io::RawFd;
18use thiserror::Error;
19
/// Error type carried by this module's [`Result`] alias.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute.
#[derive(Debug, Error)]
pub enum WorkerError {
    /// Wraps a [`std::io::Error`]; displayed as `io error: <inner error>`.
    ///
    /// `#[from]` also generates `From<std::io::Error>`, so an I/O error can be
    /// converted with `?` or `.into()`.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// A failure described only by a free-form message; displayed as
    /// `worker error: <message>`.
    #[error("worker error: {0}")]
    General(String),
    /// Displayed as `timeout`.
    // NOTE(review): presumably produced when an operation exceeds its time
    // limit — confirm against the code that constructs it.
    #[error("timeout")]
    Timeout,
}
33
/// Shorthand for [`std::result::Result`] with the error type fixed to
/// [`WorkerError`].
pub type Result<T> = std::result::Result<T, WorkerError>;
36
/// State value returned by [`Worker::state`].
///
/// The type is `Copy` and comparable with `==`.
// NOTE(review): the per-variant descriptions below are inferred from the
// variant names only; confirm them against the concrete `Worker`
// implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerState {
    /// Presumably: created but not started yet.
    Pending,
    /// Presumably: started and still in progress.
    Running,
    /// Presumably: finished.
    Done,
    /// Presumably: stopped early through [`Worker::abort`].
    Aborted,
}
49
50pub trait EventHandler: Send {
54 fn on_start(&mut self, worker: &dyn Worker) {
56 let _ = worker;
57 }
58
59 fn on_read(&mut self, node: &str, fd: RawFd, msg: &[u8]) {
61 let _ = (node, fd, msg);
62 }
63
64 fn on_close(&mut self, node: &str, rc: i32) {
66 let _ = (node, rc);
67 }
68
69 fn on_timeout(&mut self, node: &str) {
71 let _ = node;
72 }
73
74 fn on_error(&mut self, node: &str, error: &WorkerError) {
76 let _ = (node, error);
77 }
78
79 fn take_buffers(
83 &mut self,
84 ) -> (
85 HashMap<String, Vec<Vec<u8>>>,
86 HashMap<String, Vec<Vec<u8>>>,
87 HashSet<String>,
88 ) {
89 (HashMap::new(), HashMap::new(), HashSet::new())
90 }
91}
92
/// Interface of a worker that exposes raw file descriptors and reports
/// per-node return codes.
///
/// Implementors must be `Send`.
// NOTE(review): this trait only declares signatures; the behavioural notes
// below that say "presumably" are inferred from names and types and should be
// confirmed against the implementations in the `exec`, `ssh` and `tree`
// submodules.
pub trait Worker: Send {
    /// Starts the worker, returning a [`WorkerError`] on failure.
    fn start(&mut self) -> Result<()>;

    /// Aborts the worker.
    ///
    /// `kill` presumably selects a forceful termination — confirm.
    fn abort(&mut self, kill: bool);

    /// Returns the worker's current [`WorkerState`].
    fn state(&self) -> WorkerState;

    /// Installs `handler` as this worker's [`EventHandler`], taking ownership
    /// of it.
    fn set_handler(&mut self, handler: Box<dyn EventHandler>);

    /// Returns file descriptors; presumably those the caller should watch for
    /// readability and then pass to [`Worker::handle_read`].
    fn read_fds(&self) -> Vec<RawFd>;

    /// Returns file descriptors; presumably those the caller should watch for
    /// writability and then pass to [`Worker::handle_write`].
    fn write_fds(&self) -> Vec<RawFd>;

    /// Processes `fd`; presumably called when it is ready for reading.
    /// Returns a [`WorkerError`] on failure.
    fn handle_read(&mut self, fd: RawFd) -> Result<()>;

    /// Processes `fd`; presumably called when it is ready for writing.
    /// Returns a [`WorkerError`] on failure.
    fn handle_write(&mut self, fd: RawFd) -> Result<()>;

    /// Reports whether the worker is done.
    fn is_done(&self) -> bool;

    /// Returns a borrowed map from a string key (presumably the node name) to
    /// an integer return code.
    fn retcodes(&self) -> &HashMap<String, i32>;

    /// Returns a node count; presumably the number of nodes this worker
    /// targets.
    fn num_nodes(&self) -> usize;

    /// Returns an owned [`EventHandler`] if one is available, `None`
    /// otherwise; presumably removes the handler installed through
    /// [`Worker::set_handler`].
    fn take_handler(&mut self) -> Option<Box<dyn EventHandler>>;
}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Handler that overrides nothing, so every call reaches a trait default.
    struct TestHandler;

    impl EventHandler for TestHandler {}

    /// Inert `Worker` whose only purpose is to provide a `&dyn Worker` for
    /// `EventHandler::on_start`.
    struct StubWorker {
        retcodes: HashMap<String, i32>,
    }

    impl Worker for StubWorker {
        fn start(&mut self) -> Result<()> {
            Ok(())
        }

        fn abort(&mut self, _kill: bool) {}

        fn state(&self) -> WorkerState {
            WorkerState::Pending
        }

        fn set_handler(&mut self, _handler: Box<dyn EventHandler>) {}

        fn read_fds(&self) -> Vec<RawFd> {
            Vec::new()
        }

        fn write_fds(&self) -> Vec<RawFd> {
            Vec::new()
        }

        fn handle_read(&mut self, _fd: RawFd) -> Result<()> {
            Ok(())
        }

        fn handle_write(&mut self, _fd: RawFd) -> Result<()> {
            Ok(())
        }

        fn is_done(&self) -> bool {
            false
        }

        fn retcodes(&self) -> &HashMap<String, i32> {
            &self.retcodes
        }

        fn num_nodes(&self) -> usize {
            0
        }

        fn take_handler(&mut self) -> Option<Box<dyn EventHandler>> {
            None
        }
    }

    /// Every `WorkerState` variant, in declaration order.
    const ALL_STATES: [WorkerState; 4] = [
        WorkerState::Pending,
        WorkerState::Running,
        WorkerState::Done,
        WorkerState::Aborted,
    ];

    /// Each variant equals itself and differs from every other variant.
    #[test]
    fn test_worker_state_variants() {
        for (i, left) in ALL_STATES.iter().enumerate() {
            for (j, right) in ALL_STATES.iter().enumerate() {
                assert_eq!(left == right, i == j, "{:?} vs {:?}", left, right);
            }
        }
    }

    /// `Debug` prints the bare variant name.
    #[test]
    fn test_worker_state_debug() {
        assert_eq!(format!("{:?}", WorkerState::Pending), "Pending");
        assert_eq!(format!("{:?}", WorkerState::Running), "Running");
        assert_eq!(format!("{:?}", WorkerState::Done), "Done");
        assert_eq!(format!("{:?}", WorkerState::Aborted), "Aborted");
    }

    /// `Clone` yields an equal value.
    #[test]
    // The explicit `.clone()` is the point of this test even though the type
    // is `Copy`.
    #[allow(clippy::clone_on_copy)]
    fn test_worker_state_clone() {
        let state = WorkerState::Pending;
        let cloned = state.clone();
        assert_eq!(state, cloned);
    }

    /// Compile-time check that `WorkerState` is `Copy`.
    #[test]
    fn test_worker_state_copy() {
        fn take_copy<T: Copy>(_: T) {}
        take_copy(WorkerState::Pending);
    }

    /// Exercises the `==` and `!=` operators directly.
    #[test]
    fn test_worker_state_partial_eq() {
        let pending = WorkerState::Pending;
        assert!(pending == WorkerState::Pending);
        assert!(WorkerState::Running != pending);
    }

    /// Each constructor produces the variant it names, and the generated
    /// `From<std::io::Error>` conversion lands in `WorkerError::Io`.
    #[test]
    fn test_worker_error_variants() {
        let io_err = WorkerError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "not found",
        ));
        let gen_err = WorkerError::General("something went wrong".to_string());
        let timeout_err = WorkerError::Timeout;

        // `matches!` only yields a bool; without `assert!` nothing is checked.
        assert!(matches!(io_err, WorkerError::Io(_)));
        assert!(matches!(gen_err, WorkerError::General(_)));
        assert!(matches!(timeout_err, WorkerError::Timeout));

        let converted: WorkerError =
            std::io::Error::new(std::io::ErrorKind::NotFound, "not found").into();
        assert!(matches!(converted, WorkerError::Io(_)));
    }

    /// `std::error::Error::source` exposes the wrapped I/O error and is empty
    /// for the variants that wrap nothing.
    #[test]
    fn test_worker_error_impls_error() {
        use std::error::Error;

        let io_err = WorkerError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "not found",
        ));
        // `#[from]` marks the field as the error's source.
        assert!(io_err.source().is_some());

        let gen_err = WorkerError::General("test".to_string());
        assert!(gen_err.source().is_none());

        let timeout_err = WorkerError::Timeout;
        assert!(timeout_err.source().is_none());
    }

    /// `Display` output follows the `#[error(...)]` format strings.
    #[test]
    fn test_worker_error_impls_display() {
        let io_err = WorkerError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "file.txt",
        ));
        assert_eq!(io_err.to_string(), "io error: file.txt");

        let gen_err = WorkerError::General("test error".to_string());
        assert_eq!(gen_err.to_string(), "worker error: test error");

        let timeout_err = WorkerError::Timeout;
        assert_eq!(timeout_err.to_string(), "timeout");
    }

    /// Every default callback can be called without panicking, and the
    /// default `take_buffers` returns empty collections.
    #[test]
    fn test_event_handler_default_methods() {
        let mut handler = TestHandler;
        let worker = StubWorker {
            retcodes: HashMap::new(),
        };

        handler.on_start(&worker);
        handler.on_read("node1", 0, b"output");
        handler.on_close("node1", 0);
        handler.on_timeout("node1");
        handler.on_error("node1", &WorkerError::Timeout);

        let (first_map, second_map, names) = handler.take_buffers();
        assert!(first_map.is_empty());
        assert!(second_map.is_empty());
        assert!(names.is_empty());
    }

    /// Compile-time check that `EventHandler` can be used as a trait object.
    #[test]
    fn test_event_handler_object_safe() {
        let _handler: Box<dyn EventHandler> = Box::new(TestHandler);
    }

    /// Compile-time check that a boxed handler is `Send`.
    #[test]
    fn test_event_handler_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<Box<dyn EventHandler>>();
    }

    /// The `Result` alias accepts both `Ok` and a `WorkerError`.
    #[test]
    fn test_result_type_alias() {
        let ok: Result<()> = Ok(());
        let err: Result<()> = Err(WorkerError::Timeout);

        assert!(ok.is_ok());
        assert!(err.is_err());
    }
}