Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ jobs:
python -m pip install -r docs/requirements.txt
- name: "Build documentation and check for consistency"
env:
CHECKSUM: "5779fa47cef4264d5c279196c77b1cc18eb9eb798f9d4d53b4d33a4b71ebd085"
CHECKSUM: "4b8189e16ca2dbc74b83c552775e4582e5842314f429de69b17734c473638db5"
run: |
cd docs
HASH="$(make checksum | tail -n1)"
Expand Down
6 changes: 2 additions & 4 deletions docs/source/ext/connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ The ``streamflow.core.deployment`` module defines the ``Connector`` interface, w
async def get_available_locations(
self,
service: str | None = None,
input_directory: str | None = None,
output_directory: str | None = None,
tmp_directory: str | None = None,
directories: MutableSequence[str] | None = None,
) -> MutableMapping[str, AvailableLocation]:
...

Expand Down Expand Up @@ -80,7 +78,7 @@ The ``deploy`` method instantiates the remote execution environment, making it r

The ``undeploy`` method destroys the remote execution environment, potentially cleaning up all the temporary resources instantiated during the workflow execution (e.g., intermediate results). If a ``deployment`` object is marked as ``external``, the ``undeploy`` method should not destroy it but just close all the connections opened by the ``deploy`` method.

The ``get_available_locations`` method is used in the scheduling phase to obtain the locations available for job execution, identified by their unique name (see :ref:`here <Scheduling>`). The method receives some optional input parameters to filter valid locations. The ``service`` parameter specifies a specific set of locations in a deployment, and its precise meaning differs for each deployment type (see :ref:`here <Binding steps and deployments>`). The other three parameters (``input_directory``, ``output_directory``, and ``tmp_directory``) allow the ``Connector`` to return correct disk usage values for each of the three folders in case of remote instances with multiple volumes attached.
The ``get_available_locations`` method is used in the scheduling phase to obtain the locations available for job execution, identified by their unique name (see :ref:`here <Scheduling>`). The method receives some optional input parameters to filter valid locations. The ``service`` parameter specifies a specific set of locations in a deployment, and its precise meaning differs for each deployment type (see :ref:`here <Binding steps and deployments>`). The ``directories`` parameter allows the ``Connector`` to return correct disk usage values for each of the folders in the list in case of remote instances with multiple volumes attached.

The ``get_stream_reader`` method returns a ``StreamWrapperContextManager`` instance, which allows the ``src`` data on the ``location`` to be read using a stream (see :ref:`here <Streaming>`). The stream must be read respecting the size of the available buffer, which is defined by the ``transferBufferSize`` attribute of the ``Connector`` instance. This method improves the performance of data copies between pairs of remote locations.

Expand Down
10 changes: 0 additions & 10 deletions streamflow/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,6 @@ def _process_binding(self, binding: MutableMapping[str, Any]):
if isinstance(binding["target"], MutableSequence)
else [binding["target"]]
)
for target in targets:
policy = target.get(
"policy",
self.deployments[target.get("deployment", target.get("model", {}))].get(
"policy", "__DEFAULT__"
),
)
if policy not in self.policies:
raise WorkflowDefinitionException(f"Policy {policy} is not defined")
target["policy"] = self.policies[policy]
target_type = "step" if "step" in binding else "port"
if target_type == "port" and "workdir" not in binding["target"]:
raise WorkflowDefinitionException(
Expand Down
5 changes: 0 additions & 5 deletions streamflow/config/schemas/v1.0/config_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,6 @@
"description": "If greater than one, the STREAMFLOW_HOSTS variable contains the comma-separated list of nodes allocated for the task",
"default": 1
},
"policy": {
"type": "string",
"description": "The scheduling policy to be used with this target. Overrides the deployment policy when present.",
"default": "data_locality"
},
"service": {
"type": "string"
},
Expand Down
8 changes: 5 additions & 3 deletions streamflow/core/asyncache.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,14 @@ async def wrapper(self, *args, **kwargs):
k = key(*args, **kwargs)
try:
return c[k]
except KeyError:
pass # key not found
except (KeyError, TypeError):
# KeyError: key not found
# TypeError: it is necessary because list inputs are not hashable
pass
v = await method(self, *args, **kwargs)
try:
c[k] = v
except ValueError:
except (ValueError, TypeError):
pass # value too large
return v

Expand Down
26 changes: 18 additions & 8 deletions streamflow/core/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,7 @@ async def deploy(self, external: bool) -> None: ...
async def get_available_locations(
self,
service: str | None = None,
input_directory: str | None = None,
output_directory: str | None = None,
tmp_directory: str | None = None,
directories: MutableSequence[str] | None = None,
) -> MutableMapping[str, AvailableLocation]: ...

@abstractmethod
Expand Down Expand Up @@ -156,7 +154,16 @@ async def undeploy_all(self): ...


class DeploymentConfig(PersistableEntity):
__slots__ = ("name", "type", "config", "external", "lazy", "workdir", "wraps")
__slots__ = (
"name",
"type",
"config",
"external",
"lazy",
"policy",
"workdir",
"wraps",
)

def __init__(
self,
Expand All @@ -165,6 +172,7 @@ def __init__(
config: MutableMapping[str, Any],
external: bool = False,
lazy: bool = True,
policy: Config | None = None,
workdir: str | None = None,
wraps: WrapsConfig | None = None,
) -> None:
Expand All @@ -174,6 +182,10 @@ def __init__(
self.config: MutableMapping[str, Any] = config or {}
self.external: bool = external
self.lazy: bool = lazy
# todo: change Config into PolicyConfig and save it into database
self.policy: Config | None = policy or Config(
name="__DEFAULT__", type="data_locality", config={}
)
self.workdir: str | None = workdir
self.wraps: WrapsConfig | None = wraps

Expand All @@ -185,6 +197,7 @@ async def load(
loading_context: DatabaseLoadingContext,
) -> DeploymentConfig:
row = await context.database.get_deployment(persistent_id)
# todo: load policy from database
obj = cls(
name=row["name"],
type=row["type"],
Expand All @@ -198,6 +211,7 @@ async def load(
return obj

async def save(self, context: StreamFlowContext) -> None:
# todo: save policy into database
async with self.persistence_lock:
if not self.persistent_id:
self.persistent_id = await context.database.add_deployment(
Expand All @@ -222,17 +236,13 @@ def __init__(
locations: int = 1,
service: str | None = None,
scheduling_group: str | None = None,
scheduling_policy: Config | None = None,
workdir: str | None = None,
):
super().__init__()
self.deployment: DeploymentConfig = deployment
self.locations: int = locations
self.service: str | None = service
self.scheduling_group: str | None = scheduling_group
self.scheduling_policy: Config | None = scheduling_policy or Config(
name="__DEFAULT__", type="data_locality", config={}
)
self.workdir: str = (
workdir or self.deployment.workdir or _init_workdir(deployment.name)
)
Expand Down
164 changes: 94 additions & 70 deletions streamflow/core/scheduling.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from typing import Any, TYPE_CHECKING, Type, cast

Expand All @@ -14,72 +15,27 @@
from typing import MutableSequence, MutableMapping


class Hardware:
class DeploymentHardware:
def __init__(
self,
cores: float = 0.0,
memory: float = 0.0,
input_directory: float = 0.0,
output_directory: float = 0.0,
tmp_directory: float = 0.0,
cores: float,
memory: float,
storage: MutableMapping[str, float],
):
self.cores: float = cores
self.memory: float = memory
self.input_directory: float = input_directory
self.output_directory: float = output_directory
self.tmp_directory: float = tmp_directory

def __add__(self, other):
if not isinstance(other, Hardware):
return NotImplementedError
return self.__class__(
**{
k: vars(self).get(k, 0.0) + vars(other).get(k, 0.0)
for k in vars(self).keys()
}
)

def __sub__(self, other):
if not isinstance(other, Hardware):
return NotImplementedError
return self.__class__(
**{
k: vars(self).get(k, 0.0) - vars(other).get(k, 0.0)
for k in vars(self).keys()
}
)

def __ge__(self, other):
if not isinstance(other, Hardware):
return NotImplementedError
return all(
vars(self).get(k, 0.0) >= vars(other).get(k, 0.0)
for k in set().union(vars(self).keys(), vars(other).keys())
)

def __gt__(self, other):
if not isinstance(other, Hardware):
return NotImplementedError
return all(
vars(self).get(k, 0.0) > vars(other).get(k, 0.0)
for k in set().union(vars(self).keys(), vars(other).keys())
)
self.storage: MutableMapping[str, float] = storage

def __le__(self, other):
if not isinstance(other, Hardware):
return NotImplementedError
return all(
vars(self).get(k, 0.0) <= vars(other).get(k, 0.0)
for k in set().union(vars(self).keys(), vars(other).keys())
)

def __lt__(self, other):
if not isinstance(other, Hardware):
return NotImplementedError
return all(
vars(self).get(k, 0.0) < vars(other).get(k, 0.0)
for k in set().union(vars(self).keys(), vars(other).keys())
)
class JobHardwareRequirement:
    """Hardware resources requested by a single job.

    Captures the per-job resource demand used during scheduling: CPU
    cores, memory, and the disk space expected to be consumed in the
    job's temporary and output directories.

    :param cores: number of CPU cores requested
    :param memory: amount of memory requested (units not stated here —
        presumably the same unit used by ``DeploymentHardware.memory``;
        verify against callers)
    :param tmp_dir_size: expected disk usage of the job's temporary directory
    :param out_dir_size: expected disk usage of the job's output directory
    """

    def __init__(
        self, cores: float, memory: float, tmp_dir_size: float, out_dir_size: float
    ):
        # Requested CPU cores
        self.cores: float = cores
        # Requested memory
        self.memory: float = memory
        # Expected size of the temporary directory
        self.tmp_dir_size: float = tmp_dir_size
        # Expected size of the output directory
        self.out_dir_size: float = out_dir_size
        # todo: add timeout


class HardwareRequirement(ABC):
Expand All @@ -98,7 +54,7 @@ async def _save_additional_params(
) -> MutableMapping[str, Any]: ...

@abstractmethod
def eval(self, inputs: MutableMapping[str, Token]) -> Hardware: ...
def eval(self, inputs: MutableMapping[str, Token]) -> JobHardwareRequirement: ...

@classmethod
async def load(
Expand Down Expand Up @@ -128,13 +84,13 @@ def __init__(
target: Target,
locations: MutableSequence[ExecutionLocation],
status: Status,
hardware: Hardware,
hardware: JobHardwareRequirement,
):
self.job: str = job
self.target: Target = target
self.locations: MutableSequence[ExecutionLocation] = locations
self.status: Status = status
self.hardware: Hardware = hardware
self.hardware: JobHardwareRequirement = hardware


class AvailableLocation:
Expand All @@ -152,10 +108,10 @@ def __init__(
hostname: str,
service: str | None = None,
slots: int = 1,
hardware: Hardware | None = None,
hardware: DeploymentHardware | None = None,
wraps: AvailableLocation | None = None,
):
self.hardware: Hardware | None = hardware
self.hardware: DeploymentHardware | None = hardware
self.location: ExecutionLocation = ExecutionLocation(
deployment=deployment,
hostname=hostname,
Expand Down Expand Up @@ -198,17 +154,31 @@ async def save(self, context: StreamFlowContext):
return self.config


class JobContext:
    """Per-job scheduling context.

    Bundles a pending job with its candidate targets and its hardware
    requirement, plus an :class:`asyncio.Event` (created unset) —
    presumably signalled when a scheduling decision for the job is made;
    verify against the scheduler implementation.
    """

    __slots__ = ("job", "event", "targets", "hardware_requirement")

    def __init__(
        self,
        job: Job,
        targets: MutableSequence[Target],
        hardware_requirement: JobHardwareRequirement,
    ) -> None:
        # Resources requested by the job
        self.hardware_requirement: JobHardwareRequirement = hardware_requirement
        # Candidate targets on which the job may be placed
        self.targets: MutableSequence[Target] = targets
        # The job awaiting scheduling
        self.job: Job = job
        # Synchronization event, initially unset
        self.event: asyncio.Event = asyncio.Event()


class Policy(SchemaEntity):
@abstractmethod
async def get_location(
self,
context: StreamFlowContext,
job: Job,
hardware_requirement: Hardware,
pending_jobs: MutableSequence[JobContext],
available_locations: MutableMapping[str, AvailableLocation],
jobs: MutableMapping[str, JobAllocation],
scheduled_jobs: MutableSequence[JobAllocation],
locations: MutableMapping[str, MutableMapping[str, LocationAllocation]],
) -> AvailableLocation | None: ...
) -> MutableMapping[str, AvailableLocation]: ...


class Scheduler(SchemaEntity):
Expand All @@ -225,7 +195,7 @@ async def close(self) -> None: ...
def get_allocation(self, job_name: str) -> JobAllocation | None:
return self.job_allocations.get(job_name)

def get_hardware(self, job_name: str) -> Hardware | None:
def get_hardware(self, job_name: str) -> DeploymentHardware | None:
allocation = self.get_allocation(job_name)
return allocation.hardware if allocation else None

Expand Down Expand Up @@ -259,5 +229,59 @@ async def notify_status(self, job_name: str, status: Status) -> None: ...

@abstractmethod
async def schedule(
self, job: Job, binding_config: BindingConfig, hardware_requirement: Hardware
self,
job: Job,
binding_config: BindingConfig,
hardware_requirement: JobHardwareRequirement,
) -> None: ...


def diff_hw(
    deployment_hw: DeploymentHardware, hw_requirement: JobHardwareRequirement
) -> DeploymentHardware:
    """Subtract a job's hardware requirement from a location's available
    hardware and return the remainder as a new ``DeploymentHardware``.

    Only ``cores`` and ``memory`` are subtracted. Storage is passed
    through unchanged: per-directory disk accounting is not implemented
    yet, because mapping a directory to its backing volume by path
    prefix alone is ambiguous (e.g. ``/`` and ``/mnt/data`` may be
    different volumes even though one path is a prefix of the other).

    TODO: subtract ``hw_requirement.tmp_dir_size`` and
    ``hw_requirement.out_dir_size`` from the correct storage volumes
    once reliable directory-to-volume resolution is available.

    NOTE(review): the returned object aliases ``deployment_hw.storage``
    (no copy is made) — mutating the result's storage mutates the input;
    confirm this sharing is intended.
    """
    return DeploymentHardware(
        cores=deployment_hw.cores - hw_requirement.cores,
        memory=deployment_hw.memory - hw_requirement.memory,
        storage=deployment_hw.storage,
    )


def sum_job_req(
    job_hw_reqs: MutableSequence[JobHardwareRequirement],
) -> JobHardwareRequirement:
    """Aggregate a sequence of job hardware requirements field-wise.

    Returns a new ``JobHardwareRequirement`` whose ``cores``, ``memory``,
    ``tmp_dir_size`` and ``out_dir_size`` are the sums of the
    corresponding fields across ``job_hw_reqs``. An empty sequence
    yields an all-zero requirement.

    :param job_hw_reqs: requirements to aggregate
    :return: the field-wise sum of all requirements
    """
    # sum() over generators replaces the manual accumulator loop;
    # the input is a sequence, so iterating it four times is safe.
    return JobHardwareRequirement(
        cores=sum(req.cores for req in job_hw_reqs),
        memory=sum(req.memory for req in job_hw_reqs),
        tmp_dir_size=sum(req.tmp_dir_size for req in job_hw_reqs),
        out_dir_size=sum(req.out_dir_size for req in job_hw_reqs),
    )


def greater_eq_hw(
    deployment_hw: DeploymentHardware, job_hw_req: JobHardwareRequirement
) -> bool:
    """Check whether a location's available hardware can satisfy a job's
    requirement on CPU cores and memory.

    Storage is deliberately not compared (original TODO preserved).

    :param deployment_hw: the hardware available on a location
    :param job_hw_req: the hardware requested by a job
    :return: ``True`` when both cores and memory suffice
    """
    # todo: dirs comparison
    if deployment_hw.cores < job_hw_req.cores:
        return False
    return deployment_hw.memory >= job_hw_req.memory
Loading