27 changes: 24 additions & 3 deletions doc/manuals/taskvine/index.md
@@ -2935,7 +2935,7 @@ scheduler. The class `DaskVine` implements a TaskVine manager that has a
f.min_workers = 1
with f:
with dask.config.set(scheduler=m.get):
-result = distance.compute(resources={"cores": 1}, resources_mode="max", lazy_transfers=True)
+result = distance.compute(resources={"cores": 1}, resources_mode="max", worker_transfers=True)
print(f"distance = {result}")
print("Terminating workers...", end="")
print("done!")
@@ -2945,13 +2945,34 @@ The `compute` call above may receive the following keyword arguments (a combined usage sketch follows the table):

| Keyword | Description |
|------------ |---------|
-| environment | A TaskVine file that provides an [environment](#execution-contexts) to execute each task. |
+| environment | A TaskVine file (or a string path to a poncho env tarball) that provides an [environment](#execution-contexts) to execute each task. |
| env\_vars | A dictionary of VAR=VALUE environment variables to set per task. A value should be either a string, or a function that accepts as arguments the manager and task, and that returns a string. |
| extra\_files | A dictionary of {taskvine.File: "remote_name"} of input files to attach to each task.|
-| lazy\_transfer | Whether to bring each result back from the workers (False, default), or keep transient results at workers (True) |
+| worker\_transfers | Whether to keep intermediate results only at workers for higher throughput (True, default), or to bring back each result to the manager for better fault tolerance (False). |
| resources | A dictionary to specify [maximum resources](#task-resources), e.g. `{"cores": 1, "memory": 2000}` |
| resources\_mode | [Automatic resource management](#automatic-resource-management) to use, e.g., "fixed", "max", or "max throughput" |
| task\_mode | Mode to execute individual tasks, e.g., "tasks" or "function-calls" (see [function calls](#serverless-computing)). |
+| task\_priority\_mode | How tasks are prioritized for submission; higher priority is considered first (default "largest-input-first" for faster data pruning). |
+| scheduling\_mode | Strategy to dispatch tasks to workers (default "files": prefer workers that already have more of the required input files). |
+| retries | Number of times to attempt a task (default 5). |
+| submit\_per\_cycle | Maximum number of tasks to submit to the scheduler per loop; None means no limit. |
+| max\_pending | Maximum number of submitted tasks still waiting for a result before new submissions pause; None means no limit. |
+| verbose | If True, emit additional debugging information. |
+| progress\_disable | If True, disable the progress bar. |
+| progress\_label | Label to use in the progress bar. |
+| reconstruct | Reconstruct the graph based on annotated functions. |
+| merge\_size | When reconstructing a merge function, merge this many results at a time. |
+| wrapper | Function to wrap dask calls for debugging; should return `(wrapper result, dask call result)`. |
+| wrapper\_proc | Function to process wrapper results on completion (default is print). |
+| prune\_depth | Control pruning behavior: 0 (default) no pruning; 1 checks direct consumers (most aggressive); 2+ checks consumers up to the specified depth (higher values are more conservative). |
+| env\_per\_task | If True, each task individually expands its own environment (requires `environment` to be a string). |
+| lib\_extra\_functions | Additional functions to include in the execution library (only for `task_mode="function-calls"`). |
+| lib\_resources | Resources for the execution library (only for `task_mode="function-calls"`), e.g. `{"cores": 4, "memory": 2000, "disk": 1000, "slots": 4}`. |
+| lib\_command | Command prefixed to the execution of a Library task (only for `task_mode="function-calls"`). |
+| lib\_modules | Hoist these execution-library imports to avoid redundant module imports across individual function invocations (only for `task_mode="function-calls"`). |
+| lazy\_transfers | Deprecated alias for `worker_transfers`. |
+| hoisting\_modules | Deprecated alias for `lib_modules`. |
+| import\_modules | Deprecated alias for `lib_modules`. |
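
As a combined illustration (not part of the patch), the sketch below exercises several of these keywords, reusing the manager `m` and the graph `distance` from the example above; the particular values are arbitrary.

```python
import dask

# Assumes `m` (a DaskVine manager) and `distance` (a dask graph) exist,
# as in the manual example above.
with dask.config.set(scheduler=m.get):
    result = distance.compute(
        resources={"cores": 1, "memory": 2000},  # per-task ceilings (memory in MB)
        resources_mode="max",                     # grow allocations from observed maxima
        worker_transfers=True,                    # keep intermediates at workers (the default)
        retries=5,                                # attempts per task (the default)
        prune_depth=1,                            # most aggressive pruning: direct consumers only
    )
```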

## Appendix for Developers

24 changes: 13 additions & 11 deletions taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py
@@ -139,7 +139,8 @@ def get(self, dsk, keys, *,
lib_command=None,
lib_modules=None,
task_mode='tasks',
-scheduling_mode='FIFO',
+task_priority_mode='largest-input-first',
+scheduling_mode="files",
env_per_task=False,
progress_disable=False,
reconstruct=False,
@@ -186,7 +187,8 @@ def get(self, dsk, keys, *,
else:
self.lib_modules = hoisting_modules if hoisting_modules else import_modules # Deprecated
self.task_mode = task_mode
-self.scheduling_mode = scheduling_mode
+self.task_priority_mode = task_priority_mode
+self.set_scheduler(scheduling_mode)
self.env_per_task = env_per_task
self.reconstruct = reconstruct
self.merge_size = merge_size
@@ -370,15 +372,15 @@ def category_name(self, node):
def _task_priority(self, dag, cat, key):
task_depth = dag.depth_of(key)

if self.scheduling_mode == "random":
if self.task_priority_mode == "random":
priority = random.randint(self.min_priority, self.max_priority)
elif self.scheduling_mode == "depth-first":
elif self.task_priority_mode == "depth-first":
# dig up more information about different kinds of tasks
priority = task_depth
elif self.scheduling_mode == "breadth-first":
elif self.task_priority_mode == "breadth-first":
# prefer to start all branches as soon as possible
priority = -task_depth
elif self.scheduling_mode == "longest-category-first":
elif self.task_priority_mode == "longest-category-first":
# if no tasks have been executed in this category, set a high priority
# so that we know more information about each category
if self.category_info[cat]["num_tasks"]:
@@ -388,7 +390,7 @@ def _task_priority(self, dag, cat, key):
)
else:
priority = self.max_priority
elif self.scheduling_mode == "shortest-category-first":
elif self.task_priority_mode == "shortest-category-first":
# if no tasks have been executed in this category, set a high priority
# so that we know more information about each category
if self.category_info[cat]["num_tasks"]:
@@ -398,17 +400,17 @@ def _task_priority(self, dag, cat, key):
)
else:
priority = self.max_priority
elif self.scheduling_mode == "FIFO":
elif self.task_priority_mode == "FIFO":
# first in first out, the default before this change
priority = -round(time.time(), 6)
elif self.scheduling_mode == "LIFO":
elif self.task_priority_mode == "LIFO":
# last in first out, the opposite of FIFO
priority = round(time.time(), 6)
elif self.scheduling_mode == "largest-input-first":
elif self.task_priority_mode == "largest-input-first":
# best for saving disk space (with pruning)
priority = sum([len(dag.get_result(c)._file) for c in dag.get_dependencies(key)])
else:
raise ValueError(f"Unknown scheduling mode {self.scheduling_mode}")
raise ValueError(f"Unknown scheduling mode {self.task_priority_mode}")

return priority

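
To make the renaming above concrete, here is a minimal sketch (again assuming `m` and `distance` from the manual example; values are illustrative). After this change, `task_priority_mode` takes the ordering strategies formerly passed as `scheduling_mode` (e.g. "FIFO", "breadth-first"), while `scheduling_mode` selects worker placement and defaults to "files":

```python
import dask

with dask.config.set(scheduler=m.get):
    result = distance.compute(
        task_priority_mode="breadth-first",  # was scheduling_mode before this patch
        scheduling_mode="files",             # now: prefer workers already holding the inputs
    )
```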