diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py index 7a40ba48..b9ffda9f 100644 --- a/executorlib/executor/flux.py +++ b/executorlib/executor/flux.py @@ -66,6 +66,7 @@ class FluxJobExecutor(BaseExecutor): debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution. Examples: ``` @@ -106,6 +107,7 @@ def __init__( plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, log_obj_size: bool = False, + enable_debug_mode: bool = False, ): """ The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager @@ -153,6 +155,7 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution. """ default_resource_dict: dict = { @@ -189,6 +192,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + enable_debug_mode=enable_debug_mode, ) ) else: diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py index 2f75d0c0..a69de075 100644 --- a/executorlib/executor/single.py +++ b/executorlib/executor/single.py @@ -95,6 +95,7 @@ def __init__( plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, log_obj_size: bool = False, + enable_debug_mode: bool = False, ): """ The executorlib.SingleNodeExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -139,6 +140,7 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + enable_debug_mode (bool): Enable debug mode which provides additional information on the execution. """ default_resource_dict: dict = { @@ -171,6 +173,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + enable_debug_mode=enable_debug_mode, ) ) else: diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py index 97b27c49..6881e188 100644 --- a/executorlib/executor/slurm.py +++ b/executorlib/executor/slurm.py @@ -276,6 +276,7 @@ class SlurmJobExecutor(BaseExecutor): debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + enable_debug_mode (bool): Enable debug mode which provides additional information on the execution. Examples: ``` @@ -313,6 +314,7 @@ def __init__( plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, log_obj_size: bool = False, + enable_debug_mode: bool = False, ): """ The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -361,6 +363,7 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + enable_debug_mode (bool): Enable debug mode which provides additional information on the execution. """ default_resource_dict: dict = { @@ -394,6 +397,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + enable_debug_mode=enable_debug_mode, ) ) else: diff --git a/executorlib/task_scheduler/interactive/dependency.py b/executorlib/task_scheduler/interactive/dependency.py index a3a43cb8..660f2176 100644 --- a/executorlib/task_scheduler/interactive/dependency.py +++ b/executorlib/task_scheduler/interactive/dependency.py @@ -1,4 +1,5 @@ import queue +import warnings from concurrent.futures import Future from threading import Thread from time import sleep @@ -28,6 +29,7 @@ class DependencyTaskScheduler(TaskSchedulerBase): refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01. plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution. Attributes: _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object. @@ -44,6 +46,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + enable_debug_mode: bool = False, ) -> None: super().__init__(max_cores=max_cores) self._process_kwargs = { @@ -51,6 +54,7 @@ def __init__( "executor_queue": executor._future_queue, "executor": executor, "refresh_rate": refresh_rate, + "enable_debug_mode": enable_debug_mode, } self._set_process( Thread( @@ -223,6 +227,7 @@ def _execute_tasks_with_dependencies( executor_queue: queue.Queue, executor: TaskSchedulerBase, refresh_rate: float = 0.01, + enable_debug_mode: bool = False, ): """ Resolve the dependencies of multiple tasks, by analysing which task requires concurrent.future.Futures objects from @@ -233,6 +238,7 @@ def _execute_tasks_with_dependencies( executor_queue (Queue): Queue for the internal executor. executor (TaskSchedulerBase): Executor to execute the tasks with after the dependencies are resolved. refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. + enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution. """ wait_lst = [] while True: @@ -285,7 +291,9 @@ def _execute_tasks_with_dependencies( number_waiting = len(wait_lst) # Check functions in the wait list and execute them if all future objects are now ready wait_lst = _update_waiting_task( - wait_lst=wait_lst, executor_queue=executor_queue + wait_lst=wait_lst, + executor_queue=executor_queue, + enable_debug_mode=enable_debug_mode, ) # if no job is ready, sleep for a moment if len(wait_lst) == number_waiting: @@ -295,13 +303,16 @@ def _execute_tasks_with_dependencies( sleep(refresh_rate) -def _update_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> list: +def _update_waiting_task( + wait_lst: list[dict], executor_queue: queue.Queue, enable_debug_mode: bool = False +) -> list: """ Submit the waiting tasks, which future inputs have been completed, to the executor Args: wait_lst (list): List of waiting tasks executor_queue (Queue): Queue of the internal executor + enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution. Returns: list: list tasks which future inputs have not been completed @@ -333,4 +344,9 @@ def _update_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> l task_wait_dict["future"].set_result(done_lst) else: wait_tmp_lst.append(task_wait_dict) + if enable_debug_mode: + warnings.warn( + f"{len(wait_tmp_lst)} tasks are still waiting for dependencies, previously {len(wait_lst)}", + stacklevel=2, + ) return wait_tmp_lst