// consortium_nix/tasks.rs

//! `DagTask` implementations for each NixOS deployment phase.
//!
//! Each task reads its inputs from `DagContext` (predecessor outputs and
//! shared fleet config) and writes its outputs for dependent tasks.

use consortium::dag::{DagContext, DagTask, TaskId, TaskOutcome};

use crate::activate;
use crate::build;
use crate::config::{DeployAction, FleetConfig};
use crate::copy;
use crate::eval;

/// DAG task that evaluates a single host, resolving its toplevel store path.
///
/// Reads state: `fleet_config` → `FleetConfig` (supplies the flake URI)
/// Writes output: `eval:{host}` → `String` (toplevel store path)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NixEvalTask {
    /// Name of the host to evaluate; also used to key this task's output.
    pub host: String,
}

impl NixEvalTask {
    /// Creates an evaluation task for `host`, storing an owned copy of the name.
    pub fn new(host: &str) -> Self {
        Self {
            host: host.to_owned(),
        }
    }
}

29impl DagTask for NixEvalTask {
30    fn execute(&self, ctx: &DagContext) -> TaskOutcome {
31        let config: FleetConfig = match ctx.get_state("fleet_config") {
32            Some(c) => c,
33            None => return TaskOutcome::Failed("fleet_config not in context".into()),
34        };
35
36        match eval::eval_toplevel(&config.flake_uri, &self.host) {
37            Ok(path) => {
38                ctx.set_output(TaskId(format!("eval:{}", self.host)), path);
39                TaskOutcome::Success
40            }
41            Err(e) => TaskOutcome::Failed(format!("eval {}: {}", self.host, e)),
42        }
43    }
44
45    fn describe(&self) -> String {
46        format!("evaluate {}", self.host)
47    }
48}
49
/// DAG task that builds the system closure for a single host.
///
/// Reads state: `fleet_config` → `FleetConfig` (supplies the flake URI)
/// Reads state: `machines_file` → optional `String` (path to machines file for distributed builds)
/// Writes output: `build:{host}` → `String` (built store path)
///
/// Note: `execute` does not look up the `eval:{host}` output; it calls
/// `build::build_host` with the flake URI and host name directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NixBuildTask {
    /// Name of the host to build; also used to key this task's output.
    pub host: String,
}

impl NixBuildTask {
    /// Creates a build task for `host`, storing an owned copy of the name.
    pub fn new(host: &str) -> Self {
        Self {
            host: host.to_owned(),
        }
    }
}

67impl DagTask for NixBuildTask {
68    fn execute(&self, ctx: &DagContext) -> TaskOutcome {
69        let config: FleetConfig = match ctx.get_state("fleet_config") {
70            Some(c) => c,
71            None => return TaskOutcome::Failed("fleet_config not in context".into()),
72        };
73
74        let machines_file: Option<String> = ctx.get_state("machines_file");
75
76        match build::build_host(&config.flake_uri, &self.host, machines_file.as_deref()) {
77            Ok(path) => {
78                ctx.set_output(TaskId(format!("build:{}", self.host)), path);
79                TaskOutcome::Success
80            }
81            Err(e) => TaskOutcome::Failed(format!("build {}: {}", self.host, e)),
82        }
83    }
84
85    fn describe(&self) -> String {
86        format!("build {}", self.host)
87    }
88}
89
/// DAG task that copies the built closure to the target host.
///
/// Reads state: `fleet_config` → `FleetConfig` (supplies the node's target user and host)
/// Reads: `build:{host}` → store path to copy
/// Writes output: `copy:{host}` → `String` (copied store path)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NixCopyTask {
    /// Name of the host to copy to; used to look up the node and key outputs.
    pub host: String,
}

impl NixCopyTask {
    /// Creates a copy task for `host`, storing an owned copy of the name.
    pub fn new(host: &str) -> Self {
        Self {
            host: host.to_owned(),
        }
    }
}

106impl DagTask for NixCopyTask {
107    fn execute(&self, ctx: &DagContext) -> TaskOutcome {
108        let config: FleetConfig = match ctx.get_state("fleet_config") {
109            Some(c) => c,
110            None => return TaskOutcome::Failed("fleet_config not in context".into()),
111        };
112
113        let toplevel_path: String = match ctx.get_output(&TaskId(format!("build:{}", self.host))) {
114            Some(p) => p,
115            None => {
116                return TaskOutcome::Failed(format!("no build output for {} in context", self.host))
117            }
118        };
119
120        let node = match config.nodes.get(&self.host) {
121            Some(n) => n,
122            None => return TaskOutcome::Failed(format!("unknown host: {}", self.host)),
123        };
124
125        let store_uri = format!("ssh-ng://{}@{}", node.target_user, node.target_host);
126
127        match copy::copy_closure(&toplevel_path, &store_uri) {
128            Ok(()) => {
129                ctx.set_output(TaskId(format!("copy:{}", self.host)), toplevel_path);
130                TaskOutcome::Success
131            }
132            Err(e) => TaskOutcome::Failed(format!("copy to {}: {}", self.host, e)),
133        }
134    }
135
136    fn describe(&self) -> String {
137        format!("copy closure to {}", self.host)
138    }
139}
140
/// DAG task that activates the system profile on the target host.
///
/// Reads state: `fleet_config` → `FleetConfig` (supplies the node's target and profile type)
/// Reads state: `action` → `DeployAction`
/// Reads: `copy:{host}` → store path (the closure that was copied)
///
/// When the action is `DeployAction::Build`, activation is skipped and the
/// task reports success without reading the copy output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NixActivateTask {
    /// Name of the host to activate; used to look up the node and copy output.
    pub host: String,
}

impl NixActivateTask {
    /// Creates an activation task for `host`, storing an owned copy of the name.
    pub fn new(host: &str) -> Self {
        Self {
            host: host.to_owned(),
        }
    }
}

157impl DagTask for NixActivateTask {
158    fn execute(&self, ctx: &DagContext) -> TaskOutcome {
159        let config: FleetConfig = match ctx.get_state("fleet_config") {
160            Some(c) => c,
161            None => return TaskOutcome::Failed("fleet_config not in context".into()),
162        };
163
164        let action: DeployAction = match ctx.get_state("action") {
165            Some(a) => a,
166            None => return TaskOutcome::Failed("action not in context".into()),
167        };
168
169        // Build-only: skip activation
170        if action == DeployAction::Build {
171            return TaskOutcome::Success;
172        }
173
174        let toplevel_path: String = match ctx.get_output(&TaskId(format!("copy:{}", self.host))) {
175            Some(p) => p,
176            None => {
177                return TaskOutcome::Failed(format!("no copy output for {} in context", self.host))
178            }
179        };
180
181        let node = match config.nodes.get(&self.host) {
182            Some(n) => n,
183            None => return TaskOutcome::Failed(format!("unknown host: {}", self.host)),
184        };
185
186        match activate::activate_host(
187            &node.target_host,
188            &node.target_user,
189            &toplevel_path,
190            &node.profile_type,
191            action,
192        ) {
193            Ok(()) => TaskOutcome::Success,
194            Err(e) => TaskOutcome::Failed(format!("activate {}: {}", self.host, e)),
195        }
196    }
197
198    fn describe(&self) -> String {
199        format!("activate {}", self.host)
200    }
201}