1pub mod activate;
26pub mod build;
27pub mod config;
28pub mod copy;
29pub mod error;
30pub mod eval;
31pub mod health;
32pub mod tasks;
33
34pub use config::{DeployAction, DeploymentNode, DeploymentPlan, FleetConfig, ProfileType};
35pub use error::{NixError, Result};
36
37use consortium::dag::{DagContext, DagReport, ErrorPolicy, StageBuilder};
38
39pub fn deploy(
45 config: &FleetConfig,
46 target_nodes: &[String],
47 action: DeployAction,
48 max_parallel: usize,
49 use_builders: bool,
50) -> Result<DeployReport> {
51 let machines_file: Option<String> = if use_builders && !config.builders.is_empty() {
53 let statuses = health::check_builders(config);
54 let healthy: Vec<_> = statuses.iter().filter(|s| s.healthy).cloned().collect();
55 if healthy.is_empty() {
56 eprintln!("warning: no healthy builders available, building locally");
57 None
58 } else {
59 match build::generate_machines_file_from_healthy(&healthy) {
60 Ok(path) => Some(path),
61 Err(e) => {
62 eprintln!("warning: failed to generate machines file: {}", e);
63 None
64 }
65 }
66 }
67 } else {
68 None
69 };
70
71 let ctx = DagContext::new();
73 ctx.set_state("fleet_config", config.clone());
74 ctx.set_state("action", action);
75 if let Some(ref path) = machines_file {
76 ctx.set_state("machines_file", path.clone());
77 }
78
79 let eval_limit = 1;
85 let build_limit = max_parallel;
86 let copy_limit = max_parallel;
87 let activate_limit = max_parallel.min(4);
88
89 let mut builder = StageBuilder::new()
91 .resources(target_nodes.to_vec())
92 .stage("eval", Some(eval_limit), |host| {
93 Box::new(tasks::NixEvalTask::new(host))
94 })
95 .stage("build", Some(build_limit), |host| {
96 Box::new(tasks::NixBuildTask::new(host))
97 })
98 .error_policy(ErrorPolicy::ContinueIndependent)
99 .context(ctx);
100
101 if action != DeployAction::Build {
103 builder = builder
104 .stage("copy", Some(copy_limit), |host| {
105 Box::new(tasks::NixCopyTask::new(host))
106 })
107 .stage("activate", Some(activate_limit), |host| {
108 Box::new(tasks::NixActivateTask::new(host))
109 });
110 }
111
112 let dag_report = builder
113 .build()
114 .map_err(|e| NixError::General(e.to_string()))?
115 .run()
116 .map_err(|e| NixError::General(e.to_string()))?;
117
118 Ok(DeployReport::from_dag_report(
119 &dag_report,
120 target_nodes,
121 action,
122 ))
123}
124
/// Per-node summary of a deployment run, grouped by stage.
///
/// Each list holds node names; the `*_failures` lists pair the node name
/// with the error message recorded for it.
#[derive(Debug)]
pub struct DeployReport {
    /// Nodes whose build stage finished (completed or skipped in the DAG report).
    pub built: Vec<String>,
    /// Nodes whose copy stage finished (completed or skipped in the DAG report).
    pub copied: Vec<String>,
    /// Nodes whose activate stage finished (completed or skipped in the DAG report).
    pub activated: Vec<String>,
    /// `(node, error message)` pairs for nodes recorded as failed at the build stage.
    pub build_failures: Vec<(String, String)>,
    /// `(node, error message)` pairs for nodes recorded as failed at the copy stage.
    pub copy_failures: Vec<(String, String)>,
    /// `(node, error message)` pairs for nodes recorded as failed at the activate stage.
    pub activation_failures: Vec<(String, String)>,
}
141
142impl DeployReport {
143 pub fn is_success(&self) -> bool {
145 self.build_failures.is_empty()
146 && self.copy_failures.is_empty()
147 && self.activation_failures.is_empty()
148 }
149
150 pub fn failure_count(&self) -> usize {
152 self.build_failures.len() + self.copy_failures.len() + self.activation_failures.len()
153 }
154
155 pub fn success_count(&self) -> usize {
157 self.activated.len()
158 }
159
160 fn from_dag_report(report: &DagReport, target_nodes: &[String], action: DeployAction) -> Self {
162 let mut built = Vec::new();
163 let mut copied = Vec::new();
164 let mut activated = Vec::new();
165 let mut build_failures = Vec::new();
166 let mut copy_failures = Vec::new();
167 let mut activation_failures = Vec::new();
168
169 for host in target_nodes {
170 let build_id = consortium::dag::TaskId(format!("build:{}", host));
171 let copy_id = consortium::dag::TaskId(format!("copy:{}", host));
172 let activate_id = consortium::dag::TaskId(format!("activate:{}", host));
173
174 if report.completed.contains(&build_id) || report.skipped.contains(&build_id) {
176 built.push(host.clone());
177 } else if let Some(err) = report.failed.get(&build_id) {
178 build_failures.push((host.clone(), err.clone()));
179 }
180 if action == DeployAction::Build {
183 continue;
184 }
185
186 if report.completed.contains(©_id) || report.skipped.contains(©_id) {
188 copied.push(host.clone());
189 } else if let Some(err) = report.failed.get(©_id) {
190 copy_failures.push((host.clone(), err.clone()));
191 }
192
193 if report.completed.contains(&activate_id) || report.skipped.contains(&activate_id) {
195 activated.push(host.clone());
196 } else if let Some(err) = report.failed.get(&activate_id) {
197 activation_failures.push((host.clone(), err.clone()));
198 }
199 }
200
201 Self {
202 built,
203 copied,
204 activated,
205 build_failures,
206 copy_failures,
207 activation_failures,
208 }
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use consortium::dag::{DagReport, TaskId};
    use std::collections::HashSet;

    /// Wraps a plain task name such as `"build:hp01"` in a `TaskId`.
    fn task(name: &str) -> TaskId {
        TaskId(name.to_string())
    }

    /// The two-node target list shared by the report tests.
    fn targets() -> Vec<String> {
        vec!["hp01".to_string(), "hp02".to_string()]
    }

    /// Builds a `DagReport` from plain task names; `skipped` is always empty.
    fn make_report(completed: &[&str], failed: &[(&str, &str)], cancelled: &[&str]) -> DagReport {
        let completed = completed.iter().map(|name| task(name)).collect();
        let failed = failed
            .iter()
            .map(|(name, message)| (task(name), message.to_string()))
            .collect();
        let cancelled = cancelled.iter().map(|name| task(name)).collect();

        DagReport {
            completed,
            skipped: HashSet::new(),
            failed,
            cancelled,
        }
    }

    /// Every stage completes on both nodes.
    #[test]
    fn test_deploy_report_all_success() {
        let finished = [
            "eval:hp01",
            "build:hp01",
            "copy:hp01",
            "activate:hp01",
            "eval:hp02",
            "build:hp02",
            "copy:hp02",
            "activate:hp02",
        ];
        let dag = make_report(&finished, &[], &[]);
        let nodes = targets();

        let summary = DeployReport::from_dag_report(&dag, &nodes, DeployAction::Switch);

        assert!(summary.is_success());
        assert_eq!(summary.built, vec!["hp01", "hp02"]);
        assert_eq!(summary.copied, vec!["hp01", "hp02"]);
        assert_eq!(summary.activated, vec!["hp01", "hp02"]);
        assert_eq!(summary.failure_count(), 0);
    }

    /// A failed build on hp02 is reported once; its cancelled copy/activate
    /// tasks do not show up as failures.
    #[test]
    fn test_deploy_report_build_failure() {
        let finished = [
            "eval:hp01",
            "eval:hp02",
            "build:hp01",
            "copy:hp01",
            "activate:hp01",
        ];
        let broken = [("build:hp02", "nix build failed")];
        let dropped = ["copy:hp02", "activate:hp02"];
        let dag = make_report(&finished, &broken, &dropped);
        let nodes = targets();

        let summary = DeployReport::from_dag_report(&dag, &nodes, DeployAction::Switch);

        assert!(!summary.is_success());
        assert_eq!(summary.built, vec!["hp01"]);
        assert_eq!(summary.activated, vec!["hp01"]);
        assert_eq!(
            summary.build_failures,
            vec![("hp02".to_string(), "nix build failed".to_string())]
        );
        assert!(summary.copy_failures.is_empty());
        assert!(summary.activation_failures.is_empty());
    }

    /// A failed copy on hp01 is reported while both builds still count.
    #[test]
    fn test_deploy_report_copy_failure() {
        let finished = ["eval:hp01", "build:hp01", "eval:hp02", "build:hp02"];
        let broken = [("copy:hp01", "ssh connection refused")];
        let dropped = ["activate:hp01"];
        let dag = make_report(&finished, &broken, &dropped);
        let nodes = targets();

        let summary = DeployReport::from_dag_report(&dag, &nodes, DeployAction::Switch);

        assert!(!summary.is_success());
        assert_eq!(summary.built, vec!["hp01", "hp02"]);
        assert_eq!(
            summary.copy_failures,
            vec![("hp01".to_string(), "ssh connection refused".to_string())]
        );
    }

    /// With `DeployAction::Build` only the build lists are populated.
    #[test]
    fn test_deploy_report_build_only() {
        let finished = ["eval:hp01", "build:hp01", "eval:hp02", "build:hp02"];
        let dag = make_report(&finished, &[], &[]);
        let nodes = targets();

        let summary = DeployReport::from_dag_report(&dag, &nodes, DeployAction::Build);

        assert!(summary.is_success());
        assert_eq!(summary.built, vec!["hp01", "hp02"]);
        assert!(summary.copied.is_empty());
        assert!(summary.activated.is_empty());
    }

    /// Parsing an action name and formatting it again yields the same text.
    #[test]
    fn test_deploy_action_display_roundtrip() {
        let names = ["switch", "boot", "test", "dry-activate", "build"];
        for &name in names.iter() {
            let parsed = name.parse::<DeployAction>().unwrap();
            assert_eq!(parsed.to_string(), name);
        }
    }

    /// Unknown, empty, and wrongly-cased names are rejected.
    #[test]
    fn test_deploy_action_invalid() {
        assert!("reboot".parse::<DeployAction>().is_err());
        assert!("".parse::<DeployAction>().is_err());
        assert!("SWITCH".parse::<DeployAction>().is_err());
    }
}