From 6826398f64b87911ca0d7c6da40ffd6b706b93b7 Mon Sep 17 00:00:00 2001
From: Johannes Hofmann
Date: Fri, 12 Sep 2025 16:18:15 +0200
Subject: [PATCH 1/8] Initial transfer of cluster submitter from another project branch

---
 py_alf/Jobfile_templates/Generic.slurm.j2 |  57 ++
 py_alf/cluster_submission.py              | 782 ++++++++++++++++++++++
 2 files changed, 839 insertions(+)
 create mode 100644 py_alf/Jobfile_templates/Generic.slurm.j2
 create mode 100644 py_alf/cluster_submission.py

diff --git a/py_alf/Jobfile_templates/Generic.slurm.j2 b/py_alf/Jobfile_templates/Generic.slurm.j2
new file mode 100644
index 0000000..0604292
--- /dev/null
+++ b/py_alf/Jobfile_templates/Generic.slurm.j2
@@ -0,0 +1,57 @@
+#!/bin/bash -l
+#SBATCH --nodes={{ nodes | default(1) }}
+#SBATCH --ntasks={{ tasks }}
+#SBATCH --threads-per-core=1
+#SBATCH --cpus-per-task={{ threads }}
+##SBATCH --hint=nomultithread
+#SBATCH --mail-type FAIL
+#SBATCH --job-name={{ name }}
+## Create a directory called logfiles in the submission directory to store log files
+## The log files are named job-<jobid>.log or job-<arrayjobid>-<arraytaskid>.log
+## DO NOT CHANGE so pyALF can find the logfiles!
+#SBATCH --output=logfiles/job-{% if array %}%A-%a{% else %}%j{% endif %}.log
+#SBATCH --mem={{ mem }}
+#SBATCH --partition={{ partition }}
+#SBATCH --time={{ time }}:00:00
+#SBATCH --no-requeue
+{% if array %}#SBATCH --array=0-{{ array_max }}{% endif %}
+
+echo "Job is assigned to $SLURM_JOB_NODELIST"
+
+#
+# Prepare your environment
+#
+
+# causes the job to fail if any command fails - makes failed jobs easier to find with tools like sacct
+set -e
+
+# load modules using the machine provided to the configure.sh script
+cd {{ alf_src }}
+source configure.sh {{ config }}
+cd -
+
+## DO NOT CHANGE so array jobs can find the directories.txt file, created by the cluster submitter
+# switch to the appropriate simulation directory
+{% if array %}
+simdir="$(sed -n "$((SLURM_ARRAY_TASK_ID+1))p" directories.txt)"
+{% else %}
+simdir="{{ sim_dir }}"
+{% endif %}
+cd $simdir
+
+## DO NOT CHANGE so pyALF can find the logfiles!
+slurm_logfile="${SLURM_SUBMIT_DIR}/logfiles/job-{% if array %}${SLURM_ARRAY_JOB_ID}-${SLURM_ARRAY_TASK_ID}{% else %}${SLURM_JOB_ID}{% endif %}.log"
+ln -sf "${slurm_logfile}" latest_cluster_run.log
+
+# run QMC
+echo "Starting QMC run"
+ulimit -c 0 # disable the generation of core.* dump files if the code crashes
+srun ./ALF.out
+echo "Done with QMC run"
+echo
+
+# clean up the SLURM id file to indicate the job is done.
+rm -f {{ slurm_id_file }}
+
+# exit gracefully
+exit 0
diff --git a/py_alf/cluster_submission.py b/py_alf/cluster_submission.py
new file mode 100644
index 0000000..8b71495
--- /dev/null
+++ b/py_alf/cluster_submission.py
@@ -0,0 +1,782 @@
+"""Provides interfaces for submitting ALF simulations to a SLURM cluster and monitoring the jobs.
+"""
+
+__author__ = "Johannes Hofmann"
+__copyright__ = "Copyright 2020-2025, The ALF Project"
+__license__ = "GPL"
+
+from jinja2 import Environment, FileSystemLoader, StrictUndefined
+from pathlib import Path
+from .simulation import cd, Simulation
+import subprocess
+from collections.abc import Iterable
+import os
+from colorama import Fore
+from tabulate import tabulate
+from typing import Union, List, Optional, Dict, Any, Sequence
+import logging
+from tqdm import tqdm
+
+logger = logging.getLogger(__name__)
+
+# --- ClusterSubmitter class ---
+class ClusterSubmitter:
+    """
+    Handles job submission to a SLURM cluster using Jinja2 templates.
+    """
+    def __init__(self, cluster_name: str):
+        # Locate job template directory and load the template for the given cluster.
+ base_dir = Path(__file__).resolve().parent + template_dir = base_dir / "Jobfile_templates" + self.env = Environment( + loader=FileSystemLoader(str(template_dir)), + undefined=StrictUndefined + ) + self.template = self.env.get_template(f"{cluster_name}.slurm.j2") + + def render(self, context: Dict[str, Any]) -> str: + """ + Render the SLURM job script from the template and context. + Warns about unused context variables. + """ + provided = set(context.keys()) + referenced = set(self.template.module.__dict__.keys()) + unused = provided - referenced + if unused: + logger.warning(f"Unused variables in template: {unused}") + + return self.template.render(**context) + + def submit( + self, + sims: Union[Simulation, Iterable[Simulation]], + submit_dir: str = 'array_submissions', + job_properties: Optional[Dict[str, Any]] = None + ) -> None: + """ + Submits one or more Simulation instances to the cluster. + - Prepares simulation directories. + - Filters out running/broken jobs. + - Renders and writes the SLURM script. + - Submits via sbatch. + - Stores job IDs. + Args: + sims: A Simulation instance or a sequence of Simulation instances. + submit_dir: Directory to submit jobs from. + job_properties: Optional dictionary of SLURM job properties. + """ + submit_path = Path(submit_dir) + submit_path.mkdir(parents=True, exist_ok=True) + + # Normalize input to a list of simulations. + # Accept any iterable except string/bytes + if isinstance(sims, Iterable) and not isinstance(sims, (str, bytes, Simulation)): + sim_list = list(sims) + else: + sim_list = [sims] + + # Type check for Simulation instances + for s in sim_list: + if not isinstance(s, Simulation): + logger.error(f"Expected Simulation, got {type(s)}") + raise TypeError(f"Expected Simulation, got {type(s)}") + + # Filter out active or broken jobs. + filtered_sims = [] + for s in sim_list: + jobid_file = Path(s.sim_dir) / "jobid.txt" + if jobid_file.exists(): + with jobid_file.open("r") as f: + jobid = f.read().strip() + status = _get_slurm_status_sacct(jobid) + if status in ("PENDING", "RUNNING"): + logger.info(f"Skipping {s.sim_dir}: job {jobid} is {status}") + continue + running_file = Path(s.sim_dir) / "RUNNING" + if running_file.exists(): + jobid = jobid_file.read_text().strip() + status = _get_slurm_status_sacct(jobid) + if status in ("RUNNING"): + logger.info(f"Skipping {s.sim_dir}: job {jobid} is {status}") + continue + else: + logger.warning(f"Leftover RUNNING file detected in {s.sim_dir}.") + logger.warning("This indicates an error in the previous run.") + choice = input(f"Do you want to remove this file to enable resubmission (status={status}) [y/N]?").strip().lower() + if choice in ("yes", "y"): + running_file.unlink() + logger.info("File removed.") + else: + logger.info("File kept.") + logger.info(f"Skipping {s.sim_dir}") + continue + filtered_sims.append(s) + + if not filtered_sims: + logger.info("No inactive simulations to submit.") + return + + array = len(filtered_sims) > 1 + sim = filtered_sims[0] # Representative simulation for job naming. + + # For array jobs, write directories to a file. 
+ if array: + # Find the next available integer subdirectory inside submit_path + existing = [int(p.name) for p in submit_path.iterdir() if p.is_dir() and p.name.isdigit()] + next_idx = max(existing, default=0) + 1 + array_submit_path = submit_path / str(next_idx) + array_submit_path.mkdir(parents=True, exist_ok=True) + # Write directories.txt inside the new subdir + dirs_file = array_submit_path / "directories.txt" + dirs_file.write_text("\n".join(str(s.sim_dir) for s in filtered_sims)) + else: + array_submit_path = submit_path + + # Prepare simulation directories. + for s in filtered_sims: + s.run(only_prep=True, copy_bin=True) + + # Default job properties, can be overridden. + MaxRunTime = sim.sim_dict.get("CPU_MAX",24) + MaxRunTime = max(1,int(MaxRunTime)) + props = { + "name": sim.ham_name, + "time": MaxRunTime, + "mem": "2G", + "threads": sim.n_omp, + "tasks": sim.n_mpi, + "partition": "short", + } + if job_properties: + props.update(job_properties) + + # Render SLURM script. + context = { + "alf_src": sim.alf_src.alf_dir, + "sim_dir": sim.sim_dir, + "sub_dir": str(array_submit_path), + "config": sim.config, + "array": array, + "array_max": len(filtered_sims) - 1 if array else None, + "slurm_id_file": "jobid.txt", + **props, + } + script_content = self.template.render(**context) + + script_path = Path(sim.sim_dir) + if array: + script_path = array_submit_path + script_file = script_path / "job.slurm" + script_file.write_text(script_content) + + # Submit job via sbatch. + with cd(script_path): + result = subprocess.run(["sbatch", "job.slurm"], capture_output=True, text=True) + output = result.stdout.strip() + error = result.stderr.strip() + + if result.returncode != 0: + logger.error(f"Subprocess failed: {error}") + raise RuntimeError(f"Subprocess failed: {error}") + + logger.info(output) + + # Store job ID(s). + if output.startswith("Submitted batch job"): + jobid = output.split()[-1] + if array: + for idx, s in enumerate(filtered_sims): + jobid_file = Path(s.sim_dir) / "jobid.txt" + jobid_file.write_text(f"{jobid}_{idx}") + else: + jobid_file = Path(s.sim_dir) / "jobid.txt" + jobid_file.write_text(jobid) + + def resubmission( + self, + sims_to_resubmit: Iterable[Simulation], + job_properties: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, str]] = None, + print_first: bool = True, + confirm: bool = True, + counting_obs: str = 'Ener_scal', + subdir: Optional[str] = 'resubmission' + ) -> None: + """ + Resubmits simulations that have too few bins. + Args: + sims: Iterable of Simulation instances. + print_first: If True, prints simulations to be resubmitted. + confirm: If True, asks for confirmation before resubmission. + subdir: Subdirectory for resubmission job scripts. + """ + + if not sims_to_resubmit: + logger.info("No simulations to resubmit.") + return + + if print_first: + print(f"{len(sims_to_resubmit)} simulations will be resubmitted.") + for sim in sims_to_resubmit: + num_bins = sim.bin_count(counting_obs=counting_obs, refresh=True) + status = sim.get_cluster_job_status() + label = "".join(f"{k}={sim.sim_dict[v]}, " if v in sim.sim_dict else "" for k,v in params.items()) if params else sim.sim_dir + print(f"Sim (Nbins={num_bins}) {label} with status {status}") + + if confirm: + choice = input("Proceed with resubmission? 
[y/N]: ").strip().lower() + if choice not in ("yes", "y"): + logger.info("Resubmission cancelled.") + return + + self.submit( + sims=sims_to_resubmit, + submit_dir=subdir, + job_properties=job_properties + ) + +# --- Status functions --- +def get_status(sim: Simulation, colored: bool = True) -> str: + """ + Returns colorized SLURM job status for a simulation. + Args: + sim: Simulation instance. + colored: Colorize output if True. + Returns: + Colorized status string. + """ + jobid_file = Path(sim.sim_dir) / "jobid.txt" + running_file = Path(sim.sim_dir) / "RUNNING" + if not jobid_file.exists(): + if running_file.exists(): + status = "CRASHED" + else: + status = "NO_JOBID" + else: + jobid = jobid_file.read_text().strip() + status, runtime = _get_slurm_status_sacct(jobid) + if colored: + status = _colorize_status(status) + return status + +def get_job_id(sim: Simulation) -> str | None: + """ + Returns colorized SLURM job status for a simulation. + Args: + sim: Simulation instance. + colored: Colorize output if True. + Returns: + Colorized status string. + """ + jobid_file = Path(sim.sim_dir) / "jobid.txt" + if not jobid_file.exists(): + return None + else: + return jobid_file.read_text().strip() + +def _get_slurm_status_sacct(jobid: str) -> tuple[str, Optional[str]]: + """ + Query SLURM sacct for job status and elapsed time. + Returns (status, runtime) tuple. + """ + try: + result = subprocess.run( + ["sacct", "-j", jobid, "--format=State,Elapsed", "--noheader", "--array"], + capture_output=True, text=True, timeout=15 + ) + lines = result.stdout.strip().splitlines() + logger.debug(lines) + for line in lines: + parts = line.split() + if len(parts) >= 1: + state = parts[0] + runtime = parts[1] if len(parts) > 1 else None + return (state, runtime) + return ("UNKNOWN", None) + except Exception as e: + logger.error(f"sacct error for job {jobid}: {e}") + return ("ERROR", None) + +def _get_slurm_status_bulk_sacct(jobids: List[str]) -> Dict[str, Union[str, tuple]]: + """ + Query SLURM sacct for multiple job IDs (including array tasks) in one call. + Returns dict: jobid[_index] -> (status, runtime) + """ + status_map: Dict[str, Union[str, tuple]] = {jid: "UNKNOWN" for jid in jobids} + if not jobids: + return status_map + + try: + result = subprocess.run( + ["sacct", "--format=JobID,State,Elapsed", "--noheader", "--array"], + capture_output=True, text=True, timeout=30 + ) + for line in result.stdout.strip().splitlines(): + parts = line.split() + if len(parts) >= 2: + jobid = parts[0] + state = parts[1] + runtime = parts[2] if len(parts) > 2 else None + status_map[jobid] = (state, runtime) + except Exception as e: + logger.error(f"sacct bulk error: {e}") + for jid in jobids: + status_map[jid] = ("ERROR", None) + return status_map + +def _get_slurm_status_bulk(jobids: List[str]) -> Dict[str, Union[str, tuple]]: + """ + Query SLURM for multiple job IDs (including array tasks) in one call. + Args: + jobids: List of job IDs. + Returns: + Dict mapping jobid[_index] to (status, runtime) or status string. + """ + if not jobids: + return {} + + status_map: Dict[str, Union[str, tuple]] = {jid: "FINISHED_OR_NOT_FOUND" for jid in jobids} + found_in_squeue = set() + + try: + result = subprocess.run( + ["squeue", "-h", "-o", "%A %i %T %M", "--array", "-j", ",".join(jobids)], + capture_output=True, text=True, timeout=30 + ) + squeue_failed = False + except subprocess.TimeoutExpired: + logger.warning("squeue command timed out. 
Falling back to sacct for job status.") + squeue_failed = True + except Exception as e: + logger.error(f"Error running squeue: {e}") + squeue_failed = True + + if not squeue_failed: + for line in result.stdout.strip().splitlines(): + try: + parts = line.split(maxsplit=3) + if len(parts) != 4: + logger.warning(f"Unexpected squeue output line: '{line}'") + continue + jid, idx, state, runtime = parts + full_id = jid if idx == "N/A" else idx + status_map[full_id] = (state, runtime) + found_in_squeue.add(full_id) + except Exception as e: + logger.error(f"Error parsing squeue output line '{line}': {e}") + continue + # For jobs not found in squeue output, fallback to sacct bulk + missing_jobids = [jid for jid in jobids if jid not in found_in_squeue] + if missing_jobids: + sacct_statuses = _get_slurm_status_bulk_sacct(missing_jobids) + for jid in missing_jobids: + status_map[jid] = sacct_statuses.get(jid, ("UNKNOWN", None)) + else: + # squeue failed, use sacct bulk for all jobids + status_map = _get_slurm_status_bulk_sacct(jobids) + + return status_map + +def get_status_all( + sims: Iterable[Simulation], + header: Optional[List[str]] = None, + keys: Optional[List[str]] = None, + filter_out: Optional[List[str]] = None, + crash_tags: Optional[List[str]] = None, + showid: bool = True, + counting_obs: str = 'Ener_scal', + refresh_cache: bool = False, + min_bins: int = 4, + **tabargs +) -> tuple[Optional[List[Simulation]], Optional[List[Simulation]]]: + """ + Prints a table of statuses for all simulations (bulk SLURM query). + Args: + sims: Iterable of Simulation instances. + header: List of column headers. + keys: List of keys to extract from sim.sim_dict. + filter_out: List of statuses to filter out from display. + crash_tags: List of tags indicating crashed simulations. + showid: Whether to show job ID column. + counting_obs: Observable name for bin counting. + refresh_cache: Whether to refresh bin count cache. + min_bins: Minimum number of bins required; simulations with fewer are returned. + tabargs: Additional arguments for tabulate. 
+ Returns: + Tuple of (sims_with_too_few_bins, crashed_sims) + """ + sims = list(sims) # Accept any iterable + if header is None: + header = ['dir'] + if keys is None: + keys = ['sim_dir'] + if filter_out is None: + filter_out = ['INACTIVE'] + if crash_tags: + crash_tags = [tag.upper() for tag in crash_tags] + if 'CRASHED' not in crash_tags: + crash_tags.append('CRASHED') + if 'FAILED' not in crash_tags: + crash_tags.append('FAILED') + else: + crash_tags = ['CRASHED', 'FAILED'] + jobid_map: Dict[str, str] = {} + for sim in sims: + jobid_file = Path(sim.sim_dir) / "jobid.txt" + if jobid_file.exists(): + jobid_map[sim.sim_dir] = jobid_file.read_text().strip() + + statuses = _get_slurm_status_bulk(list(jobid_map.values())) + + summary: Dict[str, int] = {} + header = header + ['N_bin', 'JobID', 'status', 'time'] + if showid: + header = ['SimID'] + header + entries: List[List[Any]] = [] + + sims_with_too_few_bins: List[Simulation] = [] + crashed_sims: List[Simulation] = [] + for idx,sim in enumerate(sims): + runtime: Optional[str] = None + jobid = jobid_map.get(sim.sim_dir) + if jobid is None: + running_file = Path(sim.sim_dir) / "RUNNING" + if running_file.exists(): + status = "CRASHED" + else: + status = "INACTIVE" + else: + status_tuple = statuses.get(jobid, ("UNKNOWN", None)) + if isinstance(status_tuple, tuple): + status, runtime = status_tuple + else: + status = status_tuple + runtime = None + + num_bins = _bin_count(sim, counting_obs, refresh=(status == 'RUNNING') or refresh_cache) + row = [sim.sim_dict.get(key, None) for key in keys] + if showid: + row = [idx] + row + row.append(num_bins) + row.append(jobid) + row.append(_colorize_status(status)) + row.append(_pad_runtime(runtime) if status == 'RUNNING' and runtime is not None else None) + if filter_out and status not in filter_out: + entries.append(row) + if status in crash_tags: + crashed_sims.append(sim) + if num_bins < min_bins and status not in ('RUNNING', 'PENDING'): + # double-check bin count to avoid cache issues + num_bins = _bin_count(sim, counting_obs, refresh=True) + if num_bins < min_bins: + sims_with_too_few_bins.append(sim) + + summary[status] = summary.get(status, 0) + 1 + + print(tabulate(entries, headers=header, + tablefmt=tabargs.pop('tablefmt', "fancy_grid"), + stralign=tabargs.pop('stralign', 'right'), + **tabargs)) + + print('\nSummary:') + if 'RUNNING' in summary: + key = 'RUNNING' + val = summary.pop(key) + _print_summary_entry(key, val, len(sims), filter_out) + if 'PENDING' in summary: + key = 'PENDING' + val = summary.pop(key) + _print_summary_entry(key, val, len(sims), filter_out) + for key, val in summary.items(): + if filter_out and key in filter_out: + print(_colorize_status(key), f':\t{val}/{len(sims)}\t(filtered out)') + else: + print(_colorize_status(key), f':\t{val}/{len(sims)}') + + if sims_with_too_few_bins: + print(f"{len(sims_with_too_few_bins)} simulations with fewer than {min_bins} bins in '{counting_obs}'.") + return sims_with_too_few_bins if sims_with_too_few_bins else None, crashed_sims if crashed_sims else None + +def find_sims_by_status( + sims: Iterable[Simulation], + filter: List[str] +) -> List[Simulation] | None: + """ + Prints a table of statuses for all simulations (bulk SLURM query). + Args: + sims: Iterable of Simulation instances. + filter: List of statuses to return. 
+ """ + sims = list(sims) # Accept any iterable + jobid_map: Dict[str, str] = {} + for sim in sims: + jobid_file = Path(sim.sim_dir) / "jobid.txt" + if jobid_file.exists(): + jobid_map[sim.sim_dir] = jobid_file.read_text().strip() + + statuses = _get_slurm_status_bulk(list(jobid_map.values())) + + + sims_with_status: List[Simulation] = [] + for sim in sims: + jobid = jobid_map.get(sim.sim_dir) + if jobid is None: + running_file = Path(sim.sim_dir) / "RUNNING" + if running_file.exists(): + status = "CRASHED" + else: + status = "INACTIVE" + else: + status_tuple = statuses.get(jobid, ("UNKNOWN", None)) + if isinstance(status_tuple, tuple): + status, runtime = status_tuple + else: + status = status_tuple + if status in filter: + sims_with_status.append(sim) + if not sims_with_status: + logger.info(f"No simulations found with status in {filter}.") + return sims_with_status if sims_with_status else None + +def _print_summary_entry(key: str, val: int, total: int, filter_out: Optional[List[str]] = None) -> None: + if filter_out and key in filter_out: + print(_colorize_status(key), f':\t{val}/{total}\t(not shown)') + else: + print(_colorize_status(key), f':\t{val}/{total}') + +def _get_slurm_status(jobid_element: str) -> str: + """ + Query SLURM for a single job or array element. + Args: + jobid_element: Job ID string. + Returns: + Status string. + """ + result = subprocess.run( + ["squeue", "-j", jobid_element, "-h", "-o", "%T"], + capture_output=True, text=True + ) + status = result.stdout.strip() + return status if status else "FINISHED_OR_NOT_FOUND" + +_bin_cache: Dict[Any, int] = {} + +def _bin_count(sim: Simulation, counting_obs: str = 'Ener_scal', refresh: bool = False) -> int: + """ + Counts bins for a given observable in simulation data, with caching. + Args: + sim: Simulation instance. + counting_obs: Observable name. + refresh: Whether to refresh cache. + Returns: + Number of bins. + """ + import h5py, os + filename = os.path.join(sim.sim_dir, "data.h5") + key = (filename, counting_obs) + + if (key in _bin_cache) and (not refresh): + return _bin_cache[key] + + N_bins = 0 + try: + with h5py.File(filename, "r") as f: + if counting_obs in f: + N_bins = f[counting_obs + '/obser'].shape[0] + except (FileNotFoundError) as e: + N_bins = 0 + except (OSError, KeyError) as e: + logger.error(f"Error reading {filename}: {e}") + N_bins = 0 + + _bin_cache[key] = N_bins + return N_bins + +def _colorize_status(status: str) -> str: + """ + Returns a colorized status string for terminal output. + Args: + status: Status string. + Returns: + Colorized status string. + """ + if status == "RUNNING": + status = Fore.GREEN + status + Fore.RESET + elif status == "PENDING": + status = Fore.YELLOW + status + Fore.RESET + elif status in ("CRASHED","FAILED"): + status = Fore.RED + status + Fore.RESET + elif status == "FINISHED_OR_NOT_FOUND": + status = Fore.BLUE + status + Fore.RESET + return status + +def _pad_runtime(runtime: Optional[str], width: int = 10) -> str: + """ + Pads runtime string for table formatting. + Args: + runtime: Runtime string. + width: Desired width. + Returns: + Padded runtime string. + """ + return runtime.rjust(width) if runtime is not None else "".rjust(width) + +def print_logfile( + sim: Simulation, + logfile: Optional[str] = None, + tail: Optional[int] = None, + head: Optional[int] = None, + return_content: bool = False, + show_progress: bool = False +) -> Optional[str]: + """ + Prints the logfile of a simulation to the terminal, with options for tail/head and progress. 
+ Args: + sim: Simulation instance. + logfile: Path to logfile. If None, tries to auto-detect. + tail: If set, print only the last N lines. + head: If set, print only the first N lines. + return_content: If True, return log content as string. + show_progress: If True, show a progress bar for large files. + Returns: + Log content as string if return_content is True, else None. + """ + log_file = None + if logfile: + log_file = Path(logfile) + if not log_file.exists(): + logger.error(f"Log file {log_file} does not exist.") + return None + else: + log_file = Path(sim.sim_dir) / "latest_cluster_run.log" + if not log_file.exists(): + logger.info(f"Searching by job ID...") + jobid=get_job_id(sim) + if jobid: + status=get_status(sim, colored=False) + print(f"Found job ID {jobid} with status {status}.") + if status =='PENDING': + logger.info(f"Job {jobid} is pending. Logfile cannot be located by job ID.") + return None + else: + logger.info(f"No job ID found. Logfile cannot be located by job ID.") + return None + logfile_path = _find_job_log(jobid, root_dir=[sim.sim_dir, '.']) + if logfile_path is None: + logger.error(f"Cannot locate logfile.") + return None + log_file = logfile_path + + try: + with log_file.open("r") as f: + lines = f.readlines() + total_lines = len(lines) + if head is not None: + lines = lines[:head] + elif tail is not None: + lines = lines[-tail:] + if show_progress and total_lines > 1000: + for line in tqdm(lines, desc="Reading logfile"): + print(line, end="") + else: + print("".join(lines)) + if return_content: + return "".join(lines) + except FileNotFoundError: + logger.warning(f"Log file {log_file} does not exist.") + except Exception as e: + logger.error(f"Error reading {log_file}: {e}") + return None + +def _find_job_log(jobid: str, root_dir: List[str] = ['.']) -> Path | None: + if jobid is None: + logger.info("No job ID provided for logfile search.") + return None + all_matches = [] + for dir in root_dir: + root = Path(dir) + pattern = f"job-{jobid.replace("_", "-")}.log" + matches = list(root.rglob(pattern)) + all_matches.extend(matches) + matches = all_matches + if not matches : + logger.error(f"Could not find logfile for job {jobid} in {root_dir}") + return None + if len(matches) != 1: + logger.warning(f"Multiple logfiles found for job {jobid} in {root_dir}") + return matches[0] # list of Path objects + +# --- Attach status method to Simulation class --- +Simulation.get_cluster_job_status = get_status +Simulation.get_cluster_job_id = get_job_id +Simulation.print_cluster_logfile = print_logfile +Simulation.bin_count = _bin_count + +def simulation_submit_to_cluster( + self: Simulation, + cluster_submitter: ClusterSubmitter, + job_properties: Optional[Dict[str, Any]] = None +) -> None: + """ + Submits this simulation as a single job to the cluster using the provided ClusterSubmitter. + Args: + self: Simulation instance. + cluster_submitter: ClusterSubmitter instance. + job_properties: Optional dictionary of SLURM job properties. + """ + if not isinstance(cluster_submitter, ClusterSubmitter): + raise TypeError("cluster_submitter must be a ClusterSubmitter instance") + cluster_submitter.submit(self, submit_dir=self.sim_dir, job_properties=job_properties) + +Simulation.submit_to_cluster = simulation_submit_to_cluster + +def cancel_cluster_job(sim: Simulation) -> bool: + """ + Cancels the SLURM job associated with this simulation. + Returns True if cancellation was attempted, False otherwise. 
+ """ + jobid = get_job_id(sim) + if jobid is None: + logger.warning(f"No job ID found for simulation in {sim.sim_dir}.") + return False + try: + result = subprocess.run(["scancel", jobid], capture_output=True, text=True) + if result.returncode == 0: + logger.info(f"Cancelled job {jobid} for simulation in {sim.sim_dir}.") + return True + else: + logger.error(f"Failed to cancel job {jobid}: {result.stderr.strip()}") + return False + except Exception as e: + logger.error(f"Error cancelling job {jobid}: {e}") + return False + +def cancel_cluster_jobs(sims: Iterable[Simulation]) -> None: + """ + Cancels SLURM jobs for a list or iterable of simulations. + """ + for sim in sims: + cancel_cluster_job(sim) + +# Attach to Simulation class +Simulation.cancel_cluster_job = cancel_cluster_job + +def remove_RUNNING_file(sim: Simulation) -> bool: + """ + Removes the RUNNING file from the simulation directory if it exists. + Returns True if the file was removed, False otherwise. + """ + running_file = Path(sim.sim_dir) / "RUNNING" + if running_file.exists(): + try: + running_file.unlink() + logger.info(f"Removed RUNNING file from {sim.sim_dir}.") + return True + except Exception as e: + logger.error(f"Error removing RUNNING file from {sim.sim_dir}: {e}") + return False + else: + logger.info(f"No RUNNING file found in {sim.sim_dir}.") + return False + +Simulation.remove_RUNNING_file = remove_RUNNING_file \ No newline at end of file From b29c7309feba2273420b8ff01e6bfefc0acc5f53 Mon Sep 17 00:00:00 2001 From: Jonas Schwab Date: Tue, 23 Sep 2025 10:08:56 +0200 Subject: [PATCH 2/8] Some automatic formatting by ruff --- py_alf/cluster_submission.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/py_alf/cluster_submission.py b/py_alf/cluster_submission.py index 8b71495..1970569 100644 --- a/py_alf/cluster_submission.py +++ b/py_alf/cluster_submission.py @@ -5,18 +5,20 @@ __copyright__ = "Copyright 2020-2025, The ALF Project" __license__ = "GPL" -from jinja2 import Environment, FileSystemLoader, StrictUndefined -from pathlib import Path -from .simulation import cd, Simulation +import logging +import os import subprocess from collections.abc import Iterable -import os +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + from colorama import Fore +from jinja2 import Environment, FileSystemLoader, StrictUndefined from tabulate import tabulate -from typing import Union, List, Optional, Dict, Any, Sequence -import logging from tqdm import tqdm +from .simulation import Simulation, cd + logger = logging.getLogger(__name__) # --- ClusterSubmitter class --- @@ -210,7 +212,7 @@ def resubmission( confirm: If True, asks for confirmation before resubmission. subdir: Subdirectory for resubmission job scripts. """ - + if not sims_to_resubmit: logger.info("No simulations to resubmit.") return @@ -571,7 +573,7 @@ def _bin_count(sim: Simulation, counting_obs: str = 'Ener_scal', refresh: bool = Returns: Number of bins. 
""" - import h5py, os + import h5py filename = os.path.join(sim.sim_dir, "data.h5") key = (filename, counting_obs) @@ -583,7 +585,7 @@ def _bin_count(sim: Simulation, counting_obs: str = 'Ener_scal', refresh: bool = with h5py.File(filename, "r") as f: if counting_obs in f: N_bins = f[counting_obs + '/obser'].shape[0] - except (FileNotFoundError) as e: + except (FileNotFoundError): N_bins = 0 except (OSError, KeyError) as e: logger.error(f"Error reading {filename}: {e}") @@ -650,7 +652,7 @@ def print_logfile( else: log_file = Path(sim.sim_dir) / "latest_cluster_run.log" if not log_file.exists(): - logger.info(f"Searching by job ID...") + logger.info("Searching by job ID...") jobid=get_job_id(sim) if jobid: status=get_status(sim, colored=False) @@ -659,11 +661,11 @@ def print_logfile( logger.info(f"Job {jobid} is pending. Logfile cannot be located by job ID.") return None else: - logger.info(f"No job ID found. Logfile cannot be located by job ID.") + logger.info("No job ID found. Logfile cannot be located by job ID.") return None logfile_path = _find_job_log(jobid, root_dir=[sim.sim_dir, '.']) if logfile_path is None: - logger.error(f"Cannot locate logfile.") + logger.error("Cannot locate logfile.") return None log_file = logfile_path @@ -779,4 +781,4 @@ def remove_RUNNING_file(sim: Simulation) -> bool: logger.info(f"No RUNNING file found in {sim.sim_dir}.") return False -Simulation.remove_RUNNING_file = remove_RUNNING_file \ No newline at end of file +Simulation.remove_RUNNING_file = remove_RUNNING_file From ab91d6f24c253c1217de84062f91626f39ed4dee Mon Sep 17 00:00:00 2001 From: Jonas Schwab Date: Tue, 23 Sep 2025 10:10:03 +0200 Subject: [PATCH 3/8] Some more formatting by ruff --- py_alf/cluster_submission.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/py_alf/cluster_submission.py b/py_alf/cluster_submission.py index 1970569..45511b4 100644 --- a/py_alf/cluster_submission.py +++ b/py_alf/cluster_submission.py @@ -250,10 +250,7 @@ def get_status(sim: Simulation, colored: bool = True) -> str: jobid_file = Path(sim.sim_dir) / "jobid.txt" running_file = Path(sim.sim_dir) / "RUNNING" if not jobid_file.exists(): - if running_file.exists(): - status = "CRASHED" - else: - status = "NO_JOBID" + status = "CRASHED" if running_file.exists() else "NO_JOBID" else: jobid = jobid_file.read_text().strip() status, runtime = _get_slurm_status_sacct(jobid) @@ -443,10 +440,7 @@ def get_status_all( jobid = jobid_map.get(sim.sim_dir) if jobid is None: running_file = Path(sim.sim_dir) / "RUNNING" - if running_file.exists(): - status = "CRASHED" - else: - status = "INACTIVE" + status = "CRASHED" if running_file.exists() else "INACTIVE" else: status_tuple = statuses.get(jobid, ("UNKNOWN", None)) if isinstance(status_tuple, tuple): @@ -524,10 +518,7 @@ def find_sims_by_status( jobid = jobid_map.get(sim.sim_dir) if jobid is None: running_file = Path(sim.sim_dir) / "RUNNING" - if running_file.exists(): - status = "CRASHED" - else: - status = "INACTIVE" + status = "CRASHED" if running_file.exists() else "INACTIVE" else: status_tuple = statuses.get(jobid, ("UNKNOWN", None)) if isinstance(status_tuple, tuple): @@ -690,7 +681,9 @@ def print_logfile( logger.error(f"Error reading {log_file}: {e}") return None -def _find_job_log(jobid: str, root_dir: List[str] = ['.']) -> Path | None: +def _find_job_log(jobid: str, root_dir: List[str] = None) -> Path | None: + if root_dir is None: + root_dir = ['.'] if jobid is None: logger.info("No job ID provided for logfile search.") return None 
From 03aa889d22cfe8abd9e2b651dc76f4402cc970cb Mon Sep 17 00:00:00 2001 From: Jonas Schwab Date: Thu, 25 Sep 2025 17:32:58 +0200 Subject: [PATCH 4/8] Add necessary packages for cluster_submission to pyproject.toml --- pyproject.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 4714825..e672753 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,10 @@ dependencies = [ "numpy", "pandas", "scipy", + "colorama", + "Jinja2", + "tabulate2", + "tqdm", ] [project.optional-dependencies] From 6ea78c3b37488b1c75dd7738f6d2b215e6dca9b8 Mon Sep 17 00:00:00 2001 From: Jonas Schwab Date: Thu, 25 Sep 2025 17:34:56 +0200 Subject: [PATCH 5/8] Fix linter error --- py_alf/cluster_submission.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py_alf/cluster_submission.py b/py_alf/cluster_submission.py index 45511b4..eaf2c0f 100644 --- a/py_alf/cluster_submission.py +++ b/py_alf/cluster_submission.py @@ -690,7 +690,7 @@ def _find_job_log(jobid: str, root_dir: List[str] = None) -> Path | None: all_matches = [] for dir in root_dir: root = Path(dir) - pattern = f"job-{jobid.replace("_", "-")}.log" + pattern = f"job-{jobid.replace('_', '-')}.log" matches = list(root.rglob(pattern)) all_matches.extend(matches) matches = all_matches From b51f54281472712e04a1f6f0b3c1d82b523989fd Mon Sep 17 00:00:00 2001 From: Jonas Schwab Date: Thu, 25 Sep 2025 17:49:22 +0200 Subject: [PATCH 6/8] CI: Work on test_template --- .gitlab-ci.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 41e3157..329c359 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -19,10 +19,9 @@ default: - tests - .gitlab-ci.yml script: - - pip install pylint ruff pytest + - pip install pylint ruff pytest . - pylint -E py_alf/ - ruff check --exclude doc - - pip install . 
- pytest - alf_run --machine $MACHINE --branch $BRANCH_R --sims_file py_alf/cli/Sims From 25aeb1b62cc9e23c5c676d815b197a14d018bd8d Mon Sep 17 00:00:00 2001 From: Johannes Hofmann Date: Wed, 8 Oct 2025 13:13:28 +0200 Subject: [PATCH 7/8] update treating slurm status as dict instead of tuple --- py_alf/cluster_submission.py | 69 ++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 35 deletions(-) diff --git a/py_alf/cluster_submission.py b/py_alf/cluster_submission.py index eaf2c0f..7c6680a 100644 --- a/py_alf/cluster_submission.py +++ b/py_alf/cluster_submission.py @@ -90,21 +90,21 @@ def submit( if jobid_file.exists(): with jobid_file.open("r") as f: jobid = f.read().strip() - status = _get_slurm_status_sacct(jobid) - if status in ("PENDING", "RUNNING"): - logger.info(f"Skipping {s.sim_dir}: job {jobid} is {status}") + status_entry = _get_slurm_status_sacct(jobid) + if status_entry.get("status") in ("PENDING", "RUNNING"): + logger.info(f"Skipping {s.sim_dir}: job {jobid} is {status_entry.get('status')}") continue running_file = Path(s.sim_dir) / "RUNNING" if running_file.exists(): jobid = jobid_file.read_text().strip() - status = _get_slurm_status_sacct(jobid) - if status in ("RUNNING"): - logger.info(f"Skipping {s.sim_dir}: job {jobid} is {status}") + status_entry = _get_slurm_status_sacct(jobid) + if status_entry.get("status") == "RUNNING": + logger.info(f"Skipping {s.sim_dir}: job {jobid} is {status_entry.get('status')}") continue else: logger.warning(f"Leftover RUNNING file detected in {s.sim_dir}.") logger.warning("This indicates an error in the previous run.") - choice = input(f"Do you want to remove this file to enable resubmission (status={status}) [y/N]?").strip().lower() + choice = input(f"Do you want to remove this file to enable resubmission (status={status_entry.get('status')}) [y/N]?").strip().lower() if choice in ("yes", "y"): running_file.unlink() logger.info("File removed.") @@ -251,9 +251,12 @@ def get_status(sim: Simulation, colored: bool = True) -> str: running_file = Path(sim.sim_dir) / "RUNNING" if not jobid_file.exists(): status = "CRASHED" if running_file.exists() else "NO_JOBID" + runtime = None else: jobid = jobid_file.read_text().strip() - status, runtime = _get_slurm_status_sacct(jobid) + entry = _get_slurm_status_sacct(jobid) + status = entry.get("status", "UNKNOWN") + runtime = entry.get("runtime") if colored: status = _colorize_status(status) return status @@ -273,10 +276,10 @@ def get_job_id(sim: Simulation) -> str | None: else: return jobid_file.read_text().strip() -def _get_slurm_status_sacct(jobid: str) -> tuple[str, Optional[str]]: +def _get_slurm_status_sacct(jobid: str) -> Dict[str, Optional[str]]: """ Query SLURM sacct for job status and elapsed time. - Returns (status, runtime) tuple. 
+ Returns dict: {'status': , 'runtime': } """ try: result = subprocess.run( @@ -290,18 +293,18 @@ def _get_slurm_status_sacct(jobid: str) -> tuple[str, Optional[str]]: if len(parts) >= 1: state = parts[0] runtime = parts[1] if len(parts) > 1 else None - return (state, runtime) - return ("UNKNOWN", None) + return {"status": state, "runtime": runtime} + return {"status": "UNKNOWN", "runtime": None} except Exception as e: logger.error(f"sacct error for job {jobid}: {e}") - return ("ERROR", None) + return {"status": "ERROR", "runtime": None} -def _get_slurm_status_bulk_sacct(jobids: List[str]) -> Dict[str, Union[str, tuple]]: +def _get_slurm_status_bulk_sacct(jobids: List[str]) -> Dict[str, Dict[str, Optional[str]]]: """ Query SLURM sacct for multiple job IDs (including array tasks) in one call. - Returns dict: jobid[_index] -> (status, runtime) + Returns dict: jobid[_index] -> {'status': , 'runtime': } """ - status_map: Dict[str, Union[str, tuple]] = {jid: "UNKNOWN" for jid in jobids} + status_map: Dict[str, Dict[str, Optional[str]]] = {jid: {"status": "UNKNOWN", "runtime": None} for jid in jobids} if not jobids: return status_map @@ -316,25 +319,25 @@ def _get_slurm_status_bulk_sacct(jobids: List[str]) -> Dict[str, Union[str, tupl jobid = parts[0] state = parts[1] runtime = parts[2] if len(parts) > 2 else None - status_map[jobid] = (state, runtime) + status_map[jobid] = {"status": state, "runtime": runtime} except Exception as e: logger.error(f"sacct bulk error: {e}") for jid in jobids: - status_map[jid] = ("ERROR", None) + status_map[jid] = {"status": "ERROR", "runtime": None} return status_map -def _get_slurm_status_bulk(jobids: List[str]) -> Dict[str, Union[str, tuple]]: +def _get_slurm_status_bulk(jobids: List[str]) -> Dict[str, Dict[str, Optional[str]]]: """ Query SLURM for multiple job IDs (including array tasks) in one call. Args: jobids: List of job IDs. Returns: - Dict mapping jobid[_index] to (status, runtime) or status string. + Dict mapping jobid[_index] to {'status':..., 'runtime':...}. 
""" if not jobids: return {} - status_map: Dict[str, Union[str, tuple]] = {jid: "FINISHED_OR_NOT_FOUND" for jid in jobids} + status_map: Dict[str, Dict[str, Optional[str]]] = {jid: {"status": "FINISHED_OR_NOT_FOUND", "runtime": None} for jid in jobids} found_in_squeue = set() try: @@ -359,7 +362,7 @@ def _get_slurm_status_bulk(jobids: List[str]) -> Dict[str, Union[str, tuple]]: continue jid, idx, state, runtime = parts full_id = jid if idx == "N/A" else idx - status_map[full_id] = (state, runtime) + status_map[full_id] = {"status": state, "runtime": runtime} found_in_squeue.add(full_id) except Exception as e: logger.error(f"Error parsing squeue output line '{line}': {e}") @@ -369,10 +372,12 @@ def _get_slurm_status_bulk(jobids: List[str]) -> Dict[str, Union[str, tuple]]: if missing_jobids: sacct_statuses = _get_slurm_status_bulk_sacct(missing_jobids) for jid in missing_jobids: - status_map[jid] = sacct_statuses.get(jid, ("UNKNOWN", None)) + status_map[jid] = sacct_statuses.get(jid, {"status": "UNKNOWN", "runtime": None}) else: # squeue failed, use sacct bulk for all jobids - status_map = _get_slurm_status_bulk_sacct(jobids) + sacct_statuses = _get_slurm_status_bulk_sacct(jobids) + for jid in jobids: + status_map[jid] = sacct_statuses.get(jid, {"status": "UNKNOWN", "runtime": None}) return status_map @@ -442,12 +447,9 @@ def get_status_all( running_file = Path(sim.sim_dir) / "RUNNING" status = "CRASHED" if running_file.exists() else "INACTIVE" else: - status_tuple = statuses.get(jobid, ("UNKNOWN", None)) - if isinstance(status_tuple, tuple): - status, runtime = status_tuple - else: - status = status_tuple - runtime = None + status_entry = statuses.get(jobid, {"status": "UNKNOWN", "runtime": None}) + status = status_entry.get("status", "UNKNOWN") + runtime = status_entry.get("runtime", None) num_bins = _bin_count(sim, counting_obs, refresh=(status == 'RUNNING') or refresh_cache) row = [sim.sim_dict.get(key, None) for key in keys] @@ -520,11 +522,8 @@ def find_sims_by_status( running_file = Path(sim.sim_dir) / "RUNNING" status = "CRASHED" if running_file.exists() else "INACTIVE" else: - status_tuple = statuses.get(jobid, ("UNKNOWN", None)) - if isinstance(status_tuple, tuple): - status, runtime = status_tuple - else: - status = status_tuple + status_entry = statuses.get(jobid, {"status":"UNKNOWN","runtime":None}) + status = status_entry.get("status","UNKNOWN") if status in filter: sims_with_status.append(sim) if not sims_with_status: From 4b5c9f6b4cc51bbe6d46de2cdfaafb7eb64b8c68 Mon Sep 17 00:00:00 2001 From: Johannes Hofmann Date: Wed, 8 Oct 2025 13:36:55 +0200 Subject: [PATCH 8/8] working on pylint and ruff fixes --- py_alf/cluster_submission.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/py_alf/cluster_submission.py b/py_alf/cluster_submission.py index 7c6680a..dcbd1c5 100644 --- a/py_alf/cluster_submission.py +++ b/py_alf/cluster_submission.py @@ -251,17 +251,15 @@ def get_status(sim: Simulation, colored: bool = True) -> str: running_file = Path(sim.sim_dir) / "RUNNING" if not jobid_file.exists(): status = "CRASHED" if running_file.exists() else "NO_JOBID" - runtime = None else: jobid = jobid_file.read_text().strip() entry = _get_slurm_status_sacct(jobid) status = entry.get("status", "UNKNOWN") - runtime = entry.get("runtime") if colored: status = _colorize_status(status) return status -def get_job_id(sim: Simulation) -> str | None: +def get_job_id(sim: Simulation) -> Optional[str]: """ Returns colorized SLURM job status for a simulation. 
Args: @@ -498,7 +496,7 @@ def get_status_all( def find_sims_by_status( sims: Iterable[Simulation], filter: List[str] -) -> List[Simulation] | None: +) -> Optional[List[Simulation]]: """ Prints a table of statuses for all simulations (bulk SLURM query). Args: @@ -680,7 +678,7 @@ def print_logfile( logger.error(f"Error reading {log_file}: {e}") return None -def _find_job_log(jobid: str, root_dir: List[str] = None) -> Path | None: +def _find_job_log(jobid: str, root_dir: List[str] = None) -> Optional[Path]: if root_dir is None: root_dir = ['.'] if jobid is None:
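
---

Usage note (not part of the patch series above): a minimal sketch of how the new `ClusterSubmitter` API could be driven from a pyALF script, assuming the usual `ALF_source`/`Simulation` setup from `py_alf`, a SLURM login node, and a Hamiltonian/parameter scan chosen purely for illustration.

```python
# Illustrative only -- assumes py_alf is installed on a SLURM login node and
# that the shipped "Generic.slurm.j2" template fits the local cluster.
from py_alf import ALF_source, Simulation
from py_alf.cluster_submission import ClusterSubmitter, get_status_all

alf_src = ALF_source()  # uses the ALF installation configured for pyALF

# Hypothetical parameter scan over linear system sizes.
sims = [
    Simulation(alf_src, "Hubbard", {"L1": L, "L2": L, "Beta": 5.0}, machine="GNU")
    for L in (4, 6, 8)
]

submitter = ClusterSubmitter("Generic")   # loads Jobfile_templates/Generic.slurm.j2
submitter.submit(sims, job_properties={"time": 12, "mem": "4G"})

# Later: tabulate SLURM states and bin counts; resubmit under-converged runs.
too_few_bins, crashed = get_status_all(sims, min_bins=10)
if too_few_bins:
    submitter.resubmission(too_few_bins, job_properties={"time": 24})
```

Since multiple simulations are passed, `submit()` writes a `directories.txt` and launches a single SLURM array job; a lone `Simulation` would instead be submitted as an individual job from its own directory.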