diff --git a/README.md b/README.md index 6adadf3..32cbcb1 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,8 @@ EOF + [Job Management](#job-management) - [Monitoring Jobs with `squeue`](#monitoring-jobs-with-squeue) - [Canceling Jobs with `scancel`](#canceling-jobs-with-scancel) + - [Inspecting Jobs with `scontrol`](#inspecting-jobs-with-scontrol) + - [Inspecting `sacct`](#inspecting-sacct) + [Error Handling](#error-handling) + [Project Growth](#project-growth) @@ -348,7 +350,7 @@ In both cases, the default shell is modified in the Slurm object (*i.e.* applica ## Job Management -Simple Slurm provides a simple interface to Slurm's job management tools (`squeue` and `scance`l) to let you monitor and control running jobs. +Simple Slurm provides a simple interface to Slurm's job management tools (`squeue`, `scancel`, `sacct` and `scontrol`) to let you monitor and control running jobs. ### Monitoring Jobs with `squeue` @@ -385,11 +387,54 @@ slurm.scancel.cancel_job(34987) for job_id in [34987, 34988, 34989]: slurm.scancel.cancel_job(job_id) + # Send SIGTERM before canceling (graceful termination) slurm.scancel.signal_job(34987) slurm.scancel.cancel_job(34987) ``` +### Inspecting Jobs with `scontrol` + +Check details and exit codes of job(s). + +```python +from simple_slurm import Slurm + +# existing job +slurm = Slurm() +slurm.scontrol.update(job_id=) + +# new jobs automatically assigns the job id +slurm = Slurm() +slurm.sbatch("echo hello world") +slurm.scontrol.update() + +# output: list of job details for each job (in array) +``` + +### Inspecting `sacct` + +Get job resource information using `sacct` + +```python +from simple_slurm import Slurm + +# existing job +slurm = Slurm() +slurm.sacct.update(job_id=) + +# new jobs automatically assigns the job id +slurm = Slurm() +slurm.sbatch("echo hello world") +slurm.sacct.update() + +# custom outputs: +slurm = Slurm(kwargs_sacct={"fields": ["JobID", "JobName", "State", "Elapsed", "Start", "End"]}) +slurm.sacct.update(job_id=) + +# output: list of resource requirements for each job (in array) +slurm.sacct +``` ## Error Handling The library does not raise specific exceptions for invalid Slurm arguments or job submission failures. Instead, it relies on the underlying Slurm commands (`sbatch`, `srun`, etc.) to handle errors. If a job submission fails, the error message from Slurm will be printed to the console. diff --git a/simple_slurm/__about__.py b/simple_slurm/__about__.py index d7b30e1..6a9beea 100644 --- a/simple_slurm/__about__.py +++ b/simple_slurm/__about__.py @@ -1 +1 @@ -__version__ = "0.3.6" +__version__ = "0.4.0" diff --git a/simple_slurm/core.py b/simple_slurm/core.py index 63d8e1d..b2eb610 100644 --- a/simple_slurm/core.py +++ b/simple_slurm/core.py @@ -7,6 +7,8 @@ from simple_slurm.squeue import SlurmSqueueWrapper from simple_slurm.scancel import SlurmScancelWrapper +from simple_slurm.scontrol import SlurmScontrolWrapper +from simple_slurm.sacct import SlurmSacctWrapper IGNORE_BOOLEAN = "IGNORE_BOOLEAN" @@ -22,7 +24,7 @@ class Slurm: Multiple syntaxes are allowed for defining the arguments. """ - def __init__(self, *args, **kwargs): + def __init__(self, *args, kwargs_sacct={}, kwargs_scontrol={}, **kwargs): """Initialize the parser with the given arguments.""" # initialize parser @@ -30,6 +32,12 @@ def __init__(self, *args, **kwargs): self.parser = argparse.ArgumentParser() self.squeue = SlurmSqueueWrapper() self.scancel = SlurmScancelWrapper() + self.sacct = SlurmSacctWrapper(**kwargs_sacct) + self.scontrol = SlurmScontrolWrapper(**kwargs_scontrol) + + # add variables for debug + self.job_id = None + self.cmd = None # set default shell self.set_shell() @@ -66,6 +74,13 @@ def __repr__(self) -> str: params["run_cmds"] = self.run_cmds return repr(params) + def __int__(self) -> int: + """Return the job ID of the last submitted job when calling int() on the object.""" + + if self.job_id is None: + raise ValueError("Job ID is not set.") + return self.job_id + def _add_one_argument(self, key: str, value: str): """Parse the given key-value pair (the argument is given in key).""" key, value = fmt_key(key), fmt_value(value) @@ -225,7 +240,8 @@ def sbatch( "EOF", ) ) - result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE) + result = subprocess.run(cmd, capture_output=True, shell=True) + # init for clarity job_id = None stdout = "" @@ -239,12 +255,27 @@ def sbatch( else: success_msg = "Submitted batch job" stdout = result.stdout.decode() - assert success_msg in stdout, result.stderr + if success_msg not in stdout: + error = result.stderr + if error is None: + error = result.stdout + if error is None: + error = f"Unknown error for cmd:\n {cmd}" + raise RuntimeError( + f"sbatch failed with error:\n{error.decode()}\nstdout:\n{stdout}\ncmd:\n{cmd}" + ) job_id = int(stdout.split(" ")[3]) + + # store job id and command + self.job_id = job_id + self.scontrol.job_id = job_id + self.cmd = cmd + + # assertions assert job_id is not None, "this should never happen, assert for linter" if verbose: print(stdout) - return job_id + return self class Namespace: diff --git a/simple_slurm/sacct.py b/simple_slurm/sacct.py new file mode 100644 index 0000000..4246a90 --- /dev/null +++ b/simple_slurm/sacct.py @@ -0,0 +1,129 @@ +import subprocess +from pprint import pformat + + +class SlurmSacctWrapper: + def __init__(self, fields: list = None, units: str = "M"): + """ + Wrap and parse 'sacct' + + For example: + sacct -j 4315805 --format=JobID,JobName%20,State,Elapsed,Start,End,NNodes,AllocCPUs,ReqCPUs,ReqMem,MaxRSS,AllocTRES --units=M + + Returns: + JobID JobName State Elapsed Start End NNodes AllocCPUS ReqCPUS ReqMem MaxRSS AllocTRES + ------------ -------------------- ---------- ---------- ------------------- ------------------- -------- ---------- -------- ---------- ---------- ---------- + 4315805 test_slurm COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 2048M billing=1+ + 4315805.bat+ batch COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 0 cpu=1,mem+ + 4315805.ext+ extern COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 0 billing=1+ + + First one is most critical. Return a list of the dictionaries (only first row). If array is submitted, they will have multiple entries. + """ + self.fields = fields or [ + "JobID", + "JobName", + "State", + "Elapsed", + "Start", + "End", + "NNodes", + "AllocCPUs", + "ReqCPUs", + "ReqMem", + "MaxRSS", + ] + self.command = ["sacct", "-j"] + self.units = units + self.sacct = None + self.job_id = None + self.exit_code = None + + def __getitem__(self, key: str): + assert self.sacct is not None, ( + "sacct not initialized. Call 'update()' first" + ) + return [c[key] for c in self.sacct] + + def __len__(self): + return len(self.sacct) + + def __iter__(self): + assert self.sacct is not None, ( + "sacct not initialized. Call 'update()' first" + ) + return iter(self.sacct) + + def __str__(self): + assert self.sacct is not None, ( + "sacct not initialized. Call 'update()' first" + ) + return pformat(self.sacct) + + def __repr__(self): + return self.__str__() + + def __call__(self, job_id: str = None): + """Get the job information for a specific job ID""" + if job_id is not None: + self.job_id = job_id + self.update(job_id) + return self + + def _get_job_id(self, job_id: str = None): + self.job_id = job_id or self.job_id + if self.job_id is None: + raise ValueError("Job ID not specified. Please provide a job ID.") + return self.job_id + + def update(self, job_id: str = None): + """Refresh the information from the current queue for the current user""" + self.job_id = self._get_job_id(job_id) + result = subprocess.run( + self.command + + [ + str(self.job_id), + f"--format={','.join(self.fields)}", + f"--units={self.units}", + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + if result.returncode != 0: + raise RuntimeError(f"Error running scontrol: {result.stderr}") + self.sacct = self._parse_output(result.stdout) + + def _parse_output(self, output: str): + """ + Get a list of dictionaries from the sacct output. New lines are detected when "JobName" is not "batch" or "extern". + + ``` + JobID JobName State Elapsed Start End NNodes AllocCPUS ReqCPUS ReqMem MaxRSS AllocTRES + ------------ -------------------- ---------- ---------- ------------------- ------------------- -------- ---------- -------- ---------- ---------- ---------- + 4319559_1 test COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 2048M billing=1+ + 4319559_1.b+ batch COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 cpu=1,mem+ + 4319559_1.e+ extern COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 billing=1+ + 4319559_2 test COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 2048M billing=1+ + 4319559_2.b+ batch COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 cpu=1,mem+ + 4319559_2.e+ extern COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 billing=1+ ... + ``` + """ + + skip = ["batch", "extern"] + acct = [] + for i, line in enumerate(output.splitlines()): + # set header + if i == 0: + header = line.split() + assert "JobName" in header, ( + f"JobName not found in header: {header}. Make sure to include it in the fields." + ) + continue + if i == 1: + continue + # add line + acct_one = dict(zip(header, line.split())) + if acct_one["JobName"] not in skip: + acct.append(acct_one) + + return acct diff --git a/simple_slurm/scancel.py b/simple_slurm/scancel.py index 0e2e1b6..c4c016a 100644 --- a/simple_slurm/scancel.py +++ b/simple_slurm/scancel.py @@ -15,9 +15,10 @@ class SlurmScancelWrapper: def __init__(self, staledelta=timedelta(minutes=30)): self.stale_delta = staledelta - def cancel_job(self, job_id: int): - """Sends a straightforward scancel to a job""" - job_id = str(job_id) + def cancel_job(self, job_id): + """Sends a straightforward scancel to a job. job_id can be a str, int or slurm.Slurm() object""" + # int(slurm.Slurm)) returns a job id + job_id = str(int(job_id)) result = subprocess.run( ["scancel", job_id], stdout=subprocess.PIPE, @@ -27,12 +28,12 @@ def cancel_job(self, job_id: int): if result.returncode != 0: raise RuntimeError(f"Error cancelling job: {result.stderr.strip()}") - def signal_job(self, job_id: int): + def signal_job(self, job_id): """First time it is sent to a job, tries send a SIGTERM to the job id If sent again to the same job, attempts a SIGKILL instead if that fails as well, involkes scancel without term arguments """ - job_id = str(job_id) + job_id = str(int(job_id)) self.prune_old_jobs() signal = "--signal=TERM" if job_id not in self.sigmtems: diff --git a/simple_slurm/scontrol.py b/simple_slurm/scontrol.py new file mode 100644 index 0000000..4c216cb --- /dev/null +++ b/simple_slurm/scontrol.py @@ -0,0 +1,98 @@ +import subprocess +from pprint import pformat + + +class SlurmScontrolWrapper: + def __init__(self): + self.command = ["scontrol", "show", "job"] + self.control = None + self.job_id = None + self.exit_code = None + + def __getitem__(self, key: str): + assert self.control is not None, ( + "scontrol not initialized. Call 'update()' first" + ) + return [c[key] for c in self.control] + + def __len__(self): + return len(self.control) + + def __iter__(self): + assert self.control is not None, ( + "scontrol not initialized. Call 'update()' first" + ) + return iter(self.control) + + def __str__(self): + assert self.control is not None, ( + "scontrol not initialized. Call 'update()' first" + ) + return pformat(self.control) + + def __repr__(self): + return self.__str__() + + def __call__(self, job_id: str = None): + """Get the job information for a specific job ID""" + if job_id is not None: + self.job_id = job_id + self.update(job_id) + return self + + def _get_job_id(self, job_id: str = None): + self.job_id = job_id or self.job_id + if self.job_id is None: + raise ValueError("Job ID not specified. Please provide a job ID.") + return self.job_id + + def update(self, job_id: str = None): + """Refresh the information from the current queue for the current user""" + job_id = self._get_job_id(job_id) + result = subprocess.run( + self.command + [str(job_id)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + if result.returncode != 0: + raise RuntimeError(f"Error running scontrol: {result.stderr}") + self.control = self._parse_output(result.stdout) + # set exit code [0:0, 2:0]... set it to the max value. should only be 0 if ALL jobs are finished successfully + self.exit_code = max([int(c) for e in self["ExitCode"] for c in e.split(":")]) + if self["JobState"] == "PENDING": + self.exit_code = None + + def _parse_output(self, output: str): + """ + Get a list of dictionaries from the scontrol output. New blocks are separated by JobId=. + + ``` + JobId=3609270 ArrayJobId=3609269 ArrayTaskId=0 JobName=test_job + UserId=krauset(164084959) GroupId=krauset(164084959) MCS_label=N/A + ... + ``` + """ + + def _parse_line(line): + """Parse key=value pairs from a line of scontrol output""" + pairs = line.strip().split() + for pair in pairs: + if "=" not in pair: + continue + key, value = pair.split("=", 1) + yield key, value + + control = [] + control_i = {} + for i, line in enumerate(output.splitlines()): + # start new block + if "JobId=" in line: + if i > 0: + control.append(control_i) + control_i = {} + # fill data + for key, value in _parse_line(line): + control_i[key] = value + control.append(control_i) + return control diff --git a/simple_slurm/squeue.py b/simple_slurm/squeue.py index e33fe1c..4352e34 100644 --- a/simple_slurm/squeue.py +++ b/simple_slurm/squeue.py @@ -50,7 +50,7 @@ def _parse_output(self, output: str): ) jobs = {} for row in reader: - jobs[int(row["JOBID"])] = row + jobs[row["JOBID"]] = row return jobs def display_jobs(self): diff --git a/test/test_cli.py b/test/test_cli.py index b3873d8..64d3147 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -1,8 +1,5 @@ import contextlib import io -import os -import shutil -import subprocess import sys import unittest from unittest.mock import patch diff --git a/test/test_core.py b/test/test_core.py index f6aa322..c388781 100644 --- a/test/test_core.py +++ b/test/test_core.py @@ -2,8 +2,6 @@ import datetime import io import os -import shutil -import subprocess import unittest from simple_slurm import Slurm @@ -213,11 +211,12 @@ def test_14_srun_returncode(self): def test_15_sbatch_execution(self): slurm = Slurm(contiguous=True) - job_id, stdout = self.__run_sbatch(slurm) + job, stdout = self.__run_sbatch(slurm) self.assertFalse(slurm.is_parsable) - self.assertIsInstance(job_id, int) - self.assertIn(f"Submitted batch job {job_id}", stdout) + self.assertIsInstance(job, Slurm) + self.assertIsInstance(int(job), int) + self.assertIn(f"Submitted batch job {int(job)}", stdout) def test_16_parse_timedelta(self): slurm = Slurm( @@ -263,11 +262,12 @@ def test_19_sbatch_execution_with_job_file(self): job_file = "script.sh" slurm = Slurm(contiguous=True) - job_id, stdout = self.__run_sbatch(slurm, job_file=job_file) + job, stdout = self.__run_sbatch(slurm, job_file=job_file) self.assertFalse(slurm.is_parsable) - self.assertIsInstance(job_id, int) - self.assertIn(f"Submitted batch job {job_id}", stdout) + self.assertIsInstance(job, Slurm) + self.assertIsInstance(int(job), int) + self.assertIn(f"Submitted batch job {int(job)}", stdout) with open(job_file, "r") as fid: job_contents = fid.read() @@ -331,18 +331,19 @@ def test_21_add_cmd_multiple(self): def test_22_parsable_sbatch_execution(self): slurm = Slurm(contiguous=True, parsable=True) - job_id, stdout = self.__run_sbatch(slurm) + job, stdout = self.__run_sbatch(slurm) self.assertTrue(slurm.is_parsable) - self.assertIsInstance(job_id, int) - self.assertEqual(f"{job_id}\n", stdout) + self.assertIsInstance(job, Slurm) + self.assertIsInstance(int(job), int) + self.assertEqual(f"{int(job)}\n", stdout) def __run_sbatch(self, slurm, *args, **kwargs): with io.StringIO() as buffer: with contextlib.redirect_stdout(buffer): - job_id = slurm.sbatch("echo Hello!", *args, **kwargs) + job = slurm.sbatch("echo Hello!", *args, **kwargs) stdout = buffer.getvalue() - return job_id, stdout + return job, stdout if __name__ == "__main__":