Skip to content
47 changes: 34 additions & 13 deletions src/queens/data_processors/_data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import logging
from pathlib import Path

from queens.utils.logger_settings import setup_logger_on_worker

_logger = logging.getLogger(__name__)


Expand All @@ -36,13 +38,18 @@ class DataProcessor(metaclass=abc.ABCMeta):
The file prefix can contain BASIC regex expression
and subdirectories. Examples are wildcards `*` or
expressions like `[ab]`.
worker_log_level (int | str): Logging level used on the worker (default: "INFO")
write_worker_log_files (bool): Control writing of worker logs to files (one per job)
logger_on_worker (logging.Logger): Logger instance used on the dask worker
"""

def __init__(
self,
file_name_identifier=None,
file_options_dict=None,
files_to_be_deleted_regex_lst=None,
worker_log_level=logging.INFO,
write_worker_log_files=True,
):
"""Init data processor class.

Expand All @@ -55,6 +62,9 @@ def __init__(
implement valid options for this dictionary.
files_to_be_deleted_regex_lst (lst): List with paths to files that should be deleted.
The paths can contain regex expressions.
worker_log_level (int | str): Logging level used on the worker (default: "INFO")
write_worker_log_files (bool): Control writing of worker logs to files (one per job)
(default: True)
"""
if not file_name_identifier:
raise ValueError(
Expand Down Expand Up @@ -90,46 +100,57 @@ def __init__(
self.file_options_dict = file_options_dict
self.file_name_identifier = file_name_identifier

def get_data_from_file(self, base_dir_file):
self.worker_log_level = worker_log_level
self.write_worker_log_files = write_worker_log_files
self.logger_on_worker = None

def get_data_from_file(self, base_dir):
    """Get data of interest from file.

    Sets up the worker-side logger (optionally writing a log file into
    ``base_dir``), locates the unique file matching
    ``self.file_name_identifier``, runs the processing pipeline on it, and
    finally cleans up files matching ``files_to_be_deleted_regex_lst``.

    Args:
        base_dir (Path): Path of the base directory that contains the file of interest

    Returns:
        processed_data (np.array): Final data from data processor module;
        ``None`` if no matching file was found.

    Raises:
        ValueError: If `base_dir` is empty or falsy.
        TypeError: If `base_dir` is not a `Path` object.
    """
    if not base_dir:
        raise ValueError(
            "The data processor requires a base_directory for the "
            "files to operate on! Your input was empty! Abort..."
        )
    if not isinstance(base_dir, Path):
        raise TypeError(
            "The argument 'base_dir' must be of type 'Path' "
            f"but is of type {type(base_dir)}. Abort..."
        )

    # Only write worker log files when requested; otherwise log to stream only.
    worker_log_dir = base_dir if self.write_worker_log_files else None
    self.logger_on_worker = setup_logger_on_worker(
        name=type(self).__name__, log_dir=worker_log_dir, level=self.worker_log_level
    )

    file_path = self._check_file_exist_and_is_unique(base_dir)
    self.logger_on_worker.info("\nProcessing the following files:")
    self.logger_on_worker.info(str(file_path))
    processed_data = None
    if file_path:
        raw_data = self.get_raw_data_from_file(file_path)
        filtered_data = self.filter_and_manipulate_raw_data(raw_data)
        processed_data = self._subsequent_data_manipulation(filtered_data)

    self._clean_up(base_dir)
    return processed_data

def _check_file_exist_and_is_unique(self, base_dir_file):
def _check_file_exist_and_is_unique(self, base_dir):
"""Check if file exists.

Args:
base_dir_file (Path): Path to base directory that contains file of interest
base_dir (Path): Path to base directory that contains file of interest

Returns:
file_path (str): Actual path to the file of interest.
"""
file_list = list(base_dir_file.glob(self.file_name_identifier))
file_list = list(base_dir.glob(self.file_name_identifier))

if len(file_list) > 1:
raise RuntimeError(
Expand All @@ -141,8 +162,8 @@ def _check_file_exist_and_is_unique(self, base_dir_file):
if len(file_list) == 1:
file_path = file_list[0]
else:
_logger.warning(
"The file '%s' does not exist!", base_dir_file / self.file_name_identifier
self.logger_on_worker.warning(
"The file '%s' does not exist!", base_dir / self.file_name_identifier
)
file_path = None

Expand Down
9 changes: 9 additions & 0 deletions src/queens/data_processors/csv_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def __init__(
file_name_identifier=None,
file_options_dict=None,
files_to_be_deleted_regex_lst=None,
worker_log_level=logging.INFO,
write_worker_log_files=True,
):
"""Instantiate data processor class for csv data.

Expand Down Expand Up @@ -99,13 +101,20 @@ def __init__(
files_to_be_deleted_regex_lst (lst): List with paths to files that should be deleted.
The paths can contain regex expressions.

worker_log_level (int | str): Logging level used on the worker (default: "INFO")
write_worker_log_files (bool): Control writing of worker logs to files (one per job)
(default: True)

Returns:
Instance of CsvFile class
"""
# pylint: disable=duplicate-code
super().__init__(
file_name_identifier=file_name_identifier,
file_options_dict=file_options_dict,
files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst,
worker_log_level=worker_log_level,
write_worker_log_files=write_worker_log_files,
)

header_row = file_options_dict.get("header_row")
Expand Down
8 changes: 8 additions & 0 deletions src/queens/data_processors/numpy_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def __init__(
file_name_identifier=None,
file_options_dict=None,
files_to_be_deleted_regex_lst=None,
worker_log_level=logging.INFO,
write_worker_log_files=True,
):
"""Instantiate data processor class for numpy binary data.

Expand All @@ -43,11 +45,17 @@ def __init__(
file_options_dict (dict): Dictionary with read-in options for the file
files_to_be_deleted_regex_lst (lst): List with paths to files that should be deleted.
The paths can contain regex expressions.
worker_log_level (int | str): Logging level used on the worker (default: "INFO")
write_worker_log_files (bool): Control writing of worker logs to files (one per job)
(default: True)
"""
# pylint: disable=duplicate-code
super().__init__(
file_name_identifier=file_name_identifier,
file_options_dict=file_options_dict,
files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst,
worker_log_level=worker_log_level,
write_worker_log_files=write_worker_log_files,
)

def get_raw_data_from_file(self, file_path):
Expand Down
8 changes: 8 additions & 0 deletions src/queens/data_processors/pvd_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def __init__(
time_steps=None,
block=0,
point_data=True,
worker_log_level=logging.INFO,
write_worker_log_files=True,
):
"""Instantiate data processor class for pvd data extraction.

Expand All @@ -60,11 +62,17 @@ def __init__(
block (int, optional): Considered block of MultiBlock data set (first block by default)
point_data (bool, optional): Whether to extract point data (True) or cell data (False).
Defaults to point data.
worker_log_level (int | str): Logging level used on the worker (default: "INFO")
write_worker_log_files (bool): Control writing of worker logs to files (one per job)
(default: True)
"""
# pylint: disable=duplicate-code
super().__init__(
file_name_identifier=file_name_identifier,
file_options_dict=file_options_dict,
files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst,
worker_log_level=worker_log_level,
write_worker_log_files=write_worker_log_files,
)
self.field_name = field_name
if time_steps is None:
Expand Down
8 changes: 8 additions & 0 deletions src/queens/data_processors/txt_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def __init__(
logger_prefix=r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} - "
r"queens\.drivers\.driver_\d* - INFO -",
max_file_size_in_mega_byte=200,
worker_log_level=logging.INFO,
write_worker_log_files=True,
):
"""Instantiate data processor class for txt data.

Expand All @@ -72,11 +74,17 @@ def __init__(
max_file_size_in_mega_byte (int): Upper limit of the file size to be read into
memory in megabyte (MB). See comment above on
Potential Improvement.
worker_log_level (int | str): Logging level used on the worker (default: "INFO")
write_worker_log_files (bool): Control writing of worker logs to files (one per job)
(default: True)
"""
# pylint: disable=duplicate-code
super().__init__(
file_name_identifier=file_name_identifier,
file_options_dict=file_options_dict,
files_to_be_deleted_regex_lst=files_to_be_deleted_regex_lst,
worker_log_level=worker_log_level,
write_worker_log_files=write_worker_log_files,
)
self.remove_logger_prefix_from_raw_data = remove_logger_prefix_from_raw_data
self.logger_prefix = logger_prefix
Expand Down
84 changes: 81 additions & 3 deletions src/queens/drivers/_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
import abc
import logging
from pathlib import Path
from typing import final

from queens.utils.config_directories import create_directory, job_directory
from queens.utils.logger_settings import setup_logger_on_worker

_logger = logging.getLogger(__name__)

Expand All @@ -27,14 +31,26 @@ class Driver(metaclass=abc.ABCMeta):
Attributes:
parameters (Parameters): Parameters object
files_to_copy (list): files or directories to copy to experiment_dir
worker_log_level (int | str): Logging level used on the worker
write_worker_log_files (bool): Switch on/off writing of worker logs to files (one per job)
logger_on_worker (logging.Logger): Logger instance used on the worker
"""

def __init__(self, parameters, files_to_copy=None):
def __init__(
self,
parameters,
files_to_copy=None,
worker_log_level=logging.INFO,
write_worker_log_files=True,
):
"""Initialize Driver object.

Args:
parameters (Parameters): Parameters object
files_to_copy (list): files or directories to copy to experiment_dir
worker_log_level (int | str): Logging level used on the worker (default: "INFO")
write_worker_log_files (bool): Control writing of worker logs to files (one per job)
(default: True)
"""
self.parameters = parameters
if files_to_copy is None:
Expand All @@ -46,9 +62,13 @@ def __init__(self, parameters, files_to_copy=None):
raise TypeError("files_to_copy must be a list of strings or Path objects")
self.files_to_copy = files_to_copy

@abc.abstractmethod
self.worker_log_level = worker_log_level
self.write_worker_log_files = write_worker_log_files
self.logger_on_worker = None

@final
def run(self, sample, job_id, num_procs, experiment_dir, experiment_name):
    """Run the driver for a single job.

    Sets up the worker-side logger (optionally writing a per-job log file
    into the job directory) and then delegates the actual work to the
    subclass-specific ``_run`` implementation.

    Args:
        sample (dict): Dict containing sample
        job_id (int): Job ID
        num_procs (int): number of processors
        experiment_dir (Path): Path to QUEENS experiment directory.
        experiment_name (str): name of QUEENS experiment.

    Returns:
        Result and potentially the gradient
    """
    # _manage_paths returns (job_dir, output_dir); logs go to the output dir.
    _, worker_log_dir = (
        self._manage_paths(job_id, experiment_dir)
        if self.write_worker_log_files
        else (None, None)
    )
    self.logger_on_worker = setup_logger_on_worker(
        name=type(self).__name__, log_dir=worker_log_dir, level=self.worker_log_level
    )

    return self._run(
        sample,
        job_id,
        num_procs,
        experiment_dir,
        experiment_name,
    )

@abc.abstractmethod
def _run(self, sample, job_id, num_procs, experiment_dir, experiment_name):
    """Driver-specific execution of a single job.

    Subclasses implement the actual evaluation here; the public ``run``
    method handles logger setup before delegating to this hook.

    Args:
        sample (dict): Dict containing sample
        job_id (int): Job ID
        num_procs (int): number of processors
        experiment_dir (Path): Path to QUEENS experiment directory.
        experiment_name (str): name of QUEENS experiment.

    Returns:
        Result and potentially the gradient
    """

@final
def _manage_paths(self, job_id, experiment_dir):
    """Create the directory layout for a single driver run.

    Builds the job directory for `job_id` and creates an ``output``
    subdirectory inside it.

    Args:
        job_id (int): Job ID.
        experiment_dir (Path): Path to QUEENS experiment directory.

    Returns:
        job_dir (Path): Path to job directory.
        output_dir (Path): Path to output directory.
    """
    job_dir = job_directory(experiment_dir, job_id)
    output_dir = job_dir / "output"
    create_directory(output_dir)

    return job_dir, output_dir
22 changes: 19 additions & 3 deletions src/queens/drivers/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,24 @@ def __init__(
parameters,
function,
external_python_module_function=None,
worker_log_level=logging.INFO,
write_worker_log_files=False,
):
"""Initialize Function object.

Args:
parameters (Parameters): Parameters object
function (callable, str): Function or name of example function provided by QUEENS
external_python_module_function (Path | str): Path to external module with function
worker_log_level (int | str): Logging level used on the worker (default: "INFO")
write_worker_log_files (bool): Control writing of worker logs to files (one per job)
(default: False)
"""
super().__init__(parameters=parameters)
super().__init__(
parameters=parameters,
worker_log_level=worker_log_level,
write_worker_log_files=write_worker_log_files,
)
if external_python_module_function is None:
if isinstance(function, str):
# Try to load existing simulator functions
Expand Down Expand Up @@ -112,15 +121,22 @@ def reshaped_output_function(sample_dict):

return reshaped_output_function

def run(self, sample, job_id, num_procs, experiment_dir, experiment_name):
def _run(
self,
sample,
job_id,
num_procs,
experiment_dir,
experiment_name,
):
"""Run the driver.

Args:
sample (dict): Dict containing sample
job_id (int): Job ID
num_procs (int): number of processors
experiment_name (str): name of QUEENS experiment.
experiment_dir (Path): Path to QUEENS experiment directory.
experiment_name (str): name of QUEENS experiment.

Returns:
Result and potentially the gradient
Expand Down
Loading
Loading