1pub mod error;
10pub mod tasks;
11
12pub use error::{Result, SlurmError};
13
14use consortium::dag::{DagBuilder, DagContext, DagReport, ErrorPolicy};
15use consortium_nix::FleetConfig;
16
17pub fn submit_job(
19 config: &FleetConfig,
20 job_name: &str,
21 script: &str,
22 partition: Option<&str>,
23 wait: bool,
24) -> Result<DagReport> {
25 let slurm_config = config.slurm_config.as_ref().ok_or(SlurmError::NoConfig)?;
26
27 let ctx = DagContext::new();
28 ctx.set_state("fleet_config", config.clone());
29
30 let mut dag = DagBuilder::new();
31
32 let build_id = format!("build-job-env:{}", job_name);
34 dag.add_task(
35 &build_id,
36 tasks::NixBuildJobEnvTask::new(job_name, &config.flake_uri),
37 );
38
39 let copy_id = format!("copy-job-env:{}", job_name);
41 dag.add_task(
42 ©_id,
43 tasks::NixCopyToSubmitTask {
44 job_name: job_name.to_string(),
45 submit_host: slurm_config.submit_node.clone(),
46 submit_user: slurm_config.submit_user.clone(),
47 },
48 );
49 dag.add_dep(©_id, &build_id);
50
51 let submit_id = format!("slurm-submit:{}", job_name);
53 dag.add_task(
54 &submit_id,
55 tasks::SlurmSubmitTask {
56 job_name: job_name.to_string(),
57 script: script.to_string(),
58 partition: partition.map(|s| s.to_string()),
59 submit_host: slurm_config.submit_node.clone(),
60 submit_user: slurm_config.submit_user.clone(),
61 },
62 );
63 dag.add_dep(&submit_id, ©_id);
64
65 if wait {
67 let wait_id = format!("slurm-wait:{}", job_name);
68 dag.add_task(
69 &wait_id,
70 tasks::SlurmWaitTask::new(
71 job_name,
72 &slurm_config.submit_node,
73 &slurm_config.submit_user,
74 ),
75 );
76 dag.add_dep(&wait_id, &submit_id);
77 }
78
79 dag.error_policy(ErrorPolicy::FailFast);
80 dag.context(ctx);
81
82 let report = dag
83 .build()
84 .map_err(|e| SlurmError::Dag(e.to_string()))?
85 .run()
86 .map_err(|e| SlurmError::Dag(e.to_string()))?;
87
88 Ok(report)
89}