diff --git a/src/queens/data_processors/_data_processor.py b/src/queens/data_processors/_data_processor.py
index 78fb1dd28..c6ef7f508 100644
--- a/src/queens/data_processors/_data_processor.py
+++ b/src/queens/data_processors/_data_processor.py
@@ -18,6 +18,8 @@
 import logging
 from pathlib import Path

+from queens.utils.logger_settings import setup_logger_on_worker
+
 _logger = logging.getLogger(__name__)

@@ -36,6 +38,9 @@ class DataProcessor(metaclass=abc.ABCMeta):
             The file prefix can contain BASIC regex expression and subdirectories.
             Examples are wildcards `*` or expressions like `[ab]`.
+        worker_log_level (int | str): Logging level used on the worker (default: logging.INFO)
+        write_worker_log_files (bool): Control writing of worker logs to files (one per job)
+        logger_on_worker (logging.Logger): Logger instance used on the dask worker
     """

     def __init__(
@@ -43,6 +48,8 @@
         file_name_identifier=None,
         file_options_dict=None,
         files_to_be_deleted_regex_lst=None,
+        worker_log_level=logging.INFO,
+        write_worker_log_files=True,
     ):
         """Init data processor class.

@@ -55,6 +62,9 @@
                 implement valid options for this dictionary.
             files_to_be_deleted_regex_lst (lst): List with paths to files that should be deleted.
                 The paths can contain regex expressions.
+            worker_log_level (int | str): Logging level used on the worker (default: logging.INFO)
+            write_worker_log_files (bool): Control writing of worker logs to files (one per job)
+                (default: True)
         """
         if not file_name_identifier:
             raise ValueError(
@@ -90,46 +100,57 @@ def __init__(
         self.file_options_dict = file_options_dict
         self.file_name_identifier = file_name_identifier

-    def get_data_from_file(self, base_dir_file):
+        self.worker_log_level = worker_log_level
+        self.write_worker_log_files = write_worker_log_files
+        self.logger_on_worker = None
+
+    def get_data_from_file(self, base_dir):
         """Get data of interest from file.

         Args:
-            base_dir_file (Path): Path of the base directory that contains the file of interest
+            base_dir (Path): Path of the base directory that contains the file of interest

         Returns:
             processed_data (np.array): Final data from data processor module
         """
-        if not base_dir_file:
+        if not base_dir:
             raise ValueError(
                 "The data processor requires a base_directory for the "
                 "files to operate on! Your input was empty! Abort..."
             )
-        if not isinstance(base_dir_file, Path):
+        if not isinstance(base_dir, Path):
             raise TypeError(
-                "The argument 'base_dir_file' must be of type 'Path' "
-                f"but is of type {type(base_dir_file)}. Abort..."
+                "The argument 'base_dir' must be of type 'Path' "
+                f"but is of type {type(base_dir)}. Abort..."
             )

-        file_path = self._check_file_exist_and_is_unique(base_dir_file)
+        worker_log_dir = base_dir if self.write_worker_log_files else None
+        self.logger_on_worker = setup_logger_on_worker(
+            name=type(self).__name__, log_dir=worker_log_dir, level=self.worker_log_level
+        )
+
+        file_path = self._check_file_exist_and_is_unique(base_dir)
+        self.logger_on_worker.info("\nProcessing the following files:")
+        self.logger_on_worker.info(str(file_path))

         processed_data = None
         if file_path:
             raw_data = self.get_raw_data_from_file(file_path)
             filtered_data = self.filter_and_manipulate_raw_data(raw_data)
             processed_data = self._subsequent_data_manipulation(filtered_data)

-        self._clean_up(base_dir_file)
+        self._clean_up(base_dir)
         return processed_data

-    def _check_file_exist_and_is_unique(self, base_dir_file):
+    def _check_file_exist_and_is_unique(self, base_dir):
         """Check if file exists.

         Args:
-            base_dir_file (Path): Path to base directory that contains file of interest
+            base_dir (Path): Path to base directory that contains file of interest

         Returns:
             file_path (str): Actual path to the file of interest.
         """
-        file_list = list(base_dir_file.glob(self.file_name_identifier))
+        file_list = list(base_dir.glob(self.file_name_identifier))

         if len(file_list) > 1:
             raise RuntimeError(
@@ -141,8 +162,8 @@ def _check_file_exist_and_is_unique(self, base_dir_file):
         if len(file_list) == 1:
             file_path = file_list[0]
         else:
-            _logger.warning(
-                "The file '%s' does not exist!", base_dir_file / self.file_name_identifier
+            self.logger_on_worker.warning(
+                "The file '%s' does not exist!", base_dir / self.file_name_identifier
             )
             file_path = None
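The worker-side pattern introduced here (a named logger that always streams, and that swaps in one file handler per job) can be sketched with the standard library alone. A minimal sketch; the function name is illustrative and not part of QUEENS:

```python
import logging
from pathlib import Path


def per_job_logger(name, log_dir=None, level=logging.INFO):
    """Illustrative stand-in for setup_logger_on_worker()."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.hasHandlers():  # first call on this worker: attach the stream handler once
        logger.addHandler(logging.StreamHandler())
    for handler in logger.handlers[:]:  # drop the previous job's file handler, if any
        if isinstance(handler, logging.FileHandler):
            logger.removeHandler(handler)
            handler.close()
    if log_dir is not None:  # mirror this job's records into <log_dir>/worker.log
        logger.addHandler(logging.FileHandler(Path(log_dir) / "worker.log"))
    return logger
```

Because `get_data_from_file` passes `base_dir` as the log directory, each job's `worker.log` lands next to the files being processed.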
diff --git a/src/queens/data_processors/csv_file.py b/src/queens/data_processors/csv_file.py
index 454a81652..73209a639 100644
--- a/src/queens/data_processors/csv_file.py
+++ b/src/queens/data_processors/csv_file.py
@@ -67,6 +67,8 @@
         file_name_identifier=None,
         file_options_dict=None,
         files_to_be_deleted_regex_lst=None,
+        worker_log_level=logging.INFO,
+        write_worker_log_files=True,
     ):
         """Instantiate data processor class for csv data.

@@ -99,13 +101,20 @@ def __init__(
             files_to_be_deleted_regex_lst (lst): List with paths to files that should be
                 deleted. The paths can contain regex expressions.
+            worker_log_level (int | str): Logging level used on the worker (default: logging.INFO)
+            write_worker_log_files (bool): Control writing of worker logs to files (one per job)
+                (default: True)
+
         Returns:
             Instance of CsvFile class
         """
+        # pylint: disable=duplicate-code
         super().__init__(
             file_name_identifier=file_name_identifier,
             file_options_dict=file_options_dict,
             files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst,
+            worker_log_level=worker_log_level,
+            write_worker_log_files=write_worker_log_files,
         )

         header_row = file_options_dict.get("header_row")
diff --git a/src/queens/data_processors/numpy_file.py b/src/queens/data_processors/numpy_file.py
index c9ecafdd3..366e3afae 100644
--- a/src/queens/data_processors/numpy_file.py
+++ b/src/queens/data_processors/numpy_file.py
@@ -33,6 +33,8 @@
         file_name_identifier=None,
         file_options_dict=None,
         files_to_be_deleted_regex_lst=None,
+        worker_log_level=logging.INFO,
+        write_worker_log_files=True,
     ):
         """Instantiate data processor class for numpy binary data.

@@ -43,11 +45,17 @@ def __init__(
             file_options_dict (dict): Dictionary with read-in options for the file
             files_to_be_deleted_regex_lst (lst): List with paths to files that should be
                 deleted. The paths can contain regex expressions.
+            worker_log_level (int | str): Logging level used on the worker (default: logging.INFO)
+            write_worker_log_files (bool): Control writing of worker logs to files (one per job)
+                (default: True)
         """
+        # pylint: disable=duplicate-code
         super().__init__(
             file_name_identifier=file_name_identifier,
             file_options_dict=file_options_dict,
             files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst,
+            worker_log_level=worker_log_level,
+            write_worker_log_files=write_worker_log_files,
         )

     def get_raw_data_from_file(self, file_path):
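Every concrete processor simply forwards the two new keyword arguments to the base class, so per-job log files become a constructor-level decision. A hypothetical instantiation, assuming the class is importable as shown in this diff's file paths; the file name and the empty options dict are illustrative, see the class docstring for the real `file_options_dict` contents:

```python
import logging

from queens.data_processors.numpy_file import NumpyFile

# Read `result.npy` from each job's output directory, log verbosely on the
# worker stream, but skip writing per-job worker.log files.
data_processor = NumpyFile(
    file_name_identifier="result.npy",  # illustrative file name
    file_options_dict={},               # read-in options would go here
    worker_log_level=logging.DEBUG,
    write_worker_log_files=False,
)
```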
diff --git a/src/queens/data_processors/pvd_file.py b/src/queens/data_processors/pvd_file.py
index b48009b94..d0009092c 100644
--- a/src/queens/data_processors/pvd_file.py
+++ b/src/queens/data_processors/pvd_file.py
@@ -45,6 +45,8 @@
         time_steps=None,
         block=0,
         point_data=True,
+        worker_log_level=logging.INFO,
+        write_worker_log_files=True,
     ):
         """Instantiate data processor class for pvd data extraction.

@@ -60,11 +62,17 @@ def __init__(
             block (int, optional): Considered block of MultiBlock data set (first block by default)
             point_data (bool, optional): Whether to extract point data (True) or
                 cell data (False). Defaults to point data.
+            worker_log_level (int | str): Logging level used on the worker (default: logging.INFO)
+            write_worker_log_files (bool): Control writing of worker logs to files (one per job)
+                (default: True)
         """
+        # pylint: disable=duplicate-code
         super().__init__(
             file_name_identifier=file_name_identifier,
             file_options_dict=file_options_dict,
             files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst,
+            worker_log_level=worker_log_level,
+            write_worker_log_files=write_worker_log_files,
         )
         self.field_name = field_name
         if time_steps is None:
diff --git a/src/queens/data_processors/txt_file.py b/src/queens/data_processors/txt_file.py
index 2d66f2212..d64aba306 100644
--- a/src/queens/data_processors/txt_file.py
+++ b/src/queens/data_processors/txt_file.py
@@ -54,6 +54,8 @@
         logger_prefix=r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} - "
         r"queens\.drivers\.driver_\d* - INFO -",
         max_file_size_in_mega_byte=200,
+        worker_log_level=logging.INFO,
+        write_worker_log_files=True,
     ):
         """Instantiate data processor class for txt data.

@@ -72,11 +74,17 @@ def __init__(
             max_file_size_in_mega_byte (int): Upper limit of the file size to be read into
                 memory in megabyte (MB). See comment above on Potential Improvement.
+            worker_log_level (int | str): Logging level used on the worker (default: logging.INFO)
+            write_worker_log_files (bool): Control writing of worker logs to files (one per job)
+                (default: True)
         """
+        # pylint: disable=duplicate-code
         super().__init__(
             file_name_identifier=file_name_identifier,
             file_options_dict=file_options_dict,
             files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst,
+            worker_log_level=worker_log_level,
+            write_worker_log_files=write_worker_log_files,
        )
        self.remove_logger_prefix_from_raw_data = remove_logger_prefix_from_raw_data
        self.logger_prefix = logger_prefix
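The default `logger_prefix` above is a regular expression over the QUEENS driver log format (the two adjacent raw strings concatenate into one pattern). A quick standalone check of what it strips; the sample log line is invented for illustration:

```python
import re

# Default prefix pattern from TxtFile above.
logger_prefix = (
    r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} - "
    r"queens\.drivers\.driver_\d* - INFO -"
)

line = "2024-05-17 13:37:42,123 - queens.drivers.driver_4 - INFO - step 10 converged"
payload = re.sub(logger_prefix, "", line).strip()
print(payload)  # -> step 10 converged
```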
diff --git a/src/queens/drivers/_driver.py b/src/queens/drivers/_driver.py
index 1d0829302..98473d4f1 100644
--- a/src/queens/drivers/_driver.py
+++ b/src/queens/drivers/_driver.py
@@ -17,6 +17,10 @@
 import abc
 import logging
 from pathlib import Path
+from typing import final
+
+from queens.utils.config_directories import create_directory, job_directory
+from queens.utils.logger_settings import setup_logger_on_worker

 _logger = logging.getLogger(__name__)

@@ -27,14 +31,26 @@ class Driver(metaclass=abc.ABCMeta):
     Attributes:
         parameters (Parameters): Parameters object
         files_to_copy (list): files or directories to copy to experiment_dir
+        worker_log_level (int | str): Logging level used on the worker
+        write_worker_log_files (bool): Switch on/off writing of worker logs to files (one per job)
+        logger_on_worker (logging.Logger): Logger instance used on the worker
     """

-    def __init__(self, parameters, files_to_copy=None):
+    def __init__(
+        self,
+        parameters,
+        files_to_copy=None,
+        worker_log_level=logging.INFO,
+        write_worker_log_files=True,
+    ):
         """Initialize Driver object.

         Args:
             parameters (Parameters): Parameters object
             files_to_copy (list): files or directories to copy to experiment_dir
+            worker_log_level (int | str): Logging level used on the worker (default: logging.INFO)
+            write_worker_log_files (bool): Control writing of worker logs to files (one per job)
+                (default: True)
         """
         self.parameters = parameters
         if files_to_copy is None:
@@ -46,9 +62,13 @@ def __init__(
             raise TypeError("files_to_copy must be a list of strings or Path objects")
         self.files_to_copy = files_to_copy

-    @abc.abstractmethod
+        self.worker_log_level = worker_log_level
+        self.write_worker_log_files = write_worker_log_files
+        self.logger_on_worker = None
+
+    @final
     def run(self, sample, job_id, num_procs, experiment_dir, experiment_name):
-        """Abstract method for driver run.
+        """Run the driver.

         Args:
             sample (dict): Dict containing sample
             job_id (int): Job ID
             num_procs (int): number of processors
             experiment_dir (Path): Path to QUEENS experiment directory.
             experiment_name (str): name of QUEENS experiment.

@@ -60,3 +80,59 @@ def run(self, sample, job_id, num_procs, experiment_dir, experiment_name):
         Returns:
             Result and potentially the gradient
         """
+        _, worker_log_dir = (
+            self._manage_paths(job_id, experiment_dir)
+            if self.write_worker_log_files
+            else (None, None)
+        )
+        self.logger_on_worker = setup_logger_on_worker(
+            name=type(self).__name__, log_dir=worker_log_dir, level=self.worker_log_level
+        )
+
+        return self._run(
+            sample,
+            job_id,
+            num_procs,
+            experiment_dir,
+            experiment_name,
+        )
+
+    @abc.abstractmethod
+    def _run(
+        self,
+        sample,
+        job_id,
+        num_procs,
+        experiment_dir,
+        experiment_name,
+    ):
+        """Abstract method for driver run.
+
+        Args:
+            sample (dict): Dict containing sample
+            job_id (int): Job ID
+            num_procs (int): number of processors
+            experiment_dir (Path): Path to QUEENS experiment directory.
+            experiment_name (str): name of QUEENS experiment.
+
+        Returns:
+            Result and potentially the gradient
+        """
+
+    @final
+    def _manage_paths(self, job_id, experiment_dir):
+        """Manage paths for driver run.
+
+        Args:
+            job_id (int): Job ID.
+            experiment_dir (Path): Path to QUEENS experiment directory.
+
+        Returns:
+            job_dir (Path): Path to job directory.
+            output_dir (Path): Path to output directory.
+        """
+        job_dir = job_directory(experiment_dir, job_id)
+        output_dir = job_dir / "output"
+        create_directory(output_dir)
+
+        return job_dir, output_dir
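The base class now follows the classic template-method pattern: the final `run()` owns the worker-logging setup, and subclasses only implement `_run()`. A minimal self-contained sketch of that contract; the class and method bodies are illustrative:

```python
import abc
from typing import final


class Base(metaclass=abc.ABCMeta):
    """Illustrative contract: run() is fixed, _run() is the extension point."""

    @final
    def run(self, job_id: int) -> str:
        # shared setup happens exactly once, in the base class ...
        print(f"setting up worker logging for job {job_id}")
        # ... then control is handed to the subclass implementation
        return self._run(job_id)

    @abc.abstractmethod
    def _run(self, job_id: int) -> str:
        """Subclasses implement the actual work here."""


class Echo(Base):
    def _run(self, job_id: int) -> str:
        return f"ran job {job_id}"


print(Echo().run(7))  # setup runs first, then the subclass body
```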
diff --git a/src/queens/drivers/function.py b/src/queens/drivers/function.py
index bd77e240e..e5d498f6e 100644
--- a/src/queens/drivers/function.py
+++ b/src/queens/drivers/function.py
@@ -41,6 +41,8 @@
         parameters,
         function,
         external_python_module_function=None,
+        worker_log_level=logging.INFO,
+        write_worker_log_files=False,
     ):
         """Initialize Function object.

@@ -48,8 +50,15 @@ def __init__(
             parameters (Parameters): Parameters object
             function (callable, str): Function or name of example function provided by QUEENS
             external_python_module_function (Path | str): Path to external module with function
+            worker_log_level (int | str): Logging level used on the worker (default: logging.INFO)
+            write_worker_log_files (bool): Control writing of worker logs to files (one per job)
+                (default: False)
         """
-        super().__init__(parameters=parameters)
+        super().__init__(
+            parameters=parameters,
+            worker_log_level=worker_log_level,
+            write_worker_log_files=write_worker_log_files,
+        )
         if external_python_module_function is None:
             if isinstance(function, str):
                 # Try to load existing simulator functions
@@ -112,15 +121,22 @@ def reshaped_output_function(sample_dict):

         return reshaped_output_function

-    def run(self, sample, job_id, num_procs, experiment_dir, experiment_name):
+    def _run(
+        self,
+        sample,
+        job_id,
+        num_procs,
+        experiment_dir,
+        experiment_name,
+    ):
         """Run the driver.

         Args:
             sample (dict): Dict containing sample
             job_id (int): Job ID
             num_procs (int): number of processors
-            experiment_name (str): name of QUEENS experiment.
             experiment_dir (Path): Path to QUEENS experiment directory.
+            experiment_name (str): name of QUEENS experiment.

         Returns:
             Result and potentially the gradient
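Note that `write_worker_log_files` defaults to `False` here, unlike the other drivers: a plain function evaluation has no job directory worth writing into. A hypothetical setup, in which `params` stands in for a real queens `Parameters` instance (not constructed here) and the toy function follows the keyword-argument convention for simulator functions:

```python
from queens.drivers.function import Function


def paraboloid(x1, x2):
    """Toy simulator function; argument names match the parameter names."""
    return (x1 - 1.0) ** 2 + (x2 + 0.5) ** 2


# `params` is assumed to be a queens Parameters object describing x1 and x2.
driver = Function(parameters=params, function=paraboloid)  # no worker.log files by default
verbose_driver = Function(
    parameters=params, function=paraboloid, write_worker_log_files=True  # opt back in
)
```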
diff --git a/src/queens/drivers/jobscript.py b/src/queens/drivers/jobscript.py
index 7dd40fa01..604ddf853 100644
--- a/src/queens/drivers/jobscript.py
+++ b/src/queens/drivers/jobscript.py
@@ -97,6 +97,8 @@
         jobscript_file_name="jobscript.sh",
         extra_options=None,
         raise_error_on_jobscript_failure=True,
+        worker_log_level=logging.INFO,
+        write_worker_log_files=True,
     ):
         """Initialize Jobscript object.

@@ -113,8 +115,16 @@ def __init__(
             extra_options (dict, opt): Extra options to inject into jobscript template.
             raise_error_on_jobscript_failure (bool, opt): Whether to raise an error for a
                 non-zero jobscript exit code.
+            worker_log_level (int | str): Logging level used on the worker (default: logging.INFO)
+            write_worker_log_files (bool): Control writing of worker logs to files (one per job)
+                (default: True)
         """
-        super().__init__(parameters=parameters, files_to_copy=files_to_copy)
+        super().__init__(
+            parameters=parameters,
+            files_to_copy=files_to_copy,
+            worker_log_level=worker_log_level,
+            write_worker_log_files=write_worker_log_files,
+        )
         self.input_templates = self.create_input_templates_dict(input_templates)
         self.jobscript_template = self.get_read_in_jobscript_template(jobscript_template)
         self.files_to_copy.extend(self.input_templates.values())
@@ -187,7 +197,14 @@ def get_read_in_jobscript_template(jobscript_template):

         return jobscript_template

-    def run(self, sample, job_id, num_procs, experiment_dir, experiment_name):
+    def _run(
+        self,
+        sample,
+        job_id,
+        num_procs,
+        experiment_dir,
+        experiment_name,
+    ):
         """Run the driver.

         Args:
@@ -195,14 +212,15 @@ def _run(
             job_id (int): Job ID.
             num_procs (int): Number of processors.
             experiment_dir (Path): Path to QUEENS experiment directory.
-            experiment_name (str): Name of QUEENS experiment.
+            experiment_name (str): name of QUEENS experiment.

         Returns:
             Result and potentially the gradient.
         """
-        job_dir, output_dir, output_file, input_files, log_file = self._manage_paths(
-            job_id, experiment_dir
-        )
+        # call _manage_paths here so the job directories exist even when run() skipped path setup
+        job_dir, output_dir = self._manage_paths(job_id, experiment_dir)
+        input_files = self._manage_input_files(job_dir)
+        output_file, jobscript_log_file = self._manage_output_files(output_dir)

         sample_dict = self.parameters.sample_as_dict(sample)

@@ -235,7 +253,7 @@

         with metadata.time_code("run_jobscript"):
-            execute_cmd = f"bash {jobscript_file} >{log_file} 2>&1"
+            execute_cmd = f"bash {jobscript_file} >{jobscript_log_file} 2>&1"
             self._run_executable(job_id, execute_cmd)

         with metadata.time_code("data_processing"):
@@ -244,34 +262,37 @@

         return results

-    def _manage_paths(self, job_id, experiment_dir):
-        """Manage paths for driver run.
+    def _manage_input_files(self, job_dir):
+        """Manage input file paths for driver run.

         Args:
-            job_id (int): Job ID.
-            experiment_dir (Path): Path to QUEENS experiment directory.
+            job_dir (Path): Path to job directory.

         Returns:
-            job_dir (Path): Path to job directory.
+            input_files (dict): Dict with name and path of the input file(s).
+        """
+        input_files = {}
+        for input_template_name, input_template_path in self.input_templates.items():
+            input_file_str = input_template_name + "".join(input_template_path.suffixes)
+            input_files[input_template_name] = job_dir / input_file_str
+
+        return input_files
+
+    def _manage_output_files(self, output_dir):
+        """Manage output file paths for driver run.
+
+        Args:
             output_dir (Path): Path to output directory.
+
+        Returns:
             output_file (Path): Path to output file(s).
-            input_files (dict): Dict with name and path of the input file(s).
             log_file (Path): Path to log file.
         """
-        job_dir = experiment_dir / str(job_id)
-        output_dir = job_dir / "output"
-        output_dir.mkdir(parents=True, exist_ok=True)
-
         output_prefix = "output"
         output_file = output_dir / output_prefix
         log_file = output_dir / (output_prefix + ".log")

-        input_files = {}
-        for input_template_name, input_template_path in self.input_templates.items():
-            input_file_str = input_template_name + "".join(input_template_path.suffixes)
-            input_files[input_template_name] = job_dir / input_file_str
-
-        return job_dir, output_dir, output_file, input_files, log_file
+        return output_file, log_file

     def _run_executable(self, job_id, execute_cmd):
         """Run executable.

@@ -306,12 +327,12 @@ def _get_results(self, output_dir):
         result = None
         if self.data_processor:
             result = self.data_processor.get_data_from_file(output_dir)
-            _logger.debug("Got result: %s", result)
+            self.logger_on_worker.info("Got result: %s", result)

         gradient = None
         if self.gradient_data_processor:
             gradient = self.gradient_data_processor.get_data_from_file(output_dir)
-            _logger.debug("Got gradient: %s", gradient)
+            self.logger_on_worker.info("Got gradient: %s", gradient)

         return result, gradient

     def prepare_input_files(self, sample_dict, experiment_dir, input_files):
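The path juggling in `_manage_input_files` is compact enough to verify in isolation: each input file keeps the template's full suffix chain but lives in the job directory. A standalone check, with paths invented for illustration:

```python
from pathlib import Path

# Invented template mapping, as Jobscript would build from `input_templates`.
input_templates = {"input": Path("templates/input.yaml.j2"), "mesh": Path("templates/mesh.dat")}
job_dir = Path("/scratch/experiment/7")

input_files = {
    name: job_dir / (name + "".join(template.suffixes))
    for name, template in input_templates.items()
}
print(input_files)
# {'input': PosixPath('/scratch/experiment/7/input.yaml.j2'),
#  'mesh': PosixPath('/scratch/experiment/7/mesh.dat')}
```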
""" - job_dir = experiment_dir / str(job_id) - output_dir = job_dir / "output" - output_dir.mkdir(parents=True, exist_ok=True) - output_prefix = "output" output_file = output_dir / output_prefix log_file = output_dir / (output_prefix + ".log") - input_files = {} - for input_template_name, input_template_path in self.input_templates.items(): - input_file_str = input_template_name + "".join(input_template_path.suffixes) - input_files[input_template_name] = job_dir / input_file_str - - return job_dir, output_dir, output_file, input_files, log_file + return output_file, log_file def _run_executable(self, job_id, execute_cmd): """Run executable. @@ -306,12 +327,12 @@ def _get_results(self, output_dir): result = None if self.data_processor: result = self.data_processor.get_data_from_file(output_dir) - _logger.debug("Got result: %s", result) + self.logger_on_worker.info("Got result: %s", result) gradient = None if self.gradient_data_processor: gradient = self.gradient_data_processor.get_data_from_file(output_dir) - _logger.debug("Got gradient: %s", gradient) + self.logger_on_worker.info("Got gradient: %s", gradient) return result, gradient def prepare_input_files(self, sample_dict, experiment_dir, input_files): diff --git a/src/queens/drivers/mpi.py b/src/queens/drivers/mpi.py index b3af292c2..1fa973874 100644 --- a/src/queens/drivers/mpi.py +++ b/src/queens/drivers/mpi.py @@ -14,6 +14,8 @@ # """Convenience wrapper around Jobscript Driver.""" +import logging + from queens.drivers.jobscript import Jobscript from queens.utils.logger_settings import log_init_args @@ -35,6 +37,8 @@ def __init__( data_processor=None, gradient_data_processor=None, mpi_cmd="/usr/bin/mpirun --bind-to none", + worker_log_level=logging.INFO, + write_worker_log_files=True, ): """Initialize MPI object. 
diff --git a/src/queens/models/adjoint.py b/src/queens/models/adjoint.py
index fb1c4e765..08a7299c4 100644
--- a/src/queens/models/adjoint.py
+++ b/src/queens/models/adjoint.py
@@ -17,7 +17,7 @@
 import logging

 from queens.models.simulation import Simulation
-from queens.utils.config_directories import current_job_directory
+from queens.utils.config_directories import job_directory
 from queens.utils.io import write_to_csv
 from queens.utils.logger_settings import log_init_args

@@ -77,7 +77,7 @@ def grad(self, samples, upstream_gradient):
         # write adjoint data for each sample to adjoint files in old job directories
         for job_id, grad_objective in zip(last_job_ids, upstream_gradient):
-            job_dir = current_job_directory(experiment_dir, job_id)
+            job_dir = job_directory(experiment_dir, job_id)
             adjoint_file_path = job_dir.joinpath(self.adjoint_file)
             write_to_csv(adjoint_file_path, grad_objective.reshape(1, -1))

diff --git a/src/queens/utils/config_directories.py b/src/queens/utils/config_directories.py
index f21524404..733ec140a 100644
--- a/src/queens/utils/config_directories.py
+++ b/src/queens/utils/config_directories.py
@@ -83,8 +83,8 @@ def create_directory(dir_path: str | Path) -> None:
     create_folder_if_not_existent(dir_path)


-def current_job_directory(experiment_dir: Path, job_id: int) -> Path:
-    """Directory of the latest submitted job.
+def job_directory(experiment_dir: Path, job_id: int) -> Path:
+    """Directory of a specific job.

     Args:
         experiment_dir: Experiment directory

@@ -108,9 +108,9 @@ def job_dirs_in_experiment_dir(experiment_dir: Path | str) -> list[Path]:
     """
     experiment_dir = Path(experiment_dir)
     job_directories = []
-    for job_directory in experiment_dir.iterdir():
-        if job_directory.is_dir() and job_directory.name.isdigit():
-            job_directories.append(job_directory)
+    for job_dir in experiment_dir.iterdir():
+        if job_dir.is_dir() and job_dir.name.isdigit():
+            job_directories.append(job_dir)

     # Sort the jobs directories
     return sorted(job_directories, key=lambda x: int(x.name))
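The rename from `current_job_directory` to `job_directory` matches what the helper actually computes: the directory of one specific job id. Its body is not part of this diff, but both the digit-named folders filtered in `job_dirs_in_experiment_dir` and the old `job_dir = experiment_dir / str(job_id)` line removed from jobscript.py suggest the following behavior (a sketch under that assumption):

```python
from pathlib import Path


def job_directory(experiment_dir: Path, job_id: int) -> Path:
    # Assumed behavior: job folders are named by their integer id, consistent
    # with job_dirs_in_experiment_dir() filtering on name.isdigit().
    return experiment_dir / str(job_id)


assert job_directory(Path("/scratch/exp"), 42) == Path("/scratch/exp/42")
```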
+ """ + logger = logging.getLogger(name) + + formatter = NewLineFormatter( + "%(asctime)s %(name)-12s %(levelname)-8s %(message)s", datefmt="%m-%d %H:%M" + ) + + logger.setLevel(level) + # if the worker is set up for the first time + if not logger.hasHandlers(): + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + logger.addHandler(stream_handler) + + # Remove all FileHandlers, keep others (like StreamHandler) + # it should always have at least the StreamHandler from above but for safety + if logger.hasHandlers(): + for handler in logger.handlers[:]: + if isinstance(handler, logging.FileHandler): + logger.removeHandler(handler) + handler.close() # optional: closes the file descriptor + + if log_dir is not None: + log_file = log_dir / "worker.log" + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + return logger diff --git a/src/queens_interfaces/fourc/driver.py b/src/queens_interfaces/fourc/driver.py index 4e81477b2..4b8ef88b5 100644 --- a/src/queens_interfaces/fourc/driver.py +++ b/src/queens_interfaces/fourc/driver.py @@ -14,6 +14,8 @@ # """Driver to run 4C.""" +import logging + from queens.drivers.jobscript import Jobscript from queens.utils.logger_settings import log_init_args @@ -41,6 +43,8 @@ def __init__( post_processor="", post_options="", mpi_cmd="/usr/bin/mpirun --bind-to none", + worker_log_level=logging.INFO, + write_worker_log_files=True, ): """Initialize Fourc object. @@ -54,6 +58,9 @@ def __init__( post_processor (path, opt): path to post_processor post_options (str, opt): options for post-processing mpi_cmd (str, opt): mpi command + worker_log_level (int | str): Logging level used on the worker (default: "INFO") + write_worker_log_files (bool): Control writing of worker logs to files (one per job) + (default: True) """ # pylint: disable=duplicate-code extra_options = { @@ -70,4 +77,6 @@ def __init__( data_processor=data_processor, gradient_data_processor=gradient_data_processor, extra_options=extra_options, + worker_log_level=worker_log_level, + write_worker_log_files=write_worker_log_files, )