Skip to main content

consortium_slurm/
lib.rs

1//! # consortium-slurm
2//!
3//! Slurm job orchestration with nix-built environments.
4//!
5//! Uses nix to build hermetic job environments, copies them to the submit
6//! node, then submits jobs with slurm. Supports arbitrary DAG pipelines
7//! (e.g., RNA-seq bioinformatics workflows) via DagBuilder.
8
9pub mod error;
10pub mod tasks;
11
12pub use error::{Result, SlurmError};
13
14use consortium::dag::{DagBuilder, DagContext, DagReport, ErrorPolicy};
15use consortium_nix::FleetConfig;
16
17/// Submit a single slurm job with a nix-built environment.
18pub fn submit_job(
19    config: &FleetConfig,
20    job_name: &str,
21    script: &str,
22    partition: Option<&str>,
23    wait: bool,
24) -> Result<DagReport> {
25    let slurm_config = config.slurm_config.as_ref().ok_or(SlurmError::NoConfig)?;
26
27    let ctx = DagContext::new();
28    ctx.set_state("fleet_config", config.clone());
29
30    let mut dag = DagBuilder::new();
31
32    // Build job environment
33    let build_id = format!("build-job-env:{}", job_name);
34    dag.add_task(
35        &build_id,
36        tasks::NixBuildJobEnvTask::new(job_name, &config.flake_uri),
37    );
38
39    // Copy to submit node
40    let copy_id = format!("copy-job-env:{}", job_name);
41    dag.add_task(
42        &copy_id,
43        tasks::NixCopyToSubmitTask {
44            job_name: job_name.to_string(),
45            submit_host: slurm_config.submit_node.clone(),
46            submit_user: slurm_config.submit_user.clone(),
47        },
48    );
49    dag.add_dep(&copy_id, &build_id);
50
51    // Submit
52    let submit_id = format!("slurm-submit:{}", job_name);
53    dag.add_task(
54        &submit_id,
55        tasks::SlurmSubmitTask {
56            job_name: job_name.to_string(),
57            script: script.to_string(),
58            partition: partition.map(|s| s.to_string()),
59            submit_host: slurm_config.submit_node.clone(),
60            submit_user: slurm_config.submit_user.clone(),
61        },
62    );
63    dag.add_dep(&submit_id, &copy_id);
64
65    // Wait (optional)
66    if wait {
67        let wait_id = format!("slurm-wait:{}", job_name);
68        dag.add_task(
69            &wait_id,
70            tasks::SlurmWaitTask::new(
71                job_name,
72                &slurm_config.submit_node,
73                &slurm_config.submit_user,
74            ),
75        );
76        dag.add_dep(&wait_id, &submit_id);
77    }
78
79    dag.error_policy(ErrorPolicy::FailFast);
80    dag.context(ctx);
81
82    let report = dag
83        .build()
84        .map_err(|e| SlurmError::Dag(e.to_string()))?
85        .run()
86        .map_err(|e| SlurmError::Dag(e.to_string()))?;
87
88    Ok(report)
89}