Skip to main content

consortium/worker/
mod.rs

//! Worker module for executing commands across nodes.
//!
//! This module provides the abstractions for executing commands
//! across nodes, similar to ClusterShell's Worker.
//!
//! ## Submodules
//!
//! - [`exec`] — Local command execution worker (ExecWorker)
//! - [`ssh`] — SSH-based remote execution worker (SshWorker, ScpWorker)
//! - [`tree`] — Tree-based propagation worker (TreeWorker)
11
12pub mod exec;
13pub mod ssh;
14pub mod tree;
15
16use std::collections::{HashMap, HashSet};
17use std::os::unix::io::RawFd;
18use thiserror::Error;
19
20/// Error types for the worker module.
21#[derive(Debug, Error)]
22pub enum WorkerError {
23    /// I/O error wrapper.
24    #[error("io error: {0}")]
25    Io(#[from] std::io::Error),
26    /// General worker error.
27    #[error("worker error: {0}")]
28    General(String),
29    /// Operation timed out.
30    #[error("timeout")]
31    Timeout,
32}
33
34/// Result type alias for worker operations.
35pub type Result<T> = std::result::Result<T, WorkerError>;
36
/// Current state of a worker.
///
/// The type is `Copy`, so it can be returned and compared by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerState {
    /// The worker has been created but not started yet.
    Pending,
    /// The worker is currently running.
    Running,
    /// The worker has completed.
    Done,
    /// The worker was aborted.
    Aborted,
}
49
50/// Event handler trait for worker events.
51///
52/// Implement this trait to receive notifications about worker events.
53pub trait EventHandler: Send {
54    /// Called when the worker starts.
55    fn on_start(&mut self, worker: &dyn Worker) {
56        let _ = worker;
57    }
58
59    /// Called when data is available for reading from a node.
60    fn on_read(&mut self, node: &str, fd: RawFd, msg: &[u8]) {
61        let _ = (node, fd, msg);
62    }
63
64    /// Called when a connection to a node is closed.
65    fn on_close(&mut self, node: &str, rc: i32) {
66        let _ = (node, rc);
67    }
68
69    /// Called when a node operation times out.
70    fn on_timeout(&mut self, node: &str) {
71        let _ = node;
72    }
73
74    /// Called when an error occurs for a node.
75    fn on_error(&mut self, node: &str, error: &WorkerError) {
76        let _ = (node, error);
77    }
78
79    /// Extract buffered stdout/stderr data from this handler.
80    /// Returns (stdout, stderr, timeouts) where stdout/stderr are node -> `Vec<chunks>`.
81    /// Default returns empty maps. Used by Task to collect results after execution.
82    fn take_buffers(
83        &mut self,
84    ) -> (
85        HashMap<String, Vec<Vec<u8>>>,
86        HashMap<String, Vec<Vec<u8>>>,
87        HashSet<String>,
88    ) {
89        (HashMap::new(), HashMap::new(), HashSet::new())
90    }
91}
92
93/// Trait for worker implementations.
94///
95/// A worker executes commands across multiple nodes, handling I/O
96/// events and managing child processes.
97pub trait Worker: Send {
98    /// Start the worker.
99    fn start(&mut self) -> Result<()>;
100
101    /// Abort the worker, optionally killing child processes.
102    fn abort(&mut self, kill: bool);
103
104    /// Get the worker's current state.
105    fn state(&self) -> WorkerState;
106
107    /// Set the event handler for this worker.
108    fn set_handler(&mut self, handler: Box<dyn EventHandler>);
109
110    /// Get file descriptors for read interest.
111    fn read_fds(&self) -> Vec<RawFd>;
112
113    /// Get file descriptors for write interest.
114    fn write_fds(&self) -> Vec<RawFd>;
115
116    /// Handle a read event on the given file descriptor.
117    fn handle_read(&mut self, fd: RawFd) -> Result<()>;
118
119    /// Handle a write event on the given file descriptor.
120    fn handle_write(&mut self, fd: RawFd) -> Result<()>;
121
122    /// Check if the worker has completed.
123    fn is_done(&self) -> bool;
124
125    /// Get the return codes map: node -> return_code.
126    fn retcodes(&self) -> &HashMap<String, i32>;
127
128    /// Get the number of nodes.
129    fn num_nodes(&self) -> usize;
130
131    /// Take the event handler out of the worker (used by Task to extract gathered data).
132    fn take_handler(&mut self) -> Option<Box<dyn EventHandler>>;
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Handler that relies entirely on the default methods of [`EventHandler`].
    struct DefaultHandler;

    impl EventHandler for DefaultHandler {}

    /// Inert [`Worker`] used only to obtain a `&dyn Worker` / `Box<dyn Worker>`.
    struct NullWorker {
        retcodes: HashMap<String, i32>,
    }

    impl NullWorker {
        fn new() -> Self {
            Self {
                retcodes: HashMap::new(),
            }
        }
    }

    impl Worker for NullWorker {
        fn start(&mut self) -> Result<()> {
            Ok(())
        }

        fn abort(&mut self, _kill: bool) {}

        fn state(&self) -> WorkerState {
            WorkerState::Pending
        }

        fn set_handler(&mut self, _handler: Box<dyn EventHandler>) {}

        fn read_fds(&self) -> Vec<RawFd> {
            Vec::new()
        }

        fn write_fds(&self) -> Vec<RawFd> {
            Vec::new()
        }

        fn handle_read(&mut self, _fd: RawFd) -> Result<()> {
            Ok(())
        }

        fn handle_write(&mut self, _fd: RawFd) -> Result<()> {
            Ok(())
        }

        fn is_done(&self) -> bool {
            false
        }

        fn retcodes(&self) -> &HashMap<String, i32> {
            &self.retcodes
        }

        fn num_nodes(&self) -> usize {
            0
        }

        fn take_handler(&mut self) -> Option<Box<dyn EventHandler>> {
            None
        }
    }

    /// Every WorkerState variant equals itself and differs from all the others.
    #[test]
    fn test_worker_state_variants() {
        let all = [
            WorkerState::Pending,
            WorkerState::Running,
            WorkerState::Done,
            WorkerState::Aborted,
        ];
        // Comparing by index avoids tautological `assert_eq!(X, X)` checks.
        for (i, a) in all.iter().enumerate() {
            for (j, b) in all.iter().enumerate() {
                assert_eq!(a == b, i == j, "{:?} vs {:?}", a, b);
            }
        }
    }

    /// Test WorkerState is Debug
    #[test]
    fn test_worker_state_debug() {
        assert_eq!(format!("{:?}", WorkerState::Pending), "Pending");
        assert_eq!(format!("{:?}", WorkerState::Running), "Running");
        assert_eq!(format!("{:?}", WorkerState::Done), "Done");
        assert_eq!(format!("{:?}", WorkerState::Aborted), "Aborted");
    }

    /// Test WorkerState is Clone
    #[test]
    // Calling `clone()` on a `Copy` type is deliberate: the `Clone` impl is what is under test.
    #[allow(clippy::clone_on_copy)]
    fn test_worker_state_clone() {
        let state = WorkerState::Pending;
        let cloned = state.clone();
        assert_eq!(state, cloned);
    }

    /// Test WorkerState is Copy
    #[test]
    fn test_worker_state_copy() {
        fn take_copy<T: Copy>(_: T) {}
        take_copy(WorkerState::Pending);
    }

    /// Test WorkerState is PartialEq
    #[test]
    fn test_worker_state_partial_eq() {
        // Bind to a local so the comparison is not between two identical expressions.
        let state = WorkerState::Pending;
        assert!(state == WorkerState::Pending);
        assert!(state != WorkerState::Running);
    }

    /// Test WorkerError variants, including the `From<std::io::Error>` conversion
    #[test]
    fn test_worker_error_variants() {
        let io_err = WorkerError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "not found",
        ));
        let gen_err = WorkerError::General("something went wrong".to_string());
        let timeout_err = WorkerError::Timeout;

        // `matches!` only yields a bool; it must be asserted to check anything.
        assert!(matches!(io_err, WorkerError::Io(_)));
        assert!(matches!(gen_err, WorkerError::General(_)));
        assert!(matches!(timeout_err, WorkerError::Timeout));

        let converted: WorkerError =
            std::io::Error::new(std::io::ErrorKind::Other, "boom").into();
        assert!(matches!(converted, WorkerError::Io(_)));
    }

    /// Test WorkerError implements Error trait
    #[test]
    fn test_worker_error_impls_error() {
        use std::error::Error as _;

        let gen_err = WorkerError::General("test".to_string());
        assert!(gen_err.source().is_none());

        let timeout_err = WorkerError::Timeout;
        assert!(timeout_err.source().is_none());

        // The `#[from]` field doubles as the error source.
        let io_err = WorkerError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "not found",
        ));
        assert!(io_err.source().is_some());
    }

    /// Test WorkerError implements Display trait
    #[test]
    fn test_worker_error_impls_display() {
        let io_err = WorkerError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "file.txt",
        ));
        assert_eq!(io_err.to_string(), "io error: file.txt");

        let gen_err = WorkerError::General("test error".to_string());
        assert_eq!(gen_err.to_string(), "worker error: test error");

        let timeout_err = WorkerError::Timeout;
        assert_eq!(timeout_err.to_string(), "timeout");
    }

    /// Test EventHandler default methods can be called and return empty buffers
    #[test]
    fn test_event_handler_default_methods() {
        let mut handler = DefaultHandler;
        let worker = NullWorker::new();

        handler.on_start(&worker);
        handler.on_read("node1", 1, b"data");
        handler.on_close("node1", 0);
        handler.on_timeout("node1");
        handler.on_error("node1", &WorkerError::Timeout);

        let (stdout, stderr, timeouts) = handler.take_buffers();
        assert!(stdout.is_empty());
        assert!(stderr.is_empty());
        assert!(timeouts.is_empty());
    }

    /// Test that EventHandler is object-safe
    #[test]
    fn test_event_handler_object_safe() {
        let _handler: Box<dyn EventHandler> = Box::new(DefaultHandler);
    }

    /// Test EventHandler is Send
    #[test]
    fn test_event_handler_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<Box<dyn EventHandler>>();
    }

    /// Test that Worker is object-safe and usable through a trait object
    #[test]
    fn test_worker_object_safe() {
        let mut worker: Box<dyn Worker> = Box::new(NullWorker::new());

        assert!(worker.start().is_ok());
        assert_eq!(worker.state(), WorkerState::Pending);
        assert!(!worker.is_done());
        assert_eq!(worker.num_nodes(), 0);
        assert!(worker.retcodes().is_empty());
        assert!(worker.take_handler().is_none());
    }

    /// Test WorkerError Result type alias
    #[test]
    fn test_result_type_alias() {
        let ok: Result<()> = Ok(());
        let err: Result<()> = Err(WorkerError::Timeout);

        assert!(ok.is_ok());
        assert!(err.is_err());
    }
}