From 5079386934a6f07c36a92529825a3a9b2db00291 Mon Sep 17 00:00:00 2001 From: tobiaspk Date: Wed, 21 May 2025 11:04:37 -0400 Subject: [PATCH 01/11] Add sacct wrapper class --- simple_slurm/sacct.py | 108 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 simple_slurm/sacct.py diff --git a/simple_slurm/sacct.py b/simple_slurm/sacct.py new file mode 100644 index 0000000..342d81b --- /dev/null +++ b/simple_slurm/sacct.py @@ -0,0 +1,108 @@ +import os +import subprocess +import csv +from io import StringIO +from pprint import pformat + + +class SlurmSacctWrapper: + def __init__(self, fields: list = None, units: str = "M"): + """ + Wrap and parse 'sacct' + + For example: + sacct -j 4315805 --format=JobID,JobName%20,State,Elapsed,Start,End,NNodes,AllocCPUs,ReqCPUs,ReqMem,MaxRSS,AllocTRES --units=M + + Returns: + JobID JobName State Elapsed Start End NNodes AllocCPUS ReqCPUS ReqMem MaxRSS AllocTRES + ------------ -------------------- ---------- ---------- ------------------- ------------------- -------- ---------- -------- ---------- ---------- ---------- + 4315805 test_slurm COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 2048M billing=1+ + 4315805.bat+ batch COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 0 cpu=1,mem+ + 4315805.ext+ extern COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 0 billing=1+ + + First one is most critical. Return a list of the dictionaries (only first row). If array is submitted, they will have multiple entries. + """ + self.fields = fields or ["JobID", "JobName", "State", "Elapsed", "Start", "End", "NNodes", "AllocCPUs", "ReqCPUs", "ReqMem", "MaxRSS"] + self.command = ["sacct", "-j"] + self.units = units + self.sacct = None + self.job_id = None + self.exit_code = None + + def __getitem__(self, key: str): + assert self.sacct is not None, f"sacct not initialized. Call 'update()' first" + return [c[key] for c in self.sacct] + + def __len__(self): + return len(self.sacct) + + def __iter__(self): + assert self.sacct is not None, f"sacct not initialized. Call 'update()' first" + return iter(self.sacct) + + def __str__(self): + assert self.sacct is not None, f"sacct not initialized. Call 'update()' first" + return pformat(self.sacct) + + def __repr__(self): + return self.__str__() + + def __call__(self, job_id: str = None): + """Get the job information for a specific job ID""" + if job_id is not None: + self.job_id = job_id + self.update(job_id) + return self + + def _get_job_id(self, job_id: str = None): + self.job_id = job_id or self.job_id + if self.job_id is None: + raise ValueError("Job ID not specified. Please provide a job ID.") + return self.job_id + + def update(self, job_id: str = None): + """Refresh the information from the current queue for the current user""" + self.job_id = self._get_job_id(job_id) + result = subprocess.run( + self.command + [str(self.job_id), f"--format={','.join(self.fields)}", f"--units={self.units}"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + if result.returncode != 0: + raise RuntimeError(f"Error running scontrol: {result.stderr}") + self.sacct = self._parse_output(result.stdout) + return self + + def _parse_output(self, output: str): + """ + Get a list of dictionaries from the sacct output. New lines are detected when "JobName" is not "batch" or "extern". + + ``` + JobID JobName State Elapsed Start End NNodes AllocCPUS ReqCPUS ReqMem MaxRSS AllocTRES + ------------ -------------------- ---------- ---------- ------------------- ------------------- -------- ---------- -------- ---------- ---------- ---------- + 4319559_1 test COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 2048M billing=1+ + 4319559_1.b+ batch COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 cpu=1,mem+ + 4319559_1.e+ extern COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 billing=1+ + 4319559_2 test COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 2048M billing=1+ + 4319559_2.b+ batch COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 cpu=1,mem+ + 4319559_2.e+ extern COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 billing=1+ ... + ``` + """ + + skip = ["batch", "extern"] + acct = [] + for i, line in enumerate(output.splitlines()): + # set header + if i == 0: + header = line.split() + assert "JobName" in header, f"JobName not found in header: {header}. Make sure to include it in the fields." + continue + if i == 1: + continue + # add line + acct_one = dict(zip(header, line.split())) + if acct_one["JobName"] not in skip: + acct.append(acct_one) + + return acct From b0485756a71de1224602c6a3306e21b289d3b3f8 Mon Sep 17 00:00:00 2001 From: tobiaspk Date: Wed, 21 May 2025 11:04:53 -0400 Subject: [PATCH 02/11] Add scontrol wrapper class --- simple_slurm/scontrol.py | 93 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 simple_slurm/scontrol.py diff --git a/simple_slurm/scontrol.py b/simple_slurm/scontrol.py new file mode 100644 index 0000000..67a844a --- /dev/null +++ b/simple_slurm/scontrol.py @@ -0,0 +1,93 @@ +import os +import subprocess +import csv +from io import StringIO +from pprint import pformat + + +class SlurmScontrolWrapper: + def __init__(self): + self.command = ["scontrol", "show", "job"] + self.control = None + self.job_id = None + self.exit_code = None + + def __getitem__(self, key: str): + assert self.control is not None, f"scontrol not initialized. Call 'update()' first" + return [c[key] for c in self.control] + + def __len__(self): + return len(self.control) + + def __iter__(self): + assert self.control is not None, f"scontrol not initialized. Call 'update()' first" + return iter(self.control) + + def __str__(self): + assert self.control is not None, f"scontrol not initialized. Call 'update()' first" + return pformat(self.control) + + def __repr__(self): + return self.__str__() + + def __call__(self, job_id: str = None): + """Get the job information for a specific job ID""" + if job_id is not None: + self.job_id = job_id + self.update(job_id) + return self + + def _get_job_id(self, job_id: str = None): + self.job_id = job_id or self.job_id + if self.job_id is None: + raise ValueError("Job ID not specified. Please provide a job ID.") + return self.job_id + + def update(self, job_id: str = None): + """Refresh the information from the current queue for the current user""" + job_id = self._get_job_id(job_id) + result = subprocess.run( + self.command + [str(job_id)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + if result.returncode != 0: + raise RuntimeError(f"Error running scontrol: {result.stderr}") + self.control = self._parse_output(result.stdout) + # set exit code [0:0, 2:0]... set it to the max value. should only be 0 if ALL jobs are finished successfully + self.exit_code = max([int(c) for e in self["ExitCode"] for c in e.split(":")]) + return self + + def _parse_output(self, output: str): + """ + Get a list of dictionaries from the scontrol output. New blocks are separated by JobId=. + + ``` + JobId=3609270 ArrayJobId=3609269 ArrayTaskId=0 JobName=test_job + UserId=krauset(164084959) GroupId=krauset(164084959) MCS_label=N/A + ... + ``` + """ + + def _parse_line(line): + """Parse key=value pairs from a line of scontrol output""" + pairs = line.strip().split() + for pair in pairs: + if "=" not in pair: + continue + key, value = pair.split("=", 1) + yield key, value + + control = [] + for i, line in enumerate(output.splitlines()): + # start new block + if "JobId=" in line: + if i > 0: + control.append(control_i) + control_i = {} + # fill data + for key, value in _parse_line(line): + control_i[key] = value + control.append(control_i) + return control From f5234c1a14c2a3fd4152c82302b72832bf9d93e8 Mon Sep 17 00:00:00 2001 From: tobiaspk Date: Wed, 21 May 2025 11:05:29 -0400 Subject: [PATCH 03/11] Integrate sacct and scontrol to core --- simple_slurm/core.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/simple_slurm/core.py b/simple_slurm/core.py index e32b662..205548f 100644 --- a/simple_slurm/core.py +++ b/simple_slurm/core.py @@ -7,6 +7,8 @@ from simple_slurm.squeue import SlurmSqueueWrapper from simple_slurm.scancel import SlurmScancelWrapper +from simple_slurm.scontrol import SlurmScontrolWrapper +from simple_slurm.sacct import SlurmSacctWrapper IGNORE_BOOLEAN = "IGNORE_BOOLEAN" @@ -30,6 +32,12 @@ def __init__(self, *args, **kwargs): self.parser = argparse.ArgumentParser() self.squeue = SlurmSqueueWrapper() self.scancel = SlurmScancelWrapper() + self.sacct = SlurmSacctWrapper() + self.scontrol = SlurmScontrolWrapper() + + # add variables for debug + self.job_id = None + self.cmd = None # add arguments into argparser for keys in read_simple_txt("arguments.txt"): @@ -210,7 +218,8 @@ def sbatch( "EOF", ) ) - result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE) + result = subprocess.run(cmd, capture_output=True, shell=True) + # init for clarity job_id = None stdout = "" @@ -224,12 +233,28 @@ def sbatch( else: success_msg = "Submitted batch job" stdout = result.stdout.decode() - assert success_msg in stdout, result.stderr + if success_msg not in stdout: + error = result.stderr + if error is None: + error = result.stdout + if error is None: + error = f"Unknown error for cmd:\n {cmd}" + raise RuntimeError( + f"sbatch failed with error:\n{error.decode()}\nstdout:\n{stdout}\ncmd:\n{cmd}" + ) job_id = int(stdout.split(" ")[3]) + + # store job id and command + self.job_id = job_id + self.scontrol.job_id = job_id + self.cmd = cmd + + # assertions assert job_id is not None, "this should never happen, assert for linter" if verbose: print(stdout) - return job_id + return self + class Namespace: From ccc01c31668cc75c10fc0f417f08f9992b73aaab Mon Sep 17 00:00:00 2001 From: tobiaspk Date: Wed, 21 May 2025 11:25:23 -0400 Subject: [PATCH 04/11] Update readme --- README.md | 45 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c3ff836..00b52cf 100644 --- a/README.md +++ b/README.md @@ -337,7 +337,7 @@ SLURM_ARRAY_JOB_ID | job array's master job id number ## Job Management -Simple Slurm provides a simple interface to Slurm's job management tools (`squeue` and `scance`l) to let you monitor and control running jobs. +Simple Slurm provides a simple interface to Slurm's job management tools (`squeue`, `scancel`, `sacct` and `scontrol`) to let you monitor and control running jobs. ### Monitoring Jobs with `squeue` @@ -374,11 +374,54 @@ slurm.scancel.cancel_job(34987) for job_id in [34987, 34988, 34989]: slurm.scancel.cancel_job(job_id) + # Send SIGTERM before canceling (graceful termination) slurm.scancel.signal_job(34987) slurm.scancel.cancel_job(34987) ``` +### Inspecting Jobs with `scontrol` + +Check details and exit codes of job(s). + +```python +from simple_slurm import Slurm + +# existing job +slurm = Slurm() +slurm.scontrol.update(job_id=) + +# new jobs automatically assigns the job id +slurm = Slurm() +slurm.sbatch("echo hello world") +slurm.scontrol.update() + +# output: list of job details for each job (in array) +``` + +### Inspecting `sacct` + +Get job resource information using `sacct` + +```python +from simple_slurm import Slurm + +# existing job +slurm = Slurm() +slurm.sacct.update(job_id=) + +# new jobs automatically assigns the job id +slurm = Slurm() +slurm.sbatch("echo hello world") +slurm.sacct.update() + +# custom outputs: +slurm = Slurm(kwargs_sacct={"fields": ["JobID", "JobName", "State", "Elapsed", "Start", "End"]}) +slurm.sacct.update(job_id=) + +# output: list of resource requirements for each job (in array) +slurm.sacct +``` ## Error Handling The library does not raise specific exceptions for invalid Slurm arguments or job submission failures. Instead, it relies on the underlying Slurm commands (`sbatch`, `srun`, etc.) to handle errors. If a job submission fails, the error message from Slurm will be printed to the console. From fba6b950e1e8db8999bf04a78211bcbc56d1326c Mon Sep 17 00:00:00 2001 From: tobiaspk Date: Wed, 21 May 2025 11:25:43 -0400 Subject: [PATCH 05/11] Small adjustments and debugs --- simple_slurm/sacct.py | 1 - simple_slurm/scontrol.py | 3 ++- simple_slurm/squeue.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/simple_slurm/sacct.py b/simple_slurm/sacct.py index 342d81b..3776272 100644 --- a/simple_slurm/sacct.py +++ b/simple_slurm/sacct.py @@ -72,7 +72,6 @@ def update(self, job_id: str = None): if result.returncode != 0: raise RuntimeError(f"Error running scontrol: {result.stderr}") self.sacct = self._parse_output(result.stdout) - return self def _parse_output(self, output: str): """ diff --git a/simple_slurm/scontrol.py b/simple_slurm/scontrol.py index 67a844a..3c94b9f 100644 --- a/simple_slurm/scontrol.py +++ b/simple_slurm/scontrol.py @@ -57,7 +57,8 @@ def update(self, job_id: str = None): self.control = self._parse_output(result.stdout) # set exit code [0:0, 2:0]... set it to the max value. should only be 0 if ALL jobs are finished successfully self.exit_code = max([int(c) for e in self["ExitCode"] for c in e.split(":")]) - return self + if self["JobState"] == "PENDING": + self.exit_code = None def _parse_output(self, output: str): """ diff --git a/simple_slurm/squeue.py b/simple_slurm/squeue.py index e33fe1c..4352e34 100644 --- a/simple_slurm/squeue.py +++ b/simple_slurm/squeue.py @@ -50,7 +50,7 @@ def _parse_output(self, output: str): ) jobs = {} for row in reader: - jobs[int(row["JOBID"])] = row + jobs[row["JOBID"]] = row return jobs def display_jobs(self): From 4d9be6eee1af588a006991af22c56da7051591db Mon Sep 17 00:00:00 2001 From: tobiaspk Date: Wed, 21 May 2025 11:26:01 -0400 Subject: [PATCH 06/11] Support custom kwargs for sacct and scontrol --- simple_slurm/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/simple_slurm/core.py b/simple_slurm/core.py index 205548f..2c618df 100644 --- a/simple_slurm/core.py +++ b/simple_slurm/core.py @@ -24,7 +24,7 @@ class Slurm: Multiple syntaxes are allowed for defining the arguments. """ - def __init__(self, *args, **kwargs): + def __init__(self, kwargs_sacct={}, kwargs_scontrol={}, *args, **kwargs): """Initialize the parser with the given arguments.""" # initialize parser @@ -32,8 +32,8 @@ def __init__(self, *args, **kwargs): self.parser = argparse.ArgumentParser() self.squeue = SlurmSqueueWrapper() self.scancel = SlurmScancelWrapper() - self.sacct = SlurmSacctWrapper() - self.scontrol = SlurmScontrolWrapper() + self.sacct = SlurmSacctWrapper(**kwargs_sacct) + self.scontrol = SlurmScontrolWrapper(**kwargs_scontrol) # add variables for debug self.job_id = None From 34328030253ffb824fef3374a2280444349d95cb Mon Sep 17 00:00:00 2001 From: tobiaspk Date: Wed, 21 May 2025 15:12:23 -0400 Subject: [PATCH 07/11] Support job_id being a string, integer or Slurm class --- simple_slurm/core.py | 10 +++++++++- simple_slurm/scancel.py | 12 +++++++----- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/simple_slurm/core.py b/simple_slurm/core.py index 5d15696..7757e51 100644 --- a/simple_slurm/core.py +++ b/simple_slurm/core.py @@ -24,7 +24,7 @@ class Slurm: Multiple syntaxes are allowed for defining the arguments. """ - def __init__(self, kwargs_sacct={}, kwargs_scontrol={}, *args, **kwargs): + def __init__(self, *args, kwargs_sacct={}, kwargs_scontrol={}, **kwargs): """Initialize the parser with the given arguments.""" # initialize parser @@ -74,6 +74,14 @@ def __repr__(self) -> str: params["run_cmds"] = self.run_cmds return repr(params) + def __int__(self) -> int: + """Return the job ID of the last submitted job when calling int() on the object.""" + + + if self.job_id is None: + raise ValueError("Job ID is not set.") + return self.job_id + def _add_one_argument(self, key: str, value: str): """Parse the given key-value pair (the argument is given in key).""" key, value = fmt_key(key), fmt_value(value) diff --git a/simple_slurm/scancel.py b/simple_slurm/scancel.py index 0e2e1b6..5a22c7c 100644 --- a/simple_slurm/scancel.py +++ b/simple_slurm/scancel.py @@ -1,6 +1,7 @@ import subprocess from datetime import datetime, timedelta import logging +from typing import Union logging.basicConfig() logger = logging.getLogger(__name__) @@ -15,9 +16,10 @@ class SlurmScancelWrapper: def __init__(self, staledelta=timedelta(minutes=30)): self.stale_delta = staledelta - def cancel_job(self, job_id: int): - """Sends a straightforward scancel to a job""" - job_id = str(job_id) + def cancel_job(self, job_id): + """Sends a straightforward scancel to a job. job_id can be a str, int or slurm.Slurm() object""" + # int(slurm.Slurm)) returns a job id + job_id = str(int(job_id)) result = subprocess.run( ["scancel", job_id], stdout=subprocess.PIPE, @@ -27,12 +29,12 @@ def cancel_job(self, job_id: int): if result.returncode != 0: raise RuntimeError(f"Error cancelling job: {result.stderr.strip()}") - def signal_job(self, job_id: int): + def signal_job(self, job_id): """First time it is sent to a job, tries send a SIGTERM to the job id If sent again to the same job, attempts a SIGKILL instead if that fails as well, involkes scancel without term arguments """ - job_id = str(job_id) + job_id = str(int(job_id)) self.prune_old_jobs() signal = "--signal=TERM" if job_id not in self.sigmtems: From 7725248116f15b52b17bfd58ed2205866bd961fe Mon Sep 17 00:00:00 2001 From: tobiaspk Date: Wed, 21 May 2025 15:13:09 -0400 Subject: [PATCH 08/11] Support updated sbatch return logic in pytests --- test/test_core.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/test/test_core.py b/test/test_core.py index f6aa322..a68e236 100644 --- a/test/test_core.py +++ b/test/test_core.py @@ -213,11 +213,12 @@ def test_14_srun_returncode(self): def test_15_sbatch_execution(self): slurm = Slurm(contiguous=True) - job_id, stdout = self.__run_sbatch(slurm) + job, stdout = self.__run_sbatch(slurm) self.assertFalse(slurm.is_parsable) - self.assertIsInstance(job_id, int) - self.assertIn(f"Submitted batch job {job_id}", stdout) + self.assertIsInstance(job, Slurm) + self.assertIsInstance(int(job), int) + self.assertIn(f"Submitted batch job {int(job)}", stdout) def test_16_parse_timedelta(self): slurm = Slurm( @@ -263,11 +264,12 @@ def test_19_sbatch_execution_with_job_file(self): job_file = "script.sh" slurm = Slurm(contiguous=True) - job_id, stdout = self.__run_sbatch(slurm, job_file=job_file) + job, stdout = self.__run_sbatch(slurm, job_file=job_file) self.assertFalse(slurm.is_parsable) - self.assertIsInstance(job_id, int) - self.assertIn(f"Submitted batch job {job_id}", stdout) + self.assertIsInstance(job, Slurm) + self.assertIsInstance(int(job), int) + self.assertIn(f"Submitted batch job {int(job)}", stdout) with open(job_file, "r") as fid: job_contents = fid.read() @@ -331,18 +333,19 @@ def test_21_add_cmd_multiple(self): def test_22_parsable_sbatch_execution(self): slurm = Slurm(contiguous=True, parsable=True) - job_id, stdout = self.__run_sbatch(slurm) + job, stdout = self.__run_sbatch(slurm) self.assertTrue(slurm.is_parsable) - self.assertIsInstance(job_id, int) - self.assertEqual(f"{job_id}\n", stdout) + self.assertIsInstance(job, Slurm) + self.assertIsInstance(int(job), int) + self.assertEqual(f"{int(job)}\n", stdout) def __run_sbatch(self, slurm, *args, **kwargs): with io.StringIO() as buffer: with contextlib.redirect_stdout(buffer): - job_id = slurm.sbatch("echo Hello!", *args, **kwargs) + job = slurm.sbatch("echo Hello!", *args, **kwargs) stdout = buffer.getvalue() - return job_id, stdout + return job, stdout if __name__ == "__main__": From 6632acf8c6cdd0d875cad9930f2729c6e4e67deb Mon Sep 17 00:00:00 2001 From: tobiaspk Date: Wed, 21 May 2025 15:27:11 -0400 Subject: [PATCH 09/11] Update toc --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 038cb7e..32cbcb1 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,8 @@ EOF + [Job Management](#job-management) - [Monitoring Jobs with `squeue`](#monitoring-jobs-with-squeue) - [Canceling Jobs with `scancel`](#canceling-jobs-with-scancel) + - [Inspecting Jobs with `scontrol`](#inspecting-jobs-with-scontrol) + - [Inspecting `sacct`](#inspecting-sacct) + [Error Handling](#error-handling) + [Project Growth](#project-growth) From 42f7d713076760c8cc49779ea48961effc82a5c8 Mon Sep 17 00:00:00 2001 From: tobiaspk Date: Wed, 21 May 2025 15:55:34 -0400 Subject: [PATCH 10/11] Version bump --- simple_slurm/__about__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simple_slurm/__about__.py b/simple_slurm/__about__.py index d7b30e1..6a9beea 100644 --- a/simple_slurm/__about__.py +++ b/simple_slurm/__about__.py @@ -1 +1 @@ -__version__ = "0.3.6" +__version__ = "0.4.0" From b39d37eb312cde068754018110141e90e0076f0a Mon Sep 17 00:00:00 2001 From: tobiaspk Date: Wed, 21 May 2025 16:05:01 -0400 Subject: [PATCH 11/11] Support ruff formatting --- simple_slurm/core.py | 4 +-- simple_slurm/sacct.py | 70 ++++++++++++++++++++++++++-------------- simple_slurm/scancel.py | 1 - simple_slurm/scontrol.py | 22 +++++++------ test/test_cli.py | 3 -- test/test_core.py | 2 -- 6 files changed, 60 insertions(+), 42 deletions(-) diff --git a/simple_slurm/core.py b/simple_slurm/core.py index 7757e51..b2eb610 100644 --- a/simple_slurm/core.py +++ b/simple_slurm/core.py @@ -76,7 +76,6 @@ def __repr__(self) -> str: def __int__(self) -> int: """Return the job ID of the last submitted job when calling int() on the object.""" - if self.job_id is None: raise ValueError("Job ID is not set.") @@ -266,7 +265,7 @@ def sbatch( f"sbatch failed with error:\n{error.decode()}\nstdout:\n{stdout}\ncmd:\n{cmd}" ) job_id = int(stdout.split(" ")[3]) - + # store job id and command self.job_id = job_id self.scontrol.job_id = job_id @@ -277,7 +276,6 @@ def sbatch( if verbose: print(stdout) return self - class Namespace: diff --git a/simple_slurm/sacct.py b/simple_slurm/sacct.py index 3776272..4246a90 100644 --- a/simple_slurm/sacct.py +++ b/simple_slurm/sacct.py @@ -1,7 +1,4 @@ -import os import subprocess -import csv -from io import StringIO from pprint import pformat @@ -14,15 +11,27 @@ def __init__(self, fields: list = None, units: str = "M"): sacct -j 4315805 --format=JobID,JobName%20,State,Elapsed,Start,End,NNodes,AllocCPUs,ReqCPUs,ReqMem,MaxRSS,AllocTRES --units=M Returns: - JobID JobName State Elapsed Start End NNodes AllocCPUS ReqCPUS ReqMem MaxRSS AllocTRES - ------------ -------------------- ---------- ---------- ------------------- ------------------- -------- ---------- -------- ---------- ---------- ---------- - 4315805 test_slurm COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 2048M billing=1+ - 4315805.bat+ batch COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 0 cpu=1,mem+ - 4315805.ext+ extern COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 0 billing=1+ + JobID JobName State Elapsed Start End NNodes AllocCPUS ReqCPUS ReqMem MaxRSS AllocTRES + ------------ -------------------- ---------- ---------- ------------------- ------------------- -------- ---------- -------- ---------- ---------- ---------- + 4315805 test_slurm COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 2048M billing=1+ + 4315805.bat+ batch COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 0 cpu=1,mem+ + 4315805.ext+ extern COMPLETED 00:00:02 2025-04-15T13:39:11 2025-04-15T13:39:13 1 1 1 0 billing=1+ First one is most critical. Return a list of the dictionaries (only first row). If array is submitted, they will have multiple entries. """ - self.fields = fields or ["JobID", "JobName", "State", "Elapsed", "Start", "End", "NNodes", "AllocCPUs", "ReqCPUs", "ReqMem", "MaxRSS"] + self.fields = fields or [ + "JobID", + "JobName", + "State", + "Elapsed", + "Start", + "End", + "NNodes", + "AllocCPUs", + "ReqCPUs", + "ReqMem", + "MaxRSS", + ] self.command = ["sacct", "-j"] self.units = units self.sacct = None @@ -30,20 +39,26 @@ def __init__(self, fields: list = None, units: str = "M"): self.exit_code = None def __getitem__(self, key: str): - assert self.sacct is not None, f"sacct not initialized. Call 'update()' first" + assert self.sacct is not None, ( + "sacct not initialized. Call 'update()' first" + ) return [c[key] for c in self.sacct] def __len__(self): return len(self.sacct) - + def __iter__(self): - assert self.sacct is not None, f"sacct not initialized. Call 'update()' first" + assert self.sacct is not None, ( + "sacct not initialized. Call 'update()' first" + ) return iter(self.sacct) def __str__(self): - assert self.sacct is not None, f"sacct not initialized. Call 'update()' first" + assert self.sacct is not None, ( + "sacct not initialized. Call 'update()' first" + ) return pformat(self.sacct) - + def __repr__(self): return self.__str__() @@ -64,7 +79,12 @@ def update(self, job_id: str = None): """Refresh the information from the current queue for the current user""" self.job_id = self._get_job_id(job_id) result = subprocess.run( - self.command + [str(self.job_id), f"--format={','.join(self.fields)}", f"--units={self.units}"], + self.command + + [ + str(self.job_id), + f"--format={','.join(self.fields)}", + f"--units={self.units}", + ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, @@ -78,24 +98,26 @@ def _parse_output(self, output: str): Get a list of dictionaries from the sacct output. New lines are detected when "JobName" is not "batch" or "extern". ``` - JobID JobName State Elapsed Start End NNodes AllocCPUS ReqCPUS ReqMem MaxRSS AllocTRES - ------------ -------------------- ---------- ---------- ------------------- ------------------- -------- ---------- -------- ---------- ---------- ---------- - 4319559_1 test COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 2048M billing=1+ - 4319559_1.b+ batch COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 cpu=1,mem+ - 4319559_1.e+ extern COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 billing=1+ - 4319559_2 test COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 2048M billing=1+ - 4319559_2.b+ batch COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 cpu=1,mem+ + JobID JobName State Elapsed Start End NNodes AllocCPUS ReqCPUS ReqMem MaxRSS AllocTRES + ------------ -------------------- ---------- ---------- ------------------- ------------------- -------- ---------- -------- ---------- ---------- ---------- + 4319559_1 test COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 2048M billing=1+ + 4319559_1.b+ batch COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 cpu=1,mem+ + 4319559_1.e+ extern COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 billing=1+ + 4319559_2 test COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 2048M billing=1+ + 4319559_2.b+ batch COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 cpu=1,mem+ 4319559_2.e+ extern COMPLETED 00:00:03 2025-04-15T13:49:15 2025-04-15T13:49:18 1 1 1 0 billing=1+ ... ``` """ - + skip = ["batch", "extern"] acct = [] for i, line in enumerate(output.splitlines()): # set header if i == 0: header = line.split() - assert "JobName" in header, f"JobName not found in header: {header}. Make sure to include it in the fields." + assert "JobName" in header, ( + f"JobName not found in header: {header}. Make sure to include it in the fields." + ) continue if i == 1: continue diff --git a/simple_slurm/scancel.py b/simple_slurm/scancel.py index 5a22c7c..c4c016a 100644 --- a/simple_slurm/scancel.py +++ b/simple_slurm/scancel.py @@ -1,7 +1,6 @@ import subprocess from datetime import datetime, timedelta import logging -from typing import Union logging.basicConfig() logger = logging.getLogger(__name__) diff --git a/simple_slurm/scontrol.py b/simple_slurm/scontrol.py index 3c94b9f..4c216cb 100644 --- a/simple_slurm/scontrol.py +++ b/simple_slurm/scontrol.py @@ -1,7 +1,4 @@ -import os import subprocess -import csv -from io import StringIO from pprint import pformat @@ -13,20 +10,26 @@ def __init__(self): self.exit_code = None def __getitem__(self, key: str): - assert self.control is not None, f"scontrol not initialized. Call 'update()' first" + assert self.control is not None, ( + "scontrol not initialized. Call 'update()' first" + ) return [c[key] for c in self.control] def __len__(self): return len(self.control) - + def __iter__(self): - assert self.control is not None, f"scontrol not initialized. Call 'update()' first" + assert self.control is not None, ( + "scontrol not initialized. Call 'update()' first" + ) return iter(self.control) def __str__(self): - assert self.control is not None, f"scontrol not initialized. Call 'update()' first" + assert self.control is not None, ( + "scontrol not initialized. Call 'update()' first" + ) return pformat(self.control) - + def __repr__(self): return self.__str__() @@ -79,8 +82,9 @@ def _parse_line(line): continue key, value = pair.split("=", 1) yield key, value - + control = [] + control_i = {} for i, line in enumerate(output.splitlines()): # start new block if "JobId=" in line: diff --git a/test/test_cli.py b/test/test_cli.py index b3873d8..64d3147 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -1,8 +1,5 @@ import contextlib import io -import os -import shutil -import subprocess import sys import unittest from unittest.mock import patch diff --git a/test/test_core.py b/test/test_core.py index a68e236..c388781 100644 --- a/test/test_core.py +++ b/test/test_core.py @@ -2,8 +2,6 @@ import datetime import io import os -import shutil -import subprocess import unittest from simple_slurm import Slurm