From b46ede68ea8d1dd999f5bc5429617ab8a2045efb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 17 Sep 2025 20:13:36 +0200 Subject: [PATCH 1/4] Add debug mode to list waiting jobs --- executorlib/executor/flux.py | 4 ++++ executorlib/executor/single.py | 3 +++ executorlib/executor/slurm.py | 4 ++++ .../task_scheduler/interactive/dependency.py | 13 +++++++++++-- 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py index 7a40ba48..82c6cb98 100644 --- a/executorlib/executor/flux.py +++ b/executorlib/executor/flux.py @@ -66,6 +66,7 @@ class FluxJobExecutor(BaseExecutor): debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + debug (bool, optional): Enable debug mode which provides additional information on the execution. Examples: ``` @@ -106,6 +107,7 @@ def __init__( plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, log_obj_size: bool = False, + debug: bool = False, ): """ The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager @@ -153,6 +155,7 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + debug (bool, optional): Enable debug mode which provides additional information on the execution. """ default_resource_dict: dict = { @@ -189,6 +192,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + debug=debug, ) ) else: diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py index 2f75d0c0..9f387df4 100644 --- a/executorlib/executor/single.py +++ b/executorlib/executor/single.py @@ -95,6 +95,7 @@ def __init__( plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, log_obj_size: bool = False, + debug: bool = False, ): """ The executorlib.SingleNodeExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -139,6 +140,7 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + debug (bool): Enable debug mode which provides additional information on the execution. """ default_resource_dict: dict = { @@ -171,6 +173,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + debug=debug, ) ) else: diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py index 97b27c49..a06cac24 100644 --- a/executorlib/executor/slurm.py +++ b/executorlib/executor/slurm.py @@ -276,6 +276,7 @@ class SlurmJobExecutor(BaseExecutor): debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + debug (bool): Enable debug mode which provides additional information on the execution. Examples: ``` @@ -313,6 +314,7 @@ def __init__( plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, log_obj_size: bool = False, + debug: bool = False, ): """ The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -361,6 +363,7 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + debug (bool): Enable debug mode which provides additional information on the execution. """ default_resource_dict: dict = { @@ -394,6 +397,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + debug=debug, ) ) else: diff --git a/executorlib/task_scheduler/interactive/dependency.py b/executorlib/task_scheduler/interactive/dependency.py index a3a43cb8..ec3d5676 100644 --- a/executorlib/task_scheduler/interactive/dependency.py +++ b/executorlib/task_scheduler/interactive/dependency.py @@ -1,4 +1,5 @@ import queue +import warnings from concurrent.futures import Future from threading import Thread from time import sleep @@ -28,6 +29,7 @@ class DependencyTaskScheduler(TaskSchedulerBase): refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01. plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + debug (bool, optional): Enable debug mode which provides additional information on the execution. Attributes: _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object. @@ -44,6 +46,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + debug: bool = False, ) -> None: super().__init__(max_cores=max_cores) self._process_kwargs = { @@ -51,6 +54,7 @@ def __init__( "executor_queue": executor._future_queue, "executor": executor, "refresh_rate": refresh_rate, + "debug": debug, } self._set_process( Thread( @@ -223,6 +227,7 @@ def _execute_tasks_with_dependencies( executor_queue: queue.Queue, executor: TaskSchedulerBase, refresh_rate: float = 0.01, + debug: bool = False, ): """ Resolve the dependencies of multiple tasks, by analysing which task requires concurrent.future.Futures objects from @@ -233,6 +238,7 @@ def _execute_tasks_with_dependencies( executor_queue (Queue): Queue for the internal executor. executor (TaskSchedulerBase): Executor to execute the tasks with after the dependencies are resolved. refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. + debug (bool, optional): Enable debug mode which provides additional information on the execution. """ wait_lst = [] while True: @@ -285,7 +291,7 @@ def _execute_tasks_with_dependencies( number_waiting = len(wait_lst) # Check functions in the wait list and execute them if all future objects are now ready wait_lst = _update_waiting_task( - wait_lst=wait_lst, executor_queue=executor_queue + wait_lst=wait_lst, executor_queue=executor_queue, debug=debug, ) # if no job is ready, sleep for a moment if len(wait_lst) == number_waiting: @@ -295,13 +301,14 @@ def _execute_tasks_with_dependencies( sleep(refresh_rate) -def _update_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> list: +def _update_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue, debug: bool = False) -> list: """ Submit the waiting tasks, which future inputs have been completed, to the executor Args: wait_lst (list): List of waiting tasks executor_queue (Queue): Queue of the internal executor + debug (bool, optional): Enable debug mode which provides additional information on the execution. Returns: list: list tasks which future inputs have not been completed @@ -333,4 +340,6 @@ def _update_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> l task_wait_dict["future"].set_result(done_lst) else: wait_tmp_lst.append(task_wait_dict) + if debug: + warnings.warn(f"{len(wait_tmp_lst)} tasks are still waiting for dependencies, previously {len(wait_lst)}") return wait_tmp_lst From 89dba399d9e62cc4cb58dbdb1ba64eca28ce2fca Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 17 Sep 2025 18:15:08 +0000 Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/task_scheduler/interactive/dependency.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/executorlib/task_scheduler/interactive/dependency.py b/executorlib/task_scheduler/interactive/dependency.py index ec3d5676..bffbf270 100644 --- a/executorlib/task_scheduler/interactive/dependency.py +++ b/executorlib/task_scheduler/interactive/dependency.py @@ -291,7 +291,9 @@ def _execute_tasks_with_dependencies( number_waiting = len(wait_lst) # Check functions in the wait list and execute them if all future objects are now ready wait_lst = _update_waiting_task( - wait_lst=wait_lst, executor_queue=executor_queue, debug=debug, + wait_lst=wait_lst, + executor_queue=executor_queue, + debug=debug, ) # if no job is ready, sleep for a moment if len(wait_lst) == number_waiting: @@ -301,7 +303,9 @@ def _execute_tasks_with_dependencies( sleep(refresh_rate) -def _update_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue, debug: bool = False) -> list: +def _update_waiting_task( + wait_lst: list[dict], executor_queue: queue.Queue, debug: bool = False +) -> list: """ Submit the waiting tasks, which future inputs have been completed, to the executor @@ -341,5 +345,7 @@ def _update_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue, debu else: wait_tmp_lst.append(task_wait_dict) if debug: - warnings.warn(f"{len(wait_tmp_lst)} tasks are still waiting for dependencies, previously {len(wait_lst)}") + warnings.warn( + f"{len(wait_tmp_lst)} tasks are still waiting for dependencies, previously {len(wait_lst)}" + ) return wait_tmp_lst From 2d45b3882b7edb7f4e7b39b3382e8b2554be30f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 17 Sep 2025 20:20:09 +0200 Subject: [PATCH 3/4] fix ruff --- executorlib/task_scheduler/interactive/dependency.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/dependency.py b/executorlib/task_scheduler/interactive/dependency.py index bffbf270..e995aef3 100644 --- a/executorlib/task_scheduler/interactive/dependency.py +++ b/executorlib/task_scheduler/interactive/dependency.py @@ -346,6 +346,7 @@ def _update_waiting_task( wait_tmp_lst.append(task_wait_dict) if debug: warnings.warn( - f"{len(wait_tmp_lst)} tasks are still waiting for dependencies, previously {len(wait_lst)}" + f"{len(wait_tmp_lst)} tasks are still waiting for dependencies, previously {len(wait_lst)}", + stacklevel=2, ) return wait_tmp_lst From 5690e43e5158a91d3ee84eb13443d06f9121b048 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 17 Sep 2025 20:39:27 +0200 Subject: [PATCH 4/4] rename debug to enable_debug_mode --- executorlib/executor/flux.py | 8 ++++---- executorlib/executor/single.py | 6 +++--- executorlib/executor/slurm.py | 8 ++++---- .../task_scheduler/interactive/dependency.py | 18 +++++++++--------- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py index 82c6cb98..b9ffda9f 100644 --- a/executorlib/executor/flux.py +++ b/executorlib/executor/flux.py @@ -66,7 +66,7 @@ class FluxJobExecutor(BaseExecutor): debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - debug (bool, optional): Enable debug mode which provides additional information on the execution. + enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution. Examples: ``` @@ -107,7 +107,7 @@ def __init__( plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, log_obj_size: bool = False, - debug: bool = False, + enable_debug_mode: bool = False, ): """ The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager @@ -155,7 +155,7 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - debug (bool, optional): Enable debug mode which provides additional information on the execution. + enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution. """ default_resource_dict: dict = { @@ -192,7 +192,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, - debug=debug, + enable_debug_mode=enable_debug_mode, ) ) else: diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py index 9f387df4..a69de075 100644 --- a/executorlib/executor/single.py +++ b/executorlib/executor/single.py @@ -95,7 +95,7 @@ def __init__( plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, log_obj_size: bool = False, - debug: bool = False, + enable_debug_mode: bool = False, ): """ The executorlib.SingleNodeExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -140,7 +140,7 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - debug (bool): Enable debug mode which provides additional information on the execution. + enable_debug_mode (bool): Enable debug mode which provides additional information on the execution. """ default_resource_dict: dict = { @@ -173,7 +173,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, - debug=debug, + enable_debug_mode=enable_debug_mode, ) ) else: diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py index a06cac24..6881e188 100644 --- a/executorlib/executor/slurm.py +++ b/executorlib/executor/slurm.py @@ -276,7 +276,7 @@ class SlurmJobExecutor(BaseExecutor): debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - debug (bool): Enable debug mode which provides additional information on the execution. + enable_debug_mode (bool): Enable debug mode which provides additional information on the execution. Examples: ``` @@ -314,7 +314,7 @@ def __init__( plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, log_obj_size: bool = False, - debug: bool = False, + enable_debug_mode: bool = False, ): """ The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -363,7 +363,7 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - debug (bool): Enable debug mode which provides additional information on the execution. + enable_debug_mode (bool): Enable debug mode which provides additional information on the execution. """ default_resource_dict: dict = { @@ -397,7 +397,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, - debug=debug, + enable_debug_mode=enable_debug_mode, ) ) else: diff --git a/executorlib/task_scheduler/interactive/dependency.py b/executorlib/task_scheduler/interactive/dependency.py index e995aef3..660f2176 100644 --- a/executorlib/task_scheduler/interactive/dependency.py +++ b/executorlib/task_scheduler/interactive/dependency.py @@ -29,7 +29,7 @@ class DependencyTaskScheduler(TaskSchedulerBase): refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01. plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. - debug (bool, optional): Enable debug mode which provides additional information on the execution. + enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution. Attributes: _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object. @@ -46,7 +46,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, - debug: bool = False, + enable_debug_mode: bool = False, ) -> None: super().__init__(max_cores=max_cores) self._process_kwargs = { @@ -54,7 +54,7 @@ def __init__( "executor_queue": executor._future_queue, "executor": executor, "refresh_rate": refresh_rate, - "debug": debug, + "enable_debug_mode": enable_debug_mode, } self._set_process( Thread( @@ -227,7 +227,7 @@ def _execute_tasks_with_dependencies( executor_queue: queue.Queue, executor: TaskSchedulerBase, refresh_rate: float = 0.01, - debug: bool = False, + enable_debug_mode: bool = False, ): """ Resolve the dependencies of multiple tasks, by analysing which task requires concurrent.future.Futures objects from @@ -238,7 +238,7 @@ def _execute_tasks_with_dependencies( executor_queue (Queue): Queue for the internal executor. executor (TaskSchedulerBase): Executor to execute the tasks with after the dependencies are resolved. refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. - debug (bool, optional): Enable debug mode which provides additional information on the execution. + enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution. """ wait_lst = [] while True: @@ -293,7 +293,7 @@ def _execute_tasks_with_dependencies( wait_lst = _update_waiting_task( wait_lst=wait_lst, executor_queue=executor_queue, - debug=debug, + enable_debug_mode=enable_debug_mode, ) # if no job is ready, sleep for a moment if len(wait_lst) == number_waiting: @@ -304,7 +304,7 @@ def _execute_tasks_with_dependencies( def _update_waiting_task( - wait_lst: list[dict], executor_queue: queue.Queue, debug: bool = False + wait_lst: list[dict], executor_queue: queue.Queue, enable_debug_mode: bool = False ) -> list: """ Submit the waiting tasks, which future inputs have been completed, to the executor @@ -312,7 +312,7 @@ def _update_waiting_task( Args: wait_lst (list): List of waiting tasks executor_queue (Queue): Queue of the internal executor - debug (bool, optional): Enable debug mode which provides additional information on the execution. + enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution. Returns: list: list tasks which future inputs have not been completed @@ -344,7 +344,7 @@ def _update_waiting_task( task_wait_dict["future"].set_result(done_lst) else: wait_tmp_lst.append(task_wait_dict) - if debug: + if enable_debug_mode: warnings.warn( f"{len(wait_tmp_lst)} tasks are still waiting for dependencies, previously {len(wait_lst)}", stacklevel=2,