From 3f0d0613313b999ed13307b5b80fd7b3bd2d794b Mon Sep 17 00:00:00 2001 From: Sebastian Brandstaeter Date: Sun, 8 Jun 2025 00:05:55 +0200 Subject: [PATCH 1/8] feat: introduce logger on dask workers in drivers and dataprocessors --- src/queens/data_processors/_data_processor.py | 16 ++++++- src/queens/drivers/_driver.py | 30 ++++++++++++- src/queens/drivers/function.py | 2 +- src/queens/drivers/jobscript.py | 15 ++++--- src/queens/utils/config_directories.py | 15 +++++++ src/queens/utils/logger_settings.py | 45 +++++++++++++++++++ 6 files changed, 112 insertions(+), 11 deletions(-) diff --git a/src/queens/data_processors/_data_processor.py b/src/queens/data_processors/_data_processor.py index 78fb1dd28..cf0225818 100644 --- a/src/queens/data_processors/_data_processor.py +++ b/src/queens/data_processors/_data_processor.py @@ -18,6 +18,8 @@ import logging from pathlib import Path +from queens.utils.logger_settings import setup_logger_on_dask_worker + _logger = logging.getLogger(__name__) @@ -55,6 +57,7 @@ def __init__( implement valid options for this dictionary. files_to_be_deleted_regex_lst (lst): List with paths to files that should be deleted. The paths can contain regex expressions. + logger_on_dask_worker (logging.Logger): Logger instance used on the dask worker """ if not file_name_identifier: raise ValueError( @@ -90,15 +93,23 @@ def __init__( self.file_options_dict = file_options_dict self.file_name_identifier = file_name_identifier - def get_data_from_file(self, base_dir_file): + self.logger_on_dask_worker = None + + def get_data_from_file(self, base_dir_file, experiment_dir=None): """Get data of interest from file. Args: base_dir_file (Path): Path of the base directory that contains the file of interest + experiment_dir (Path): Path of QUEENS experiment. Returns: processed_data (np.array): Final data from data processor module """ + if self.logger_on_dask_worker is None: + self.logger_on_dask_worker = setup_logger_on_dask_worker( + name=type(self).__name__, experiment_dir=experiment_dir, level=logging.INFO + ) + if not base_dir_file: raise ValueError( "The data processor requires a base_directory for the " @@ -109,7 +120,8 @@ def get_data_from_file(self, base_dir_file): "The argument 'base_dir_file' must be of type 'Path' " f"but is of type {type(base_dir_file)}. Abort..." ) - + self.logger_on_dask_worker.info("\nProcessing the following files:") + self.logger_on_dask_worker.info(str(base_dir_file)) file_path = self._check_file_exist_and_is_unique(base_dir_file) processed_data = None if file_path: diff --git a/src/queens/drivers/_driver.py b/src/queens/drivers/_driver.py index 1d0829302..a75897ccc 100644 --- a/src/queens/drivers/_driver.py +++ b/src/queens/drivers/_driver.py @@ -17,6 +17,9 @@ import abc import logging from pathlib import Path +from typing import final + +from queens.utils.logger_settings import setup_logger_on_dask_worker _logger = logging.getLogger(__name__) @@ -27,6 +30,7 @@ class Driver(metaclass=abc.ABCMeta): Attributes: parameters (Parameters): Parameters object files_to_copy (list): files or directories to copy to experiment_dir + logger_on_dask_worker (logging.Logger): Logger instance used on the dask worker """ def __init__(self, parameters, files_to_copy=None): @@ -46,8 +50,32 @@ def __init__(self, parameters, files_to_copy=None): raise TypeError("files_to_copy must be a list of strings or Path objects") self.files_to_copy = files_to_copy - @abc.abstractmethod + self.logger_on_dask_worker = None + + @final def run(self, sample, job_id, num_procs, experiment_dir, experiment_name): + """Run driver. + + Args: + sample (dict): Dict containing sample + job_id (int): Job ID + num_procs (int): number of processors + experiment_name (str): name of QUEENS experiment. + experiment_dir (Path): Path to QUEENS experiment directory. + + Returns: + Result and potentially the gradient + """ + if self.logger_on_dask_worker is None: + self.logger_on_dask_worker = setup_logger_on_dask_worker( + name=type(self).__name__, experiment_dir=experiment_dir, level=logging.INFO + ) + self.logger_on_dask_worker.info("Running job %i", job_id) + + return self._run(sample, job_id, num_procs, experiment_dir, experiment_name) + + @abc.abstractmethod + def _run(self, sample, job_id, num_procs, experiment_dir, experiment_name): """Abstract method for driver run. Args: diff --git a/src/queens/drivers/function.py b/src/queens/drivers/function.py index bd77e240e..848957b10 100644 --- a/src/queens/drivers/function.py +++ b/src/queens/drivers/function.py @@ -112,7 +112,7 @@ def reshaped_output_function(sample_dict): return reshaped_output_function - def run(self, sample, job_id, num_procs, experiment_dir, experiment_name): + def _run(self, sample, job_id, num_procs, experiment_dir, experiment_name): """Run the driver. Args: diff --git a/src/queens/drivers/jobscript.py b/src/queens/drivers/jobscript.py index 7dd40fa01..ab76a9443 100644 --- a/src/queens/drivers/jobscript.py +++ b/src/queens/drivers/jobscript.py @@ -187,7 +187,7 @@ def get_read_in_jobscript_template(jobscript_template): return jobscript_template - def run(self, sample, job_id, num_procs, experiment_dir, experiment_name): + def _run(self, sample, job_id, num_procs, experiment_dir, experiment_name): """Run the driver. Args: @@ -239,7 +239,7 @@ def run(self, sample, job_id, num_procs, experiment_dir, experiment_name): self._run_executable(job_id, execute_cmd) with metadata.time_code("data_processing"): - results = self._get_results(output_dir) + results = self._get_results(output_dir, experiment_dir) metadata.outputs = results return results @@ -293,11 +293,12 @@ def _run_executable(self, job_id, execute_cmd): f"{process_returncode}.", ) - def _get_results(self, output_dir): + def _get_results(self, output_dir, experiment_dir=None): """Get results from driver run. Args: output_dir (Path): Path to output directory. + experiment_dir (Path | None): Path to QUEENS experiment directory. Returns: result (np.array): Result from the driver run. @@ -305,13 +306,13 @@ def _get_results(self, output_dir): """ result = None if self.data_processor: - result = self.data_processor.get_data_from_file(output_dir) - _logger.debug("Got result: %s", result) + result = self.data_processor.get_data_from_file(output_dir, experiment_dir) + self.logger_on_dask_worker.info("Got result: %s", result) gradient = None if self.gradient_data_processor: - gradient = self.gradient_data_processor.get_data_from_file(output_dir) - _logger.debug("Got gradient: %s", gradient) + gradient = self.gradient_data_processor.get_data_from_file(output_dir, experiment_dir) + self.logger_on_dask_worker.info("Got gradient: %s", gradient) return result, gradient def prepare_input_files(self, sample_dict, experiment_dir, input_files): diff --git a/src/queens/utils/config_directories.py b/src/queens/utils/config_directories.py index f21524404..b3555bffe 100644 --- a/src/queens/utils/config_directories.py +++ b/src/queens/utils/config_directories.py @@ -73,6 +73,21 @@ def experiment_directory( return experiment_dir +def logging_directory_on_dask_worker(experiment_dir): + """Directory for log-files of dask workers. + + This is called on the Dask Worker thus the experiment directory should lready exist + and can be passed directly. + + + Args: + experiment_dir (Path): Directory for data of a specific experiment on the computing machine. + """ + log_dir = experiment_dir / "dask_workers_logs" + create_directory(log_dir) + return log_dir + + def create_directory(dir_path: str | Path) -> None: """Create a directory either local or remote. diff --git a/src/queens/utils/logger_settings.py b/src/queens/utils/logger_settings.py index ed8c4513e..cf5ac3767 100644 --- a/src/queens/utils/logger_settings.py +++ b/src/queens/utils/logger_settings.py @@ -21,6 +21,9 @@ from pathlib import Path from typing import Any, Callable, ParamSpec +from dask.distributed import get_worker + +from queens.utils.config_directories import logging_directory_on_dask_worker from queens.utils.printing import get_str_table LIBRARY_LOGGER_NAME = "queens" @@ -279,3 +282,45 @@ def key_fun(pair: tuple[str, Any]) -> int: method(*args, **kwargs) return wrapper + + +def setup_logger_on_dask_worker(name="worker", experiment_dir=None, level=logging.INFO): + """Setup a logger on a dask worker. + + Args: + name (str): Name of the logger. + experiment_dir (Path): Path to the experiment directory. + level (int): Logging level. + + Returns: + logger (logging.Logger): Logger instance. + """ + logger = logging.getLogger(name) + + if logger.hasHandlers(): + return logger # Already configured (avoid duplicate handlers) + + logger.setLevel(level) + formatter = NewLineFormatter( + "%(asctime)s %(name)-12s %(levelname)-8s %(message)s", datefmt="%m-%d %H:%M" + ) + + if experiment_dir is not None: + log_dir = logging_directory_on_dask_worker(experiment_dir) + + try: + worker = get_worker() + log_file = log_dir / f"{worker.name}.log" + except ValueError: + # Not inside a Dask worker — maybe running locally + log_file = log_dir / f"{name}_client.log" + + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + logger.addHandler(stream_handler) + + return logger From b0500899f72ba9baa780ab91f19cd6f450166913 Mon Sep 17 00:00:00 2001 From: Sebastian Brandstaeter Date: Sat, 23 Aug 2025 12:55:52 +0200 Subject: [PATCH 2/8] feat: use one worker log file per job written to subdir of job dir --- src/queens/data_processors/_data_processor.py | 47 ++++++------ src/queens/drivers/_driver.py | 71 +++++++++++++++---- src/queens/drivers/function.py | 19 ++++- src/queens/drivers/jobscript.py | 55 +++++++------- src/queens/utils/config_directories.py | 15 ---- src/queens/utils/logger_settings.py | 42 +++++------ 6 files changed, 146 insertions(+), 103 deletions(-) diff --git a/src/queens/data_processors/_data_processor.py b/src/queens/data_processors/_data_processor.py index cf0225818..3440f4bfc 100644 --- a/src/queens/data_processors/_data_processor.py +++ b/src/queens/data_processors/_data_processor.py @@ -18,7 +18,7 @@ import logging from pathlib import Path -from queens.utils.logger_settings import setup_logger_on_dask_worker +from queens.utils.logger_settings import setup_logger_on_worker _logger = logging.getLogger(__name__) @@ -93,55 +93,56 @@ def __init__( self.file_options_dict = file_options_dict self.file_name_identifier = file_name_identifier - self.logger_on_dask_worker = None + self.logger_on_worker = None - def get_data_from_file(self, base_dir_file, experiment_dir=None): + def get_data_from_file(self, base_dir): """Get data of interest from file. Args: - base_dir_file (Path): Path of the base directory that contains the file of interest - experiment_dir (Path): Path of QUEENS experiment. + base_dir (Path): Path of the base directory that contains the file of interest Returns: processed_data (np.array): Final data from data processor module """ - if self.logger_on_dask_worker is None: - self.logger_on_dask_worker = setup_logger_on_dask_worker( - name=type(self).__name__, experiment_dir=experiment_dir, level=logging.INFO - ) - - if not base_dir_file: + if not base_dir: raise ValueError( "The data processor requires a base_directory for the " "files to operate on! Your input was empty! Abort..." ) - if not isinstance(base_dir_file, Path): + if not isinstance(base_dir, Path): raise TypeError( - "The argument 'base_dir_file' must be of type 'Path' " - f"but is of type {type(base_dir_file)}. Abort..." + "The argument 'base_dir' must be of type 'Path' " + f"but is of type {type(base_dir)}. Abort..." ) - self.logger_on_dask_worker.info("\nProcessing the following files:") - self.logger_on_dask_worker.info(str(base_dir_file)) - file_path = self._check_file_exist_and_is_unique(base_dir_file) + + self.logger_on_worker = setup_logger_on_worker( + name=type(self).__name__, + log_dir=base_dir, + level=logging.INFO, + ) + + file_path = self._check_file_exist_and_is_unique(base_dir) + self.logger_on_worker.info("\nProcessing the following files:") + self.logger_on_worker.info(str(file_path)) processed_data = None if file_path: raw_data = self.get_raw_data_from_file(file_path) filtered_data = self.filter_and_manipulate_raw_data(raw_data) processed_data = self._subsequent_data_manipulation(filtered_data) - self._clean_up(base_dir_file) + self._clean_up(base_dir) return processed_data - def _check_file_exist_and_is_unique(self, base_dir_file): + def _check_file_exist_and_is_unique(self, base_dir): """Check if file exists. Args: - base_dir_file (Path): Path to base directory that contains file of interest + base_dir (Path): Path to base directory that contains file of interest Returns: file_path (str): Actual path to the file of interest. """ - file_list = list(base_dir_file.glob(self.file_name_identifier)) + file_list = list(base_dir.glob(self.file_name_identifier)) if len(file_list) > 1: raise RuntimeError( @@ -153,8 +154,8 @@ def _check_file_exist_and_is_unique(self, base_dir_file): if len(file_list) == 1: file_path = file_list[0] else: - _logger.warning( - "The file '%s' does not exist!", base_dir_file / self.file_name_identifier + self.logger_on_worker.warning( + "The file '%s' does not exist!", base_dir / self.file_name_identifier ) file_path = None diff --git a/src/queens/drivers/_driver.py b/src/queens/drivers/_driver.py index a75897ccc..f661298f7 100644 --- a/src/queens/drivers/_driver.py +++ b/src/queens/drivers/_driver.py @@ -19,7 +19,7 @@ from pathlib import Path from typing import final -from queens.utils.logger_settings import setup_logger_on_dask_worker +from queens.utils.logger_settings import setup_logger_on_worker _logger = logging.getLogger(__name__) @@ -30,7 +30,7 @@ class Driver(metaclass=abc.ABCMeta): Attributes: parameters (Parameters): Parameters object files_to_copy (list): files or directories to copy to experiment_dir - logger_on_dask_worker (logging.Logger): Logger instance used on the dask worker + logger_on_worker (logging.Logger): Logger instance used on the dask worker """ def __init__(self, parameters, files_to_copy=None): @@ -50,7 +50,7 @@ def __init__(self, parameters, files_to_copy=None): raise TypeError("files_to_copy must be a list of strings or Path objects") self.files_to_copy = files_to_copy - self.logger_on_dask_worker = None + self.logger_on_worker = None @final def run(self, sample, job_id, num_procs, experiment_dir, experiment_name): @@ -66,25 +66,72 @@ def run(self, sample, job_id, num_procs, experiment_dir, experiment_name): Returns: Result and potentially the gradient """ - if self.logger_on_dask_worker is None: - self.logger_on_dask_worker = setup_logger_on_dask_worker( - name=type(self).__name__, experiment_dir=experiment_dir, level=logging.INFO - ) - self.logger_on_dask_worker.info("Running job %i", job_id) - - return self._run(sample, job_id, num_procs, experiment_dir, experiment_name) + job_dir, output_dir, output_file, log_file = self._manage_paths(job_id, experiment_dir) + self.logger_on_worker = setup_logger_on_worker( + name=type(self).__name__, log_dir=output_dir, level=logging.INFO + ) + + return self._run( + sample, + job_id, + num_procs, + experiment_dir, + experiment_name, + job_dir, + output_dir, + output_file, + log_file, + ) @abc.abstractmethod - def _run(self, sample, job_id, num_procs, experiment_dir, experiment_name): + def _run( + self, + sample, + job_id, + num_procs, + experiment_dir, + experiment_name, + job_dir, + output_dir, + output_file, + log_file, + ): """Abstract method for driver run. Args: sample (dict): Dict containing sample job_id (int): Job ID num_procs (int): number of processors - experiment_name (str): name of QUEENS experiment. experiment_dir (Path): Path to QUEENS experiment directory. + experiment_name (str): name of QUEENS experiment. + job_dir (Path): Path to job directory. + output_dir (Path): Path to output directory. + output_file (Path): Path to output file(s). + log_file (Path): Path to log file. Returns: Result and potentially the gradient """ + + def _manage_paths(self, job_id, experiment_dir): + """Manage paths for driver run. + + Args: + job_id (int): Job ID. + experiment_dir (Path): Path to QUEENS experiment directory. + + Returns: + job_dir (Path): Path to job directory. + output_dir (Path): Path to output directory. + output_file (Path): Path to output file(s). + log_file (Path): Path to log file. + """ + job_dir = experiment_dir / str(job_id) + output_dir = job_dir / "output" + output_dir.mkdir(parents=True, exist_ok=True) + + output_prefix = "output" + output_file = output_dir / output_prefix + log_file = output_dir / (output_prefix + ".log") + + return job_dir, output_dir, output_file, log_file diff --git a/src/queens/drivers/function.py b/src/queens/drivers/function.py index 848957b10..ab2d51826 100644 --- a/src/queens/drivers/function.py +++ b/src/queens/drivers/function.py @@ -112,15 +112,30 @@ def reshaped_output_function(sample_dict): return reshaped_output_function - def _run(self, sample, job_id, num_procs, experiment_dir, experiment_name): + def _run( + self, + sample, + job_id, + num_procs, + experiment_dir, + experiment_name, + job_dir, + output_dir, + output_file, + log_file, + ): """Run the driver. Args: sample (dict): Dict containing sample job_id (int): Job ID num_procs (int): number of processors - experiment_name (str): name of QUEENS experiment. experiment_dir (Path): Path to QUEENS experiment directory. + experiment_name (str): name of QUEENS experiment. + job_dir (Path): Path to job directory. + output_dir (Path): Path to output directory. + output_file (Path): Path to output file(s). + log_file (Path): Path to log file. Returns: Result and potentially the gradient diff --git a/src/queens/drivers/jobscript.py b/src/queens/drivers/jobscript.py index ab76a9443..715d7564d 100644 --- a/src/queens/drivers/jobscript.py +++ b/src/queens/drivers/jobscript.py @@ -187,7 +187,18 @@ def get_read_in_jobscript_template(jobscript_template): return jobscript_template - def _run(self, sample, job_id, num_procs, experiment_dir, experiment_name): + def _run( + self, + sample, + job_id, + num_procs, + experiment_dir, + experiment_name, + job_dir, + output_dir, + output_file, + log_file, + ): """Run the driver. Args: @@ -195,14 +206,16 @@ def _run(self, sample, job_id, num_procs, experiment_dir, experiment_name): job_id (int): Job ID. num_procs (int): Number of processors. experiment_dir (Path): Path to QUEENS experiment directory. - experiment_name (str): Name of QUEENS experiment. + experiment_name (str): name of QUEENS experiment. + job_dir (Path): Path to job directory. + output_dir (Path): Path to output directory. + output_file (Path): Path to output file(s). + log_file (Path): Path to log file. Returns: Result and potentially the gradient. """ - job_dir, output_dir, output_file, input_files, log_file = self._manage_paths( - job_id, experiment_dir - ) + input_files = self._manage_input_files(job_dir) sample_dict = self.parameters.sample_as_dict(sample) @@ -239,39 +252,26 @@ def _run(self, sample, job_id, num_procs, experiment_dir, experiment_name): self._run_executable(job_id, execute_cmd) with metadata.time_code("data_processing"): - results = self._get_results(output_dir, experiment_dir) + results = self._get_results(output_dir) metadata.outputs = results return results - def _manage_paths(self, job_id, experiment_dir): + def _manage_input_files(self, job_dir): """Manage paths for driver run. Args: - job_id (int): Job ID. - experiment_dir (Path): Path to QUEENS experiment directory. + job_dir (Path): Path to job directory. Returns: - job_dir (Path): Path to job directory. - output_dir (Path): Path to output directory. - output_file (Path): Path to output file(s). input_files (dict): Dict with name and path of the input file(s). - log_file (Path): Path to log file. """ - job_dir = experiment_dir / str(job_id) - output_dir = job_dir / "output" - output_dir.mkdir(parents=True, exist_ok=True) - - output_prefix = "output" - output_file = output_dir / output_prefix - log_file = output_dir / (output_prefix + ".log") - input_files = {} for input_template_name, input_template_path in self.input_templates.items(): input_file_str = input_template_name + "".join(input_template_path.suffixes) input_files[input_template_name] = job_dir / input_file_str - return job_dir, output_dir, output_file, input_files, log_file + return input_files def _run_executable(self, job_id, execute_cmd): """Run executable. @@ -293,12 +293,11 @@ def _run_executable(self, job_id, execute_cmd): f"{process_returncode}.", ) - def _get_results(self, output_dir, experiment_dir=None): + def _get_results(self, output_dir): """Get results from driver run. Args: output_dir (Path): Path to output directory. - experiment_dir (Path | None): Path to QUEENS experiment directory. Returns: result (np.array): Result from the driver run. @@ -306,13 +305,13 @@ def _get_results(self, output_dir, experiment_dir=None): """ result = None if self.data_processor: - result = self.data_processor.get_data_from_file(output_dir, experiment_dir) - self.logger_on_dask_worker.info("Got result: %s", result) + result = self.data_processor.get_data_from_file(output_dir) + self.logger_on_worker.info("Got result: %s", result) gradient = None if self.gradient_data_processor: - gradient = self.gradient_data_processor.get_data_from_file(output_dir, experiment_dir) - self.logger_on_dask_worker.info("Got gradient: %s", gradient) + gradient = self.gradient_data_processor.get_data_from_file(output_dir) + self.logger_on_worker.info("Got gradient: %s", gradient) return result, gradient def prepare_input_files(self, sample_dict, experiment_dir, input_files): diff --git a/src/queens/utils/config_directories.py b/src/queens/utils/config_directories.py index b3555bffe..f21524404 100644 --- a/src/queens/utils/config_directories.py +++ b/src/queens/utils/config_directories.py @@ -73,21 +73,6 @@ def experiment_directory( return experiment_dir -def logging_directory_on_dask_worker(experiment_dir): - """Directory for log-files of dask workers. - - This is called on the Dask Worker thus the experiment directory should lready exist - and can be passed directly. - - - Args: - experiment_dir (Path): Directory for data of a specific experiment on the computing machine. - """ - log_dir = experiment_dir / "dask_workers_logs" - create_directory(log_dir) - return log_dir - - def create_directory(dir_path: str | Path) -> None: """Create a directory either local or remote. diff --git a/src/queens/utils/logger_settings.py b/src/queens/utils/logger_settings.py index cf5ac3767..c0f9fa512 100644 --- a/src/queens/utils/logger_settings.py +++ b/src/queens/utils/logger_settings.py @@ -21,9 +21,6 @@ from pathlib import Path from typing import Any, Callable, ParamSpec -from dask.distributed import get_worker - -from queens.utils.config_directories import logging_directory_on_dask_worker from queens.utils.printing import get_str_table LIBRARY_LOGGER_NAME = "queens" @@ -284,12 +281,12 @@ def key_fun(pair: tuple[str, Any]) -> int: return wrapper -def setup_logger_on_dask_worker(name="worker", experiment_dir=None, level=logging.INFO): - """Setup a logger on a dask worker. +def setup_logger_on_worker(name="worker", log_dir=None, level=logging.INFO): + """Setup a logger on a scheduler's worker. Args: name (str): Name of the logger. - experiment_dir (Path): Path to the experiment directory. + log_dir (Path): Path to the directory for the log file. level (int): Logging level. Returns: @@ -297,30 +294,29 @@ def setup_logger_on_dask_worker(name="worker", experiment_dir=None, level=loggin """ logger = logging.getLogger(name) - if logger.hasHandlers(): - return logger # Already configured (avoid duplicate handlers) - - logger.setLevel(level) formatter = NewLineFormatter( "%(asctime)s %(name)-12s %(levelname)-8s %(message)s", datefmt="%m-%d %H:%M" ) - if experiment_dir is not None: - log_dir = logging_directory_on_dask_worker(experiment_dir) - - try: - worker = get_worker() - log_file = log_dir / f"{worker.name}.log" - except ValueError: - # Not inside a Dask worker — maybe running locally - log_file = log_dir / f"{name}_client.log" + logger.setLevel(level) + # if the worker is set up for the first time + if not logger.hasHandlers(): + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + logger.addHandler(stream_handler) + + # Remove all FileHandlers, keep others (like StreamHandler) + # it should always have at least the StreamHandler from above but for safety + if logger.hasHandlers(): + for handler in logger.handlers[:]: + if isinstance(handler, logging.FileHandler): + logger.removeHandler(handler) + handler.close() # optional: closes the file descriptor + if log_dir is not None: + log_file = log_dir / "worker.log" file_handler = logging.FileHandler(log_file) file_handler.setFormatter(formatter) logger.addHandler(file_handler) - stream_handler = logging.StreamHandler() - stream_handler.setFormatter(formatter) - logger.addHandler(stream_handler) - return logger From 3fce60676719e7efba4e1fa46a358244dc3ef42c Mon Sep 17 00:00:00 2001 From: Sebastian Brandstaeter Date: Sat, 23 Aug 2025 13:29:57 +0200 Subject: [PATCH 3/8] feat: expose worker log level and control of log file to user --- src/queens/data_processors/_data_processor.py | 16 +++++++++++---- src/queens/data_processors/csv_file.py | 8 ++++++++ src/queens/data_processors/numpy_file.py | 7 +++++++ src/queens/data_processors/pvd_file.py | 7 +++++++ src/queens/data_processors/txt_file.py | 7 +++++++ src/queens/drivers/_driver.py | 20 ++++++++++++++++--- src/queens/drivers/function.py | 11 +++++++++- src/queens/drivers/jobscript.py | 12 ++++++++++- src/queens/drivers/mpi.py | 9 +++++++++ src/queens_interfaces/fourc/driver.py | 9 +++++++++ 10 files changed, 97 insertions(+), 9 deletions(-) diff --git a/src/queens/data_processors/_data_processor.py b/src/queens/data_processors/_data_processor.py index 3440f4bfc..c6ef7f508 100644 --- a/src/queens/data_processors/_data_processor.py +++ b/src/queens/data_processors/_data_processor.py @@ -38,6 +38,9 @@ class DataProcessor(metaclass=abc.ABCMeta): The file prefix can contain BASIC regex expression and subdirectories. Examples are wildcards `*` or expressions like `[ab]`. + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + logger_on_worker (logging.Logger): Logger instance used on the dask worker """ def __init__( @@ -45,6 +48,8 @@ def __init__( file_name_identifier=None, file_options_dict=None, files_to_be_deleted_regex_lst=None, + worker_log_level=logging.INFO, + write_worker_log_files=True, ): """Init data processor class. @@ -57,7 +62,9 @@ def __init__( implement valid options for this dictionary. files_to_be_deleted_regex_lst (lst): List with paths to files that should be deleted. The paths can contain regex expressions. - logger_on_dask_worker (logging.Logger): Logger instance used on the dask worker + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + (default: True) """ if not file_name_identifier: raise ValueError( @@ -93,6 +100,8 @@ def __init__( self.file_options_dict = file_options_dict self.file_name_identifier = file_name_identifier + self.worker_log_level = worker_log_level + self.write_worker_log_files = write_worker_log_files self.logger_on_worker = None def get_data_from_file(self, base_dir): @@ -115,10 +124,9 @@ def get_data_from_file(self, base_dir): f"but is of type {type(base_dir)}. Abort..." ) + worker_log_dir = base_dir if self.write_worker_log_files else None self.logger_on_worker = setup_logger_on_worker( - name=type(self).__name__, - log_dir=base_dir, - level=logging.INFO, + name=type(self).__name__, log_dir=worker_log_dir, level=self.worker_log_level ) file_path = self._check_file_exist_and_is_unique(base_dir) diff --git a/src/queens/data_processors/csv_file.py b/src/queens/data_processors/csv_file.py index 454a81652..900e3e519 100644 --- a/src/queens/data_processors/csv_file.py +++ b/src/queens/data_processors/csv_file.py @@ -67,6 +67,8 @@ def __init__( file_name_identifier=None, file_options_dict=None, files_to_be_deleted_regex_lst=None, + worker_log_level=logging.INFO, + write_worker_log_files=True, ): """Instantiate data processor class for csv data. @@ -99,6 +101,10 @@ def __init__( files_to_be_deleted_regex_lst (lst): List with paths to files that should be deleted. The paths can contain regex expressions. + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + (default: True) + Returns: Instance of CsvFile class """ @@ -106,6 +112,8 @@ def __init__( file_name_identifier=file_name_identifier, file_options_dict=file_options_dict, files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst, + worker_log_level=worker_log_level, + write_worker_log_files=write_worker_log_files, ) header_row = file_options_dict.get("header_row") diff --git a/src/queens/data_processors/numpy_file.py b/src/queens/data_processors/numpy_file.py index c9ecafdd3..9acc470bb 100644 --- a/src/queens/data_processors/numpy_file.py +++ b/src/queens/data_processors/numpy_file.py @@ -33,6 +33,8 @@ def __init__( file_name_identifier=None, file_options_dict=None, files_to_be_deleted_regex_lst=None, + worker_log_level=logging.INFO, + write_worker_log_files=True, ): """Instantiate data processor class for numpy binary data. @@ -43,11 +45,16 @@ def __init__( file_options_dict (dict): Dictionary with read-in options for the file files_to_be_deleted_regex_lst (lst): List with paths to files that should be deleted. The paths can contain regex expressions. + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + (default: True) """ super().__init__( file_name_identifier=file_name_identifier, file_options_dict=file_options_dict, files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst, + worker_log_level=worker_log_level, + write_worker_log_files=write_worker_log_files, ) def get_raw_data_from_file(self, file_path): diff --git a/src/queens/data_processors/pvd_file.py b/src/queens/data_processors/pvd_file.py index b48009b94..b01877d53 100644 --- a/src/queens/data_processors/pvd_file.py +++ b/src/queens/data_processors/pvd_file.py @@ -45,6 +45,8 @@ def __init__( time_steps=None, block=0, point_data=True, + worker_log_level=logging.INFO, + write_worker_log_files=True, ): """Instantiate data processor class for pvd data extraction. @@ -60,11 +62,16 @@ def __init__( block (int, optional): Considered block of MultiBlock data set (first block by default) point_data (bool, optional): Whether to extract point data (True) or cell data (False). Defaults to point data. + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + (default: True) """ super().__init__( file_name_identifier=file_name_identifier, file_options_dict=file_options_dict, files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst, + worker_log_level=worker_log_level, + write_worker_log_files=write_worker_log_files, ) self.field_name = field_name if time_steps is None: diff --git a/src/queens/data_processors/txt_file.py b/src/queens/data_processors/txt_file.py index 2d66f2212..b17f31516 100644 --- a/src/queens/data_processors/txt_file.py +++ b/src/queens/data_processors/txt_file.py @@ -54,6 +54,8 @@ def __init__( logger_prefix=r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} - " r"queens\.drivers\.driver_\d* - INFO -", max_file_size_in_mega_byte=200, + worker_log_level=logging.INFO, + write_worker_log_files=True, ): """Instantiate data processor class for txt data. @@ -72,11 +74,16 @@ def __init__( max_file_size_in_mega_byte (int): Upper limit of the file size to be read into memory in megabyte (MB). See comment above on Potential Improvement. + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + (default: True) """ super().__init__( file_name_identifier=file_name_identifier, file_options_dict=file_options_dict, files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst, + worker_log_level=worker_log_level, + write_worker_log_files=write_worker_log_files, ) self.remove_logger_prefix_from_raw_data = remove_logger_prefix_from_raw_data self.logger_prefix = logger_prefix diff --git a/src/queens/drivers/_driver.py b/src/queens/drivers/_driver.py index f661298f7..8b2c7b02e 100644 --- a/src/queens/drivers/_driver.py +++ b/src/queens/drivers/_driver.py @@ -30,15 +30,26 @@ class Driver(metaclass=abc.ABCMeta): Attributes: parameters (Parameters): Parameters object files_to_copy (list): files or directories to copy to experiment_dir - logger_on_worker (logging.Logger): Logger instance used on the dask worker + worker_log_level (int | str): Logging level used on the worker + write_worker_log_files (bool): Switch on/off writing of worker logs to files (one per job) + logger_on_worker (logging.Logger): Logger instance used on the worker """ - def __init__(self, parameters, files_to_copy=None): + def __init__( + self, + parameters, + files_to_copy=None, + worker_log_level=logging.INFO, + write_worker_log_files=True, + ): """Initialize Driver object. Args: parameters (Parameters): Parameters object files_to_copy (list): files or directories to copy to experiment_dir + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + (default: True) """ self.parameters = parameters if files_to_copy is None: @@ -50,6 +61,8 @@ def __init__(self, parameters, files_to_copy=None): raise TypeError("files_to_copy must be a list of strings or Path objects") self.files_to_copy = files_to_copy + self.worker_log_level = worker_log_level + self.write_worker_log_files = write_worker_log_files self.logger_on_worker = None @final @@ -67,8 +80,9 @@ def run(self, sample, job_id, num_procs, experiment_dir, experiment_name): Result and potentially the gradient """ job_dir, output_dir, output_file, log_file = self._manage_paths(job_id, experiment_dir) + worker_log_dir = output_dir if self.write_worker_log_files else None self.logger_on_worker = setup_logger_on_worker( - name=type(self).__name__, log_dir=output_dir, level=logging.INFO + name=type(self).__name__, log_dir=worker_log_dir, level=self.worker_log_level ) return self._run( diff --git a/src/queens/drivers/function.py b/src/queens/drivers/function.py index ab2d51826..43b72862a 100644 --- a/src/queens/drivers/function.py +++ b/src/queens/drivers/function.py @@ -41,6 +41,8 @@ def __init__( parameters, function, external_python_module_function=None, + worker_log_level=logging.INFO, + write_worker_log_files=True, ): """Initialize Function object. @@ -48,8 +50,15 @@ def __init__( parameters (Parameters): Parameters object function (callable, str): Function or name of example function provided by QUEENS external_python_module_function (Path | str): Path to external module with function + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + (default: True) """ - super().__init__(parameters=parameters) + super().__init__( + parameters=parameters, + worker_log_level=worker_log_level, + write_worker_log_files=write_worker_log_files, + ) if external_python_module_function is None: if isinstance(function, str): # Try to load existing simulator functions diff --git a/src/queens/drivers/jobscript.py b/src/queens/drivers/jobscript.py index 715d7564d..25e7eb314 100644 --- a/src/queens/drivers/jobscript.py +++ b/src/queens/drivers/jobscript.py @@ -97,6 +97,8 @@ def __init__( jobscript_file_name="jobscript.sh", extra_options=None, raise_error_on_jobscript_failure=True, + worker_log_level=logging.INFO, + write_worker_log_files=True, ): """Initialize Jobscript object. @@ -113,8 +115,16 @@ def __init__( extra_options (dict, opt): Extra options to inject into jobscript template. raise_error_on_jobscript_failure (bool, opt): Whether to raise an error for a non-zero jobscript exit code. + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + (default: True) """ - super().__init__(parameters=parameters, files_to_copy=files_to_copy) + super().__init__( + parameters=parameters, + files_to_copy=files_to_copy, + worker_log_level=worker_log_level, + write_worker_log_files=write_worker_log_files, + ) self.input_templates = self.create_input_templates_dict(input_templates) self.jobscript_template = self.get_read_in_jobscript_template(jobscript_template) self.files_to_copy.extend(self.input_templates.values()) diff --git a/src/queens/drivers/mpi.py b/src/queens/drivers/mpi.py index b3af292c2..1fa973874 100644 --- a/src/queens/drivers/mpi.py +++ b/src/queens/drivers/mpi.py @@ -14,6 +14,8 @@ # """Convenience wrapper around Jobscript Driver.""" +import logging + from queens.drivers.jobscript import Jobscript from queens.utils.logger_settings import log_init_args @@ -35,6 +37,8 @@ def __init__( data_processor=None, gradient_data_processor=None, mpi_cmd="/usr/bin/mpirun --bind-to none", + worker_log_level=logging.INFO, + write_worker_log_files=True, ): """Initialize MPI object. @@ -46,6 +50,9 @@ def __init__( data_processor (obj, opt): instance of data processor class gradient_data_processor (obj, opt): instance of data processor class for gradient data mpi_cmd (str, opt): mpi command + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + (default: True) """ # pylint: disable=duplicate-code extra_options = { @@ -60,4 +67,6 @@ def __init__( data_processor=data_processor, gradient_data_processor=gradient_data_processor, extra_options=extra_options, + worker_log_level=worker_log_level, + write_worker_log_files=write_worker_log_files, ) diff --git a/src/queens_interfaces/fourc/driver.py b/src/queens_interfaces/fourc/driver.py index 4e81477b2..4b8ef88b5 100644 --- a/src/queens_interfaces/fourc/driver.py +++ b/src/queens_interfaces/fourc/driver.py @@ -14,6 +14,8 @@ # """Driver to run 4C.""" +import logging + from queens.drivers.jobscript import Jobscript from queens.utils.logger_settings import log_init_args @@ -41,6 +43,8 @@ def __init__( post_processor="", post_options="", mpi_cmd="/usr/bin/mpirun --bind-to none", + worker_log_level=logging.INFO, + write_worker_log_files=True, ): """Initialize Fourc object. @@ -54,6 +58,9 @@ def __init__( post_processor (path, opt): path to post_processor post_options (str, opt): options for post-processing mpi_cmd (str, opt): mpi command + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + (default: True) """ # pylint: disable=duplicate-code extra_options = { @@ -70,4 +77,6 @@ def __init__( data_processor=data_processor, gradient_data_processor=gradient_data_processor, extra_options=extra_options, + worker_log_level=worker_log_level, + write_worker_log_files=write_worker_log_files, ) From 72b3a0b0d8eb70c766c2b1dce2d2eebced0c9b73 Mon Sep 17 00:00:00 2001 From: Sebastian Brandstaeter Date: Thu, 6 Nov 2025 15:30:12 +0100 Subject: [PATCH 4/8] refactor: rename job_directory helper function and use in driver --- src/queens/drivers/_driver.py | 5 +++-- src/queens/models/adjoint.py | 4 ++-- src/queens/utils/config_directories.py | 10 +++++----- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/queens/drivers/_driver.py b/src/queens/drivers/_driver.py index 8b2c7b02e..85ef80e7d 100644 --- a/src/queens/drivers/_driver.py +++ b/src/queens/drivers/_driver.py @@ -19,6 +19,7 @@ from pathlib import Path from typing import final +from queens.utils.config_directories import create_directory, job_directory from queens.utils.logger_settings import setup_logger_on_worker _logger = logging.getLogger(__name__) @@ -140,9 +141,9 @@ def _manage_paths(self, job_id, experiment_dir): output_file (Path): Path to output file(s). log_file (Path): Path to log file. """ - job_dir = experiment_dir / str(job_id) + job_dir = job_directory(experiment_dir, job_id) output_dir = job_dir / "output" - output_dir.mkdir(parents=True, exist_ok=True) + create_directory(output_dir) output_prefix = "output" output_file = output_dir / output_prefix diff --git a/src/queens/models/adjoint.py b/src/queens/models/adjoint.py index fb1c4e765..08a7299c4 100644 --- a/src/queens/models/adjoint.py +++ b/src/queens/models/adjoint.py @@ -17,7 +17,7 @@ import logging from queens.models.simulation import Simulation -from queens.utils.config_directories import current_job_directory +from queens.utils.config_directories import job_directory from queens.utils.io import write_to_csv from queens.utils.logger_settings import log_init_args @@ -77,7 +77,7 @@ def grad(self, samples, upstream_gradient): # write adjoint data for each sample to adjoint files in old job directories for job_id, grad_objective in zip(last_job_ids, upstream_gradient): - job_dir = current_job_directory(experiment_dir, job_id) + job_dir = job_directory(experiment_dir, job_id) adjoint_file_path = job_dir.joinpath(self.adjoint_file) write_to_csv(adjoint_file_path, grad_objective.reshape(1, -1)) diff --git a/src/queens/utils/config_directories.py b/src/queens/utils/config_directories.py index f21524404..733ec140a 100644 --- a/src/queens/utils/config_directories.py +++ b/src/queens/utils/config_directories.py @@ -83,8 +83,8 @@ def create_directory(dir_path: str | Path) -> None: create_folder_if_not_existent(dir_path) -def current_job_directory(experiment_dir: Path, job_id: int) -> Path: - """Directory of the latest submitted job. +def job_directory(experiment_dir: Path, job_id: int) -> Path: + """Directory of a specific job. Args: experiment_dir: Experiment directory @@ -108,9 +108,9 @@ def job_dirs_in_experiment_dir(experiment_dir: Path | str) -> list[Path]: """ experiment_dir = Path(experiment_dir) job_directories = [] - for job_directory in experiment_dir.iterdir(): - if job_directory.is_dir() and job_directory.name.isdigit(): - job_directories.append(job_directory) + for job_dir in experiment_dir.iterdir(): + if job_dir.is_dir() and job_dir.name.isdigit(): + job_directories.append(job_dir) # Sort the jobs directories return sorted(job_directories, key=lambda x: int(x.name)) From b8cd85022958fa910b73296d8390dca5f4c0132f Mon Sep 17 00:00:00 2001 From: Sebastian Brandstaeter Date: Thu, 6 Nov 2025 16:11:43 +0100 Subject: [PATCH 5/8] fix: add type hinting --- src/queens/utils/logger_settings.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/queens/utils/logger_settings.py b/src/queens/utils/logger_settings.py index c0f9fa512..18e6e2d42 100644 --- a/src/queens/utils/logger_settings.py +++ b/src/queens/utils/logger_settings.py @@ -281,16 +281,18 @@ def key_fun(pair: tuple[str, Any]) -> int: return wrapper -def setup_logger_on_worker(name="worker", log_dir=None, level=logging.INFO): +def setup_logger_on_worker( + name: str = "worker", log_dir: None | Path = None, level: int = logging.INFO +) -> logging.Logger: """Setup a logger on a scheduler's worker. Args: - name (str): Name of the logger. - log_dir (Path): Path to the directory for the log file. - level (int): Logging level. + name: Name of the logger. + log_dir: Path to the directory for the log file. + level: Logging level. Returns: - logger (logging.Logger): Logger instance. + logger: Logger instance. """ logger = logging.getLogger(name) From 86ac5d09c728ea132a1f9c7bcaecb8186915b268 Mon Sep 17 00:00:00 2001 From: Sebastian Brandstaeter Date: Thu, 6 Nov 2025 16:13:36 +0100 Subject: [PATCH 6/8] feat: split manage paths and output files --- src/queens/drivers/_driver.py | 26 +++++++------------------- src/queens/drivers/function.py | 8 -------- src/queens/drivers/jobscript.py | 29 ++++++++++++++++++++--------- 3 files changed, 27 insertions(+), 36 deletions(-) diff --git a/src/queens/drivers/_driver.py b/src/queens/drivers/_driver.py index 85ef80e7d..98473d4f1 100644 --- a/src/queens/drivers/_driver.py +++ b/src/queens/drivers/_driver.py @@ -80,8 +80,11 @@ def run(self, sample, job_id, num_procs, experiment_dir, experiment_name): Returns: Result and potentially the gradient """ - job_dir, output_dir, output_file, log_file = self._manage_paths(job_id, experiment_dir) - worker_log_dir = output_dir if self.write_worker_log_files else None + _, worker_log_dir = ( + self._manage_paths(job_id, experiment_dir) + if self.write_worker_log_files + else (None, None) + ) self.logger_on_worker = setup_logger_on_worker( name=type(self).__name__, log_dir=worker_log_dir, level=self.worker_log_level ) @@ -92,10 +95,6 @@ def run(self, sample, job_id, num_procs, experiment_dir, experiment_name): num_procs, experiment_dir, experiment_name, - job_dir, - output_dir, - output_file, - log_file, ) @abc.abstractmethod @@ -106,10 +105,6 @@ def _run( num_procs, experiment_dir, experiment_name, - job_dir, - output_dir, - output_file, - log_file, ): """Abstract method for driver run. @@ -119,15 +114,12 @@ def _run( num_procs (int): number of processors experiment_dir (Path): Path to QUEENS experiment directory. experiment_name (str): name of QUEENS experiment. - job_dir (Path): Path to job directory. - output_dir (Path): Path to output directory. - output_file (Path): Path to output file(s). - log_file (Path): Path to log file. Returns: Result and potentially the gradient """ + @final def _manage_paths(self, job_id, experiment_dir): """Manage paths for driver run. @@ -145,8 +137,4 @@ def _manage_paths(self, job_id, experiment_dir): output_dir = job_dir / "output" create_directory(output_dir) - output_prefix = "output" - output_file = output_dir / output_prefix - log_file = output_dir / (output_prefix + ".log") - - return job_dir, output_dir, output_file, log_file + return job_dir, output_dir diff --git a/src/queens/drivers/function.py b/src/queens/drivers/function.py index 43b72862a..a29ac6363 100644 --- a/src/queens/drivers/function.py +++ b/src/queens/drivers/function.py @@ -128,10 +128,6 @@ def _run( num_procs, experiment_dir, experiment_name, - job_dir, - output_dir, - output_file, - log_file, ): """Run the driver. @@ -141,10 +137,6 @@ def _run( num_procs (int): number of processors experiment_dir (Path): Path to QUEENS experiment directory. experiment_name (str): name of QUEENS experiment. - job_dir (Path): Path to job directory. - output_dir (Path): Path to output directory. - output_file (Path): Path to output file(s). - log_file (Path): Path to log file. Returns: Result and potentially the gradient diff --git a/src/queens/drivers/jobscript.py b/src/queens/drivers/jobscript.py index 25e7eb314..604ddf853 100644 --- a/src/queens/drivers/jobscript.py +++ b/src/queens/drivers/jobscript.py @@ -204,10 +204,6 @@ def _run( num_procs, experiment_dir, experiment_name, - job_dir, - output_dir, - output_file, - log_file, ): """Run the driver. @@ -217,15 +213,14 @@ def _run( num_procs (int): Number of processors. experiment_dir (Path): Path to QUEENS experiment directory. experiment_name (str): name of QUEENS experiment. - job_dir (Path): Path to job directory. - output_dir (Path): Path to output directory. - output_file (Path): Path to output file(s). - log_file (Path): Path to log file. Returns: Result and potentially the gradient. """ + # call manage paths again to ensure creation of the directories even if no logging to file + job_dir, output_dir = self._manage_paths(job_id, experiment_dir) input_files = self._manage_input_files(job_dir) + output_file, jobscript_log_file = self._manage_output_files(output_dir) sample_dict = self.parameters.sample_as_dict(sample) @@ -258,7 +253,7 @@ def _run( ) with metadata.time_code("run_jobscript"): - execute_cmd = f"bash {jobscript_file} >{log_file} 2>&1" + execute_cmd = f"bash {jobscript_file} >{jobscript_log_file} 2>&1" self._run_executable(job_id, execute_cmd) with metadata.time_code("data_processing"): @@ -283,6 +278,22 @@ def _manage_input_files(self, job_dir): return input_files + def _manage_output_files(self, output_dir): + """Manage output file paths for driver run. + + Args: + output_dir (Path): Path to output directory. + + Returns: + output_file (Path): Path to output file(s). + log_file (Path): Path to log file. + """ + output_prefix = "output" + output_file = output_dir / output_prefix + log_file = output_dir / (output_prefix + ".log") + + return output_file, log_file + def _run_executable(self, job_id, execute_cmd): """Run executable. From 5657cadc240427804b3b5cd261f2709072b7c46c Mon Sep 17 00:00:00 2001 From: Sebastian Brandstaeter Date: Thu, 6 Nov 2025 16:17:18 +0100 Subject: [PATCH 7/8] feat: turn off worker log files for Function driver on default --- src/queens/drivers/function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/queens/drivers/function.py b/src/queens/drivers/function.py index a29ac6363..e5d498f6e 100644 --- a/src/queens/drivers/function.py +++ b/src/queens/drivers/function.py @@ -42,7 +42,7 @@ def __init__( function, external_python_module_function=None, worker_log_level=logging.INFO, - write_worker_log_files=True, + write_worker_log_files=False, ): """Initialize Function object. From acc4d164217021bef011be6c92ae20e50f6822e0 Mon Sep 17 00:00:00 2001 From: Sebastian Brandstaeter Date: Thu, 6 Nov 2025 18:46:01 +0100 Subject: [PATCH 8/8] fix: duplicate code pylint warning --- src/queens/data_processors/csv_file.py | 1 + src/queens/data_processors/numpy_file.py | 1 + src/queens/data_processors/pvd_file.py | 1 + src/queens/data_processors/txt_file.py | 1 + 4 files changed, 4 insertions(+) diff --git a/src/queens/data_processors/csv_file.py b/src/queens/data_processors/csv_file.py index 900e3e519..73209a639 100644 --- a/src/queens/data_processors/csv_file.py +++ b/src/queens/data_processors/csv_file.py @@ -108,6 +108,7 @@ def __init__( Returns: Instance of CsvFile class """ + # pylint: disable=duplicate-code super().__init__( file_name_identifier=file_name_identifier, file_options_dict=file_options_dict, diff --git a/src/queens/data_processors/numpy_file.py b/src/queens/data_processors/numpy_file.py index 9acc470bb..366e3afae 100644 --- a/src/queens/data_processors/numpy_file.py +++ b/src/queens/data_processors/numpy_file.py @@ -49,6 +49,7 @@ def __init__( write_worker_log_files (bool): Control writing of worker logs to files (one per job) (default: True) """ + # pylint: disable=duplicate-code super().__init__( file_name_identifier=file_name_identifier, file_options_dict=file_options_dict, diff --git a/src/queens/data_processors/pvd_file.py b/src/queens/data_processors/pvd_file.py index b01877d53..d0009092c 100644 --- a/src/queens/data_processors/pvd_file.py +++ b/src/queens/data_processors/pvd_file.py @@ -66,6 +66,7 @@ def __init__( write_worker_log_files (bool): Control writing of worker logs to files (one per job) (default: True) """ + # pylint: disable=duplicate-code super().__init__( file_name_identifier=file_name_identifier, file_options_dict=file_options_dict, diff --git a/src/queens/data_processors/txt_file.py b/src/queens/data_processors/txt_file.py index b17f31516..d64aba306 100644 --- a/src/queens/data_processors/txt_file.py +++ b/src/queens/data_processors/txt_file.py @@ -78,6 +78,7 @@ def __init__( write_worker_log_files (bool): Control writing of worker logs to files (one per job) (default: True) """ + # pylint: disable=duplicate-code super().__init__( file_name_identifier=file_name_identifier, file_options_dict=file_options_dict,