Skip to main content

consortium_nix/
lib.rs

1//! # consortium-nix
2//!
3//! NixOS deployment orchestration for consortium.
4//!
5//! This crate provides the deployment pipeline for NixOS and nix-darwin systems,
6//! replacing colmena. It consumes fleet configuration (produced by the Nix library)
7//! and orchestrates the evaluate → build → copy → activate pipeline using
8//! consortium's DAG executor for maximum parallelism with per-host pipelining.
9//!
10//! ## Architecture
11//!
12//! The deployment pipeline has four stages, executed as a DAG:
13//!
14//! ```text
15//! For each host:
16//!   eval(host) → build(host) → copy(host) → activate(host)
17//! ```
18//!
19//! Stages run in parallel across hosts (up to concurrency limits), and each
20//! host can advance independently — host A can be copying while host B is
21//! still building.
22//!
23//! Builder health checking ([`health`]) validates remote builders before use.
24
25pub mod activate;
26pub mod build;
27pub mod config;
28pub mod copy;
29pub mod error;
30pub mod eval;
31pub mod health;
32pub mod tasks;
33
34pub use config::{DeployAction, DeploymentNode, DeploymentPlan, FleetConfig, ProfileType};
35pub use error::{NixError, Result};
36
37use consortium::dag::{DagContext, DagReport, ErrorPolicy, StageBuilder};
38
39/// Run the full deployment pipeline using the DAG executor.
40///
41/// Each host progresses independently through eval → build → copy → activate,
42/// with per-stage concurrency limits. A host that fails at any stage is
43/// cancelled for subsequent stages without blocking other hosts.
44pub fn deploy(
45    config: &FleetConfig,
46    target_nodes: &[String],
47    action: DeployAction,
48    max_parallel: usize,
49    use_builders: bool,
50) -> Result<DeployReport> {
51    // Phase 0: Health check builders and prepare machines file
52    let machines_file: Option<String> = if use_builders && !config.builders.is_empty() {
53        let statuses = health::check_builders(config);
54        let healthy: Vec<_> = statuses.iter().filter(|s| s.healthy).cloned().collect();
55        if healthy.is_empty() {
56            eprintln!("warning: no healthy builders available, building locally");
57            None
58        } else {
59            match build::generate_machines_file_from_healthy(&healthy) {
60                Ok(path) => Some(path),
61                Err(e) => {
62                    eprintln!("warning: failed to generate machines file: {}", e);
63                    None
64                }
65            }
66        }
67    } else {
68        None
69    };
70
71    // Set up shared context
72    let ctx = DagContext::new();
73    ctx.set_state("fleet_config", config.clone());
74    ctx.set_state("action", action);
75    if let Some(ref path) = machines_file {
76        ctx.set_state("machines_file", path.clone());
77    }
78
79    // Determine stage concurrency limits
80    // eval: limited to 1 (nix evaluation is memory-heavy)
81    // build: up to max_parallel (nix distributes across builders internally)
82    // copy: up to max_parallel (IO-bound, can be aggressive)
83    // activate: limited to avoid overwhelming the fleet
84    let eval_limit = 1;
85    let build_limit = max_parallel;
86    let copy_limit = max_parallel;
87    let activate_limit = max_parallel.min(4);
88
89    // Build the deployment DAG
90    let mut builder = StageBuilder::new()
91        .resources(target_nodes.to_vec())
92        .stage("eval", Some(eval_limit), |host| {
93            Box::new(tasks::NixEvalTask::new(host))
94        })
95        .stage("build", Some(build_limit), |host| {
96            Box::new(tasks::NixBuildTask::new(host))
97        })
98        .error_policy(ErrorPolicy::ContinueIndependent)
99        .context(ctx);
100
101    // Only add copy + activate stages if not build-only
102    if action != DeployAction::Build {
103        builder = builder
104            .stage("copy", Some(copy_limit), |host| {
105                Box::new(tasks::NixCopyTask::new(host))
106            })
107            .stage("activate", Some(activate_limit), |host| {
108                Box::new(tasks::NixActivateTask::new(host))
109            });
110    }
111
112    let dag_report = builder
113        .build()
114        .map_err(|e| NixError::General(e.to_string()))?
115        .run()
116        .map_err(|e| NixError::General(e.to_string()))?;
117
118    Ok(DeployReport::from_dag_report(
119        &dag_report,
120        target_nodes,
121        action,
122    ))
123}
124
/// Summary of a deployment run.
///
/// Host lists follow the order of the `target_nodes` passed to [`deploy`].
/// Failure lists pair each host name with the error message reported by the
/// failed task.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DeployReport {
    /// Hosts whose closures were built successfully.
    pub built: Vec<String>,
    /// Hosts whose closures were copied successfully.
    pub copied: Vec<String>,
    /// Hosts that were activated successfully.
    pub activated: Vec<String>,
    /// Hosts that failed to build (name, error message).
    pub build_failures: Vec<(String, String)>,
    /// Hosts that failed closure copy (name, error message).
    pub copy_failures: Vec<(String, String)>,
    /// Hosts that failed activation (name, error message).
    pub activation_failures: Vec<(String, String)>,
}
141
142impl DeployReport {
143    /// Whether the deployment was fully successful (no failures).
144    pub fn is_success(&self) -> bool {
145        self.build_failures.is_empty()
146            && self.copy_failures.is_empty()
147            && self.activation_failures.is_empty()
148    }
149
150    /// Total number of failures across all phases.
151    pub fn failure_count(&self) -> usize {
152        self.build_failures.len() + self.copy_failures.len() + self.activation_failures.len()
153    }
154
155    /// Number of hosts that completed successfully (all phases).
156    pub fn success_count(&self) -> usize {
157        self.activated.len()
158    }
159
160    /// Build a DeployReport from a DagReport by inspecting task IDs.
161    fn from_dag_report(report: &DagReport, target_nodes: &[String], action: DeployAction) -> Self {
162        let mut built = Vec::new();
163        let mut copied = Vec::new();
164        let mut activated = Vec::new();
165        let mut build_failures = Vec::new();
166        let mut copy_failures = Vec::new();
167        let mut activation_failures = Vec::new();
168
169        for host in target_nodes {
170            let build_id = consortium::dag::TaskId(format!("build:{}", host));
171            let copy_id = consortium::dag::TaskId(format!("copy:{}", host));
172            let activate_id = consortium::dag::TaskId(format!("activate:{}", host));
173
174            // Check build
175            if report.completed.contains(&build_id) || report.skipped.contains(&build_id) {
176                built.push(host.clone());
177            } else if let Some(err) = report.failed.get(&build_id) {
178                build_failures.push((host.clone(), err.clone()));
179            }
180            // cancelled builds are not reported as failures — they're implied by an earlier failure
181
182            if action == DeployAction::Build {
183                continue;
184            }
185
186            // Check copy
187            if report.completed.contains(&copy_id) || report.skipped.contains(&copy_id) {
188                copied.push(host.clone());
189            } else if let Some(err) = report.failed.get(&copy_id) {
190                copy_failures.push((host.clone(), err.clone()));
191            }
192
193            // Check activate
194            if report.completed.contains(&activate_id) || report.skipped.contains(&activate_id) {
195                activated.push(host.clone());
196            } else if let Some(err) = report.failed.get(&activate_id) {
197                activation_failures.push((host.clone(), err.clone()));
198            }
199        }
200
201        Self {
202            built,
203            copied,
204            activated,
205            build_failures,
206            copy_failures,
207            activation_failures,
208        }
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use consortium::dag::{DagReport, TaskId};
    use std::collections::HashSet;

    /// Turn task-id strings into whichever collection the target field uses.
    fn task_ids<C: std::iter::FromIterator<TaskId>>(names: &[&str]) -> C {
        names.iter().map(|name| TaskId(name.to_string())).collect()
    }

    /// The two hosts every scenario below deploys to.
    fn two_hosts() -> Vec<String> {
        ["hp01", "hp02"].iter().map(|host| host.to_string()).collect()
    }

    /// Assemble a `DagReport` with no skipped tasks.
    fn make_report(completed: &[&str], failed: &[(&str, &str)], cancelled: &[&str]) -> DagReport {
        DagReport {
            completed: task_ids(completed),
            skipped: HashSet::new(),
            failed: failed
                .iter()
                .map(|&(task, message)| (TaskId(task.to_string()), message.to_string()))
                .collect(),
            cancelled: task_ids(cancelled),
        }
    }

    #[test]
    fn test_deploy_report_all_success() {
        let dag = make_report(
            &[
                "eval:hp01", "build:hp01", "copy:hp01", "activate:hp01",
                "eval:hp02", "build:hp02", "copy:hp02", "activate:hp02",
            ],
            &[],
            &[],
        );
        let summary = DeployReport::from_dag_report(&dag, &two_hosts(), DeployAction::Switch);

        assert!(summary.is_success());
        assert_eq!(summary.built, ["hp01", "hp02"]);
        assert_eq!(summary.copied, ["hp01", "hp02"]);
        assert_eq!(summary.activated, ["hp01", "hp02"]);
        assert_eq!(summary.failure_count(), 0);
    }

    #[test]
    fn test_deploy_report_build_failure() {
        let dag = make_report(
            &["eval:hp01", "eval:hp02", "build:hp01", "copy:hp01", "activate:hp01"],
            &[("build:hp02", "nix build failed")],
            &["copy:hp02", "activate:hp02"],
        );
        let summary = DeployReport::from_dag_report(&dag, &two_hosts(), DeployAction::Switch);

        assert!(!summary.is_success());
        assert_eq!(summary.built, ["hp01"]);
        assert_eq!(summary.activated, ["hp01"]);
        let expected = vec![("hp02".to_string(), "nix build failed".to_string())];
        assert_eq!(summary.build_failures, expected);
        // hp02's copy/activate tasks were cancelled, which is not a failure of their own.
        assert!(summary.copy_failures.is_empty());
        assert!(summary.activation_failures.is_empty());
    }

    #[test]
    fn test_deploy_report_copy_failure() {
        let dag = make_report(
            &["eval:hp01", "build:hp01", "eval:hp02", "build:hp02"],
            &[("copy:hp01", "ssh connection refused")],
            &["activate:hp01"],
        );
        let summary = DeployReport::from_dag_report(&dag, &two_hosts(), DeployAction::Switch);

        assert!(!summary.is_success());
        // Both closures were built before the copy went wrong.
        assert_eq!(summary.built, ["hp01", "hp02"]);
        let expected = vec![("hp01".to_string(), "ssh connection refused".to_string())];
        assert_eq!(summary.copy_failures, expected);
    }

    #[test]
    fn test_deploy_report_build_only() {
        let dag = make_report(
            &["eval:hp01", "build:hp01", "eval:hp02", "build:hp02"],
            &[],
            &[],
        );
        let summary = DeployReport::from_dag_report(&dag, &two_hosts(), DeployAction::Build);

        assert!(summary.is_success());
        assert_eq!(summary.built, ["hp01", "hp02"]);
        assert!(summary.copied.is_empty());
        assert!(summary.activated.is_empty());
    }

    #[test]
    fn test_deploy_action_display_roundtrip() {
        let spellings = ["switch", "boot", "test", "dry-activate", "build"];
        for spelling in spellings.iter() {
            let parsed = spelling.parse::<DeployAction>().unwrap();
            assert_eq!(parsed.to_string(), *spelling);
        }
    }

    #[test]
    fn test_deploy_action_invalid() {
        for bad in ["reboot", "", "SWITCH"].iter() {
            assert!(bad.parse::<DeployAction>().is_err(), "{:?} should not parse", bad);
        }
    }
}