diff --git a/doc/manuals/taskvine/index.md b/doc/manuals/taskvine/index.md
index 2412a11b81..6703c1285d 100644
--- a/doc/manuals/taskvine/index.md
+++ b/doc/manuals/taskvine/index.md
@@ -2935,7 +2935,7 @@ scheduler. The class `DaskVine` implements a TaskVine manager that has a
 f.min_workers = 1
 with f:
     with dask.config.set(scheduler=m.get):
-        result = distance.compute(resources={"cores": 1}, resources_mode="max", lazy_transfers=True)
+        result = distance.compute(resources={"cores": 1}, resources_mode="max", worker_transfers=True)
         print(f"distance = {result}")
 print("Terminating workers...", end="")
 print("done!")
@@ -2945,13 +2945,34 @@
 The `compute` call above may receive the following keyword arguments:
 
 | Keyword | Description |
 |------------ |---------|
-| environment | A TaskVine file that provides an [environment](#execution-contexts) to execute each task. |
+| environment | A TaskVine file (or a string path to a poncho env tarball) that provides an [environment](#execution-contexts) to execute each task. |
 | env\_vars | A dictionary of VAR=VALUE environment variables to set per task. A value should be either a string, or a function that accepts as arguments the manager and task, and that returns a string. |
 | extra\_files | A dictionary of {taskvine.File: "remote_name"} of input files to attach to each task.|
-| lazy\_transfer | Whether to bring each result back from the workers (False, default), or keep transient results at workers (True) |
+| worker\_transfers | Whether to keep intermediate results only at workers for higher throughput (True, default), or to bring each result back to the manager for better fault tolerance (False). |
 | resources | A dictionary to specify [maximum resources](#task-resources), e.g. `{"cores": 1, "memory": 2000}` |
 | resources\_mode | [Automatic resource management](#automatic-resource-management) to use, e.g., "fixed", "max", or "max throughput"|
 | task\_mode | Mode to execute individual tasks, e.g., "tasks" (regular tasks) or "function-calls" ([function calls](#serverless-computing)).|
+| task\_priority\_mode | How tasks are prioritized for submission; higher priority is considered first (default "largest-input-first", which enables faster data pruning). |
+| scheduling\_mode | Strategy to dispatch tasks to workers (default "files": prefer workers that already hold more of the required input files). |
+| retries | Number of times to attempt a task (default 5). |
+| submit\_per\_cycle | Maximum number of tasks to submit to the scheduler per loop; None means no limit. |
+| max\_pending | Maximum number of submitted tasks that may be awaiting a result; submission pauses while that many are pending. None means no limit. |
+| verbose | If True, emit additional debugging information. |
+| progress\_disable | If True, disable the progress bar. |
+| progress\_label | Label to use in the progress bar. |
+| reconstruct | Reconstruct the task graph based on annotated functions. |
+| merge\_size | When reconstructing a merge function, merge this many inputs at a time. |
+| wrapper | Function to wrap dask calls for debugging; should return `(wrapper result, dask call result)`. |
+| wrapper\_proc | Function to process wrapper results on completion (default is print). |
+| prune\_depth | Controls pruning behavior: 0 (default) disables pruning; 1 checks only direct consumers (most aggressive); 2 or more checks consumers up to the given depth (higher values are more conservative). |
+| env\_per\_task | If True, each task individually expands its own environment (requires `environment` to be a string). |
+| lib\_extra\_functions | Additional functions to include in the execution library (only for `task_mode="function-calls"`). |
+| lib\_resources | Resources for the execution library (only for `task_mode="function-calls"`), e.g. `{"cores": 4, "memory": 2000, "disk": 1000, "slots": 4}`. |
+| lib\_command | Command to be prefixed to the execution of a Library task (only for `task_mode="function-calls"`). |
+| lib\_modules | Hoist these execution-library imports to avoid redundant module imports across individual function invocations (only for `task_mode="function-calls"`). |
+| lazy\_transfers | Deprecated alias for `worker_transfers`. |
+| hoisting\_modules | Deprecated alias for `lib_modules`. |
+| import\_modules | Deprecated alias for `lib_modules`. |
 
 ## Appendix for Developers
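As a usage sketch (not part of the patch): the call below composes several of the keywords documented above. The tiny delayed graph stands in for the manual's `distance` computation, the port number is arbitrary, and workers are assumed to be connected, e.g. via the factory shown earlier.

```python
import dask
from ndcctools.taskvine import DaskVine

@dask.delayed
def add(x, y):
    return x + y

# A tiny graph standing in for the manual's `distance` computation.
distance = add(add(1, 2), add(3, 4))

m = DaskVine(port=9123)

with dask.config.set(scheduler=m.get):
    result = distance.compute(
        resources={"cores": 1, "memory": 2000},    # per-task maximums
        resources_mode="max",                      # grow allocations from observed usage
        worker_transfers=True,                     # keep intermediates at workers (default)
        task_priority_mode="largest-input-first",  # default: free input data sooner
        scheduling_mode="files",                   # default: favor workers holding inputs
        prune_depth=1,                             # most aggressive pruning
    )
print(f"distance = {result}")
```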
diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py
index 1b1def413f..0e2fa6bdcf 100644
--- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py
+++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py
@@ -139,7 +139,8 @@ def get(self, dsk, keys, *,
             lib_command=None,
             lib_modules=None,
             task_mode='tasks',
-            scheduling_mode='FIFO',
+            task_priority_mode='largest-input-first',
+            scheduling_mode="files",
             env_per_task=False,
             progress_disable=False,
             reconstruct=False,
@@ -186,7 +187,8 @@ def get(self, dsk, keys, *,
         else:
             self.lib_modules = hoisting_modules if hoisting_modules else import_modules  # Deprecated
         self.task_mode = task_mode
-        self.scheduling_mode = scheduling_mode
+        self.task_priority_mode = task_priority_mode
+        self.set_scheduler(scheduling_mode)
         self.env_per_task = env_per_task
         self.reconstruct = reconstruct
         self.merge_size = merge_size
@@ -370,15 +372,15 @@ def category_name(self, node):
 
     def _task_priority(self, dag, cat, key):
         task_depth = dag.depth_of(key)
-        if self.scheduling_mode == "random":
+        if self.task_priority_mode == "random":
             priority = random.randint(self.min_priority, self.max_priority)
-        elif self.scheduling_mode == "depth-first":
+        elif self.task_priority_mode == "depth-first":
             # dig more information about different kinds of tasks
             priority = task_depth
-        elif self.scheduling_mode == "breadth-first":
+        elif self.task_priority_mode == "breadth-first":
             # prefer to start all branches as soon as possible
             priority = -task_depth
-        elif self.scheduling_mode == "longest-category-first":
+        elif self.task_priority_mode == "longest-category-first":
             # if no tasks have been executed in this category, set a high priority
             # so that we know more information about each category
             if self.category_info[cat]["num_tasks"]:
                 priority = (
                 )
             else:
                 priority = self.max_priority
@@ -388,7 +390,7 @@ def _task_priority(self, dag, cat, key):
-        elif self.scheduling_mode == "shortest-category-first":
+        elif self.task_priority_mode == "shortest-category-first":
             # if no tasks have been executed in this category, set a high priority
             # so that we know more information about each category
             if self.category_info[cat]["num_tasks"]:
                 priority = (
                 )
             else:
                 priority = self.max_priority
@@ -398,17 +400,17 @@ def _task_priority(self, dag, cat, key):
-        elif self.scheduling_mode == "FIFO":
+        elif self.task_priority_mode == "FIFO":
             # first in first out, the default behavior
             priority = -round(time.time(), 6)
-        elif self.scheduling_mode == "LIFO":
+        elif self.task_priority_mode == "LIFO":
             # last in first out, the opposite of FIFO
             priority = round(time.time(), 6)
-        elif self.scheduling_mode == "largest-input-first":
+        elif self.task_priority_mode == "largest-input-first":
"largest-input-first": # best for saving disk space (with pruing) priority = sum([len(dag.get_result(c)._file) for c in dag.get_dependencies(key)]) else: - raise ValueError(f"Unknown scheduling mode {self.scheduling_mode}") + raise ValueError(f"Unknown scheduling mode {self.task_priority_mode}") return priority