Skip to content
Open
47 changes: 46 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ EOF
+ [Job Management](#job-management)
- [Monitoring Jobs with `squeue`](#monitoring-jobs-with-squeue)
- [Canceling Jobs with `scancel`](#canceling-jobs-with-scancel)
- [Inspecting Jobs with `scontrol`](#inspecting-jobs-with-scontrol)
- [Inspecting `sacct`](#inspecting-sacct)
+ [Error Handling](#error-handling)
+ [Project Growth](#project-growth)

Expand Down Expand Up @@ -348,7 +350,7 @@ In both cases, the default shell is modified in the Slurm object (*i.e.* applica

## Job Management

Simple Slurm provides a simple interface to Slurm's job management tools (`squeue` and `scance`l) to let you monitor and control running jobs.
Simple Slurm provides a simple interface to Slurm's job management tools (`squeue`, `scancel`, `sacct` and `scontrol`) to let you monitor and control running jobs.

### Monitoring Jobs with `squeue`

Expand Down Expand Up @@ -385,11 +387,54 @@ slurm.scancel.cancel_job(34987)
for job_id in [34987, 34988, 34989]:
slurm.scancel.cancel_job(job_id)


# Send SIGTERM before canceling (graceful termination)
slurm.scancel.signal_job(34987)
slurm.scancel.cancel_job(34987)
```

### Inspecting Jobs with `scontrol`

Check details and exit codes of job(s).

```python
from simple_slurm import Slurm

# existing job
slurm = Slurm()
slurm.scontrol.update(job_id=<job-id>)

# new jobs automatically assigns the job id
slurm = Slurm()
slurm.sbatch("echo hello world")
slurm.scontrol.update()

# output: list of job details for each job (in array)
```

### Inspecting `sacct`

Get job resource information using `sacct`

```python
from simple_slurm import Slurm

# existing job
slurm = Slurm()
slurm.sacct.update(job_id=<job-id>)

# new jobs automatically assigns the job id
slurm = Slurm()
slurm.sbatch("echo hello world")
slurm.sacct.update()

# custom outputs:
slurm = Slurm(kwargs_sacct={"fields": ["JobID", "JobName", "State", "Elapsed", "Start", "End"]})
slurm.sacct.update(job_id=<job-id>)

# output: list of resource requirements for each job (in array)
slurm.sacct
```

## Error Handling
The library does not raise specific exceptions for invalid Slurm arguments or job submission failures. Instead, it relies on the underlying Slurm commands (`sbatch`, `srun`, etc.) to handle errors. If a job submission fails, the error message from Slurm will be printed to the console.
Expand Down
2 changes: 1 addition & 1 deletion simple_slurm/__about__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.3.6"
__version__ = "0.4.0"
39 changes: 35 additions & 4 deletions simple_slurm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from simple_slurm.squeue import SlurmSqueueWrapper
from simple_slurm.scancel import SlurmScancelWrapper
from simple_slurm.scontrol import SlurmScontrolWrapper
from simple_slurm.sacct import SlurmSacctWrapper

IGNORE_BOOLEAN = "IGNORE_BOOLEAN"

Expand All @@ -22,14 +24,20 @@ class Slurm:
Multiple syntaxes are allowed for defining the arguments.
"""

def __init__(self, *args, **kwargs):
def __init__(self, *args, kwargs_sacct={}, kwargs_scontrol={}, **kwargs):
"""Initialize the parser with the given arguments."""

# initialize parser
self.namespace = Namespace()
self.parser = argparse.ArgumentParser()
self.squeue = SlurmSqueueWrapper()
self.scancel = SlurmScancelWrapper()
self.sacct = SlurmSacctWrapper(**kwargs_sacct)
self.scontrol = SlurmScontrolWrapper(**kwargs_scontrol)

# add variables for debug
self.job_id = None
self.cmd = None

# set default shell
self.set_shell()
Expand Down Expand Up @@ -66,6 +74,13 @@ def __repr__(self) -> str:
params["run_cmds"] = self.run_cmds
return repr(params)

def __int__(self) -> int:
"""Return the job ID of the last submitted job when calling int() on the object."""

if self.job_id is None:
raise ValueError("Job ID is not set.")
return self.job_id

def _add_one_argument(self, key: str, value: str):
"""Parse the given key-value pair (the argument is given in key)."""
key, value = fmt_key(key), fmt_value(value)
Expand Down Expand Up @@ -225,7 +240,8 @@ def sbatch(
"EOF",
)
)
result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE)
result = subprocess.run(cmd, capture_output=True, shell=True)

# init for clarity
job_id = None
stdout = ""
Expand All @@ -239,12 +255,27 @@ def sbatch(
else:
success_msg = "Submitted batch job"
stdout = result.stdout.decode()
assert success_msg in stdout, result.stderr
if success_msg not in stdout:
error = result.stderr
if error is None:
error = result.stdout
if error is None:
error = f"Unknown error for cmd:\n {cmd}"
raise RuntimeError(
f"sbatch failed with error:\n{error.decode()}\nstdout:\n{stdout}\ncmd:\n{cmd}"
)
job_id = int(stdout.split(" ")[3])

# store job id and command
self.job_id = job_id
self.scontrol.job_id = job_id
self.cmd = cmd

# assertions
assert job_id is not None, "this should never happen, assert for linter"
if verbose:
print(stdout)
return job_id
return self


class Namespace:
Expand Down
129 changes: 129 additions & 0 deletions simple_slurm/sacct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import subprocess
from pprint import pformat


class SlurmSacctWrapper:
def __init__(self, fields: list = None, units: str = "M"):
"""
Wrap and parse 'sacct'

For example:
sacct -j 4315805 --format=JobID,JobName%20,State,Elapsed,Start,End,NNodes,AllocCPUs,ReqCPUs,ReqMem,MaxRSS,AllocTRES --units=M

Returns:
JobID JobName State Elapsed Start End NNodes AllocCPUS ReqCPUS ReqMem MaxRSS AllocTRES
------------ -------------------- ---------- ---------- ------------------- ------------------- -------- ---------- -------- ---------- ---------- ----------
4315805 test_slurm COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 2048M billing=1+
4315805.bat+ batch COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 0 cpu=1,mem+
4315805.ext+ extern COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 0 billing=1+

First one is most critical. Return a list of the dictionaries (only first row). If array is submitted, they will have multiple entries.
"""
self.fields = fields or [
"JobID",
"JobName",
"State",
"Elapsed",
"Start",
"End",
"NNodes",
"AllocCPUs",
"ReqCPUs",
"ReqMem",
"MaxRSS",
]
self.command = ["sacct", "-j"]
self.units = units
self.sacct = None
self.job_id = None
self.exit_code = None

def __getitem__(self, key: str):
assert self.sacct is not None, (
"sacct not initialized. Call 'update(<job-id>)' first"
)
return [c[key] for c in self.sacct]

def __len__(self):
return len(self.sacct)

def __iter__(self):
assert self.sacct is not None, (
"sacct not initialized. Call 'update(<job-id>)' first"
)
return iter(self.sacct)

def __str__(self):
assert self.sacct is not None, (
"sacct not initialized. Call 'update(<job-id>)' first"
)
return pformat(self.sacct)

def __repr__(self):
return self.__str__()

def __call__(self, job_id: str = None):
"""Get the job information for a specific job ID"""
if job_id is not None:
self.job_id = job_id
self.update(job_id)
return self

def _get_job_id(self, job_id: str = None):
self.job_id = job_id or self.job_id
if self.job_id is None:
raise ValueError("Job ID not specified. Please provide a job ID.")
return self.job_id

def update(self, job_id: str = None):
"""Refresh the information from the current queue for the current user"""
self.job_id = self._get_job_id(job_id)
result = subprocess.run(
self.command
+ [
str(self.job_id),
f"--format={','.join(self.fields)}",
f"--units={self.units}",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"Error running scontrol: {result.stderr}")
self.sacct = self._parse_output(result.stdout)

def _parse_output(self, output: str):
"""
Get a list of dictionaries from the sacct output. New lines are detected when "JobName" is not "batch" or "extern".

```
JobID JobName State Elapsed Start End NNodes AllocCPUS ReqCPUS ReqMem MaxRSS AllocTRES
------------ -------------------- ---------- ---------- ------------------- ------------------- -------- ---------- -------- ---------- ---------- ----------
4319559_1 test COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 2048M billing=1+
4319559_1.b+ batch COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 cpu=1,mem+
4319559_1.e+ extern COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 billing=1+
4319559_2 test COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 2048M billing=1+
4319559_2.b+ batch COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 cpu=1,mem+
4319559_2.e+ extern COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 billing=1+ ...
```
"""

skip = ["batch", "extern"]
acct = []
for i, line in enumerate(output.splitlines()):
# set header
if i == 0:
header = line.split()
assert "JobName" in header, (
f"JobName not found in header: {header}. Make sure to include it in the fields."
)
continue
if i == 1:
continue
# add line
acct_one = dict(zip(header, line.split()))
if acct_one["JobName"] not in skip:
acct.append(acct_one)

return acct
11 changes: 6 additions & 5 deletions simple_slurm/scancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ class SlurmScancelWrapper:
def __init__(self, staledelta=timedelta(minutes=30)):
self.stale_delta = staledelta

def cancel_job(self, job_id: int):
"""Sends a straightforward scancel to a job"""
job_id = str(job_id)
def cancel_job(self, job_id):
"""Sends a straightforward scancel to a job. job_id can be a str, int or slurm.Slurm() object"""
# int(slurm.Slurm)) returns a job id
job_id = str(int(job_id))
result = subprocess.run(
["scancel", job_id],
stdout=subprocess.PIPE,
Expand All @@ -27,12 +28,12 @@ def cancel_job(self, job_id: int):
if result.returncode != 0:
raise RuntimeError(f"Error cancelling job: {result.stderr.strip()}")

def signal_job(self, job_id: int):
def signal_job(self, job_id):
"""First time it is sent to a job, tries send a SIGTERM to the job id
If sent again to the same job, attempts a SIGKILL instead
if that fails as well, involkes scancel without term arguments
"""
job_id = str(job_id)
job_id = str(int(job_id))
self.prune_old_jobs()
signal = "--signal=TERM"
if job_id not in self.sigmtems:
Expand Down
Loading