Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions executorlib/executor/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class FluxJobExecutor(BaseExecutor):
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution.

Examples:
```
Expand Down Expand Up @@ -106,6 +107,7 @@ def __init__(
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
log_obj_size: bool = False,
enable_debug_mode: bool = False,
):
"""
The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager
Expand Down Expand Up @@ -153,6 +155,7 @@ def __init__(
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution.

"""
default_resource_dict: dict = {
Expand Down Expand Up @@ -189,6 +192,7 @@ def __init__(
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
enable_debug_mode=enable_debug_mode,
)
)
else:
Expand Down
3 changes: 3 additions & 0 deletions executorlib/executor/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def __init__(
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
log_obj_size: bool = False,
enable_debug_mode: bool = False,
):
"""
The executorlib.SingleNodeExecutor leverages either the message passing interface (MPI), the SLURM workload
Expand Down Expand Up @@ -139,6 +140,7 @@ def __init__(
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
enable_debug_mode (bool): Enable debug mode which provides additional information on the execution.

"""
default_resource_dict: dict = {
Expand Down Expand Up @@ -171,6 +173,7 @@ def __init__(
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
enable_debug_mode=enable_debug_mode,
)
)
else:
Expand Down
4 changes: 4 additions & 0 deletions executorlib/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ class SlurmJobExecutor(BaseExecutor):
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
enable_debug_mode (bool): Enable debug mode which provides additional information on the execution.

Examples:
```
Expand Down Expand Up @@ -313,6 +314,7 @@ def __init__(
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
log_obj_size: bool = False,
enable_debug_mode: bool = False,
):
"""
The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload
Expand Down Expand Up @@ -361,6 +363,7 @@ def __init__(
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
enable_debug_mode (bool): Enable debug mode which provides additional information on the execution.

"""
default_resource_dict: dict = {
Expand Down Expand Up @@ -394,6 +397,7 @@ def __init__(
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
enable_debug_mode=enable_debug_mode,
)
)
else:
Expand Down
20 changes: 18 additions & 2 deletions executorlib/task_scheduler/interactive/dependency.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import queue
import warnings
from concurrent.futures import Future
from threading import Thread
from time import sleep
Expand Down Expand Up @@ -28,6 +29,7 @@ class DependencyTaskScheduler(TaskSchedulerBase):
refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution.

Attributes:
_future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
Expand All @@ -44,13 +46,15 @@ def __init__(
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
enable_debug_mode: bool = False,
) -> None:
super().__init__(max_cores=max_cores)
self._process_kwargs = {
"future_queue": self._future_queue,
"executor_queue": executor._future_queue,
"executor": executor,
"refresh_rate": refresh_rate,
"enable_debug_mode": enable_debug_mode,
}
self._set_process(
Thread(
Expand Down Expand Up @@ -223,6 +227,7 @@ def _execute_tasks_with_dependencies(
executor_queue: queue.Queue,
executor: TaskSchedulerBase,
refresh_rate: float = 0.01,
enable_debug_mode: bool = False,
):
"""
Resolve the dependencies of multiple tasks, by analysing which task requires concurrent.future.Futures objects from
Expand All @@ -233,6 +238,7 @@ def _execute_tasks_with_dependencies(
executor_queue (Queue): Queue for the internal executor.
executor (TaskSchedulerBase): Executor to execute the tasks with after the dependencies are resolved.
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution.
"""
wait_lst = []
while True:
Expand Down Expand Up @@ -285,7 +291,9 @@ def _execute_tasks_with_dependencies(
number_waiting = len(wait_lst)
# Check functions in the wait list and execute them if all future objects are now ready
wait_lst = _update_waiting_task(
wait_lst=wait_lst, executor_queue=executor_queue
wait_lst=wait_lst,
executor_queue=executor_queue,
enable_debug_mode=enable_debug_mode,
)
# if no job is ready, sleep for a moment
if len(wait_lst) == number_waiting:
Expand All @@ -295,13 +303,16 @@ def _execute_tasks_with_dependencies(
sleep(refresh_rate)


def _update_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> list:
def _update_waiting_task(
wait_lst: list[dict], executor_queue: queue.Queue, enable_debug_mode: bool = False
) -> list:
"""
Submit the waiting tasks, which future inputs have been completed, to the executor

Args:
wait_lst (list): List of waiting tasks
executor_queue (Queue): Queue of the internal executor
enable_debug_mode (bool, optional): Enable debug mode which provides additional information on the execution.

Returns:
list: list tasks which future inputs have not been completed
Expand Down Expand Up @@ -333,4 +344,9 @@ def _update_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> l
task_wait_dict["future"].set_result(done_lst)
else:
wait_tmp_lst.append(task_wait_dict)
if enable_debug_mode:
warnings.warn(
f"{len(wait_tmp_lst)} tasks are still waiting for dependencies, previously {len(wait_lst)}",
stacklevel=2,
)
return wait_tmp_lst
Loading