diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index d0efa17c1..f3524112b 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -147,7 +147,7 @@ jobs: python -m pip install -r docs/requirements.txt - name: "Build documentation and check for consistency" env: - CHECKSUM: "5779fa47cef4264d5c279196c77b1cc18eb9eb798f9d4d53b4d33a4b71ebd085" + CHECKSUM: "4b8189e16ca2dbc74b83c552775e4582e5842314f429de69b17734c473638db5" run: | cd docs HASH="$(make checksum | tail -n1)" diff --git a/docs/source/ext/connector.rst b/docs/source/ext/connector.rst index 95ad0c52d..8f2b67c32 100644 --- a/docs/source/ext/connector.rst +++ b/docs/source/ext/connector.rst @@ -45,9 +45,7 @@ The ``streamflow.core.deployment`` module defines the ``Connector`` interface, w async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: ... @@ -80,7 +78,7 @@ The ``deploy`` method instantiates the remote execution environment, making it r The ``undeploy`` method destroys the remote execution environment, potentially cleaning up all the temporary resources instantiated during the workflow execution (e.g., intermediate results). If a ``deployment`` object is marked as ``external``, the ``undeploy`` method should not destroy it but just close all the connections opened by the ``deploy`` method. -The ``get_available_locations`` method is used in the scheduling phase to obtain the locations available for job execution, identified by their unique name (see :ref:`here `). The method receives some optional input parameters to filter valid locations. The ``service`` parameter specifies a specific set of locations in a deployment, and its precise meaning differs for each deployment type (see :ref:`here `). 
The other three parameters (``input_directory``, ``output_directory``, and ``tmp_directory``) allow the ``Connector`` to return correct disk usage values for each of the three folders in case of remote instances with multiple volumes attached. +The ``get_available_locations`` method is used in the scheduling phase to obtain the locations available for job execution, identified by their unique name (see :ref:`here `). The method receives some optional input parameters to filter valid locations. The ``service`` parameter specifies a specific set of locations in a deployment, and its precise meaning differs for each deployment type (see :ref:`here `). The ``directories`` parameter allows the ``Connector`` to return correct disk usage values for each of the folders in the list in case of remote instances with multiple volumes attached. The ``get_stream_reader`` method returns a ``StreamWrapperContextManager`` instance, which allows the ``src`` data on the ``location`` to be read using a stream (see :ref:`here `). The stream must be read respecting the size of the available buffer, which is defined by the ``transferBufferSize`` attribute of the ``Connector`` instance. This method improve performance of data copies between pairs of remote locations. 
diff --git a/streamflow/config/config.py b/streamflow/config/config.py index dc7c67cbf..8006b0a62 100644 --- a/streamflow/config/config.py +++ b/streamflow/config/config.py @@ -65,16 +65,6 @@ def _process_binding(self, binding: MutableMapping[str, Any]): if isinstance(binding["target"], MutableSequence) else [binding["target"]] ) - for target in targets: - policy = target.get( - "policy", - self.deployments[target.get("deployment", target.get("model", {}))].get( - "policy", "__DEFAULT__" - ), - ) - if policy not in self.policies: - raise WorkflowDefinitionException(f"Policy {policy} is not defined") - target["policy"] = self.policies[policy] target_type = "step" if "step" in binding else "port" if target_type == "port" and "workdir" not in binding["target"]: raise WorkflowDefinitionException( diff --git a/streamflow/config/schemas/v1.0/config_schema.json b/streamflow/config/schemas/v1.0/config_schema.json index 8a91d3667..fb157177d 100644 --- a/streamflow/config/schemas/v1.0/config_schema.json +++ b/streamflow/config/schemas/v1.0/config_schema.json @@ -296,11 +296,6 @@ "description": "If greater than one, the STREAMFLOW_HOSTS variable contains the comma-separated list of nodes allocated for the task", "default": 1 }, - "policy": { - "type": "string", - "description": "The scheduling policy to be used with this target. 
Overrides the deployment policy when present.", - "default": "data_locality" - }, "service": { "type": "string" }, diff --git a/streamflow/core/asyncache.py b/streamflow/core/asyncache.py index de9f343b5..b1ef6081d 100644 --- a/streamflow/core/asyncache.py +++ b/streamflow/core/asyncache.py @@ -89,12 +89,14 @@ async def wrapper(self, *args, **kwargs): k = key(*args, **kwargs) try: return c[k] - except KeyError: - pass # key not found + except (KeyError, TypeError): + # KeyError: key not found + # TypeError: it is necessary because list inputs are not hashable + pass v = await method(self, *args, **kwargs) try: c[k] = v - except ValueError: + except (ValueError, TypeError): pass # value too large return v diff --git a/streamflow/core/deployment.py b/streamflow/core/deployment.py index fdfef33e9..0724af5ed 100644 --- a/streamflow/core/deployment.py +++ b/streamflow/core/deployment.py @@ -106,9 +106,7 @@ async def deploy(self, external: bool) -> None: ... async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: ... @abstractmethod @@ -156,7 +154,16 @@ async def undeploy_all(self): ... 
class DeploymentConfig(PersistableEntity): - __slots__ = ("name", "type", "config", "external", "lazy", "workdir", "wraps") + __slots__ = ( + "name", + "type", + "config", + "external", + "lazy", + "policy", + "workdir", + "wraps", + ) def __init__( self, @@ -165,6 +172,7 @@ def __init__( config: MutableMapping[str, Any], external: bool = False, lazy: bool = True, + policy: Config | None = None, workdir: str | None = None, wraps: WrapsConfig | None = None, ) -> None: @@ -174,6 +182,10 @@ def __init__( self.config: MutableMapping[str, Any] = config or {} self.external: bool = external self.lazy: bool = lazy + # todo: change Config into PolicyConfig and save it into database + self.policy: Config | None = policy or Config( + name="__DEFAULT__", type="data_locality", config={} + ) self.workdir: str | None = workdir self.wraps: WrapsConfig | None = wraps @@ -185,6 +197,7 @@ async def load( loading_context: DatabaseLoadingContext, ) -> DeploymentConfig: row = await context.database.get_deployment(persistent_id) + # todo: load policy from database obj = cls( name=row["name"], type=row["type"], @@ -198,6 +211,7 @@ async def load( return obj async def save(self, context: StreamFlowContext) -> None: + # todo: save policy into database async with self.persistence_lock: if not self.persistent_id: self.persistent_id = await context.database.add_deployment( @@ -222,7 +236,6 @@ def __init__( locations: int = 1, service: str | None = None, scheduling_group: str | None = None, - scheduling_policy: Config | None = None, workdir: str | None = None, ): super().__init__() @@ -230,9 +243,6 @@ def __init__( self.locations: int = locations self.service: str | None = service self.scheduling_group: str | None = scheduling_group - self.scheduling_policy: Config | None = scheduling_policy or Config( - name="__DEFAULT__", type="data_locality", config={} - ) self.workdir: str = ( workdir or self.deployment.workdir or _init_workdir(deployment.name) ) diff --git a/streamflow/core/scheduling.py 
b/streamflow/core/scheduling.py index f3533121d..14b540e84 100644 --- a/streamflow/core/scheduling.py +++ b/streamflow/core/scheduling.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from abc import ABC, abstractmethod from typing import Any, TYPE_CHECKING, Type, cast @@ -14,72 +15,27 @@ from typing import MutableSequence, MutableMapping -class Hardware: +class DeploymentHardware: def __init__( self, - cores: float = 0.0, - memory: float = 0.0, - input_directory: float = 0.0, - output_directory: float = 0.0, - tmp_directory: float = 0.0, + cores: float, + memory: float, + storage: MutableMapping[str, float], ): self.cores: float = cores self.memory: float = memory - self.input_directory: float = input_directory - self.output_directory: float = output_directory - self.tmp_directory: float = tmp_directory - - def __add__(self, other): - if not isinstance(other, Hardware): - return NotImplementedError - return self.__class__( - **{ - k: vars(self).get(k, 0.0) + vars(other).get(k, 0.0) - for k in vars(self).keys() - } - ) - - def __sub__(self, other): - if not isinstance(other, Hardware): - return NotImplementedError - return self.__class__( - **{ - k: vars(self).get(k, 0.0) - vars(other).get(k, 0.0) - for k in vars(self).keys() - } - ) - - def __ge__(self, other): - if not isinstance(other, Hardware): - return NotImplementedError - return all( - vars(self).get(k, 0.0) >= vars(other).get(k, 0.0) - for k in set().union(vars(self).keys(), vars(other).keys()) - ) - - def __gt__(self, other): - if not isinstance(other, Hardware): - return NotImplementedError - return all( - vars(self).get(k, 0.0) > vars(other).get(k, 0.0) - for k in set().union(vars(self).keys(), vars(other).keys()) - ) + self.storage: MutableMapping[str, float] = storage - def __le__(self, other): - if not isinstance(other, Hardware): - return NotImplementedError - return all( - vars(self).get(k, 0.0) <= vars(other).get(k, 0.0) - for k in set().union(vars(self).keys(), 
vars(other).keys()) - ) - def __lt__(self, other): - if not isinstance(other, Hardware): - return NotImplementedError - return all( - vars(self).get(k, 0.0) < vars(other).get(k, 0.0) - for k in set().union(vars(self).keys(), vars(other).keys()) - ) +class JobHardwareRequirement: + def __init__( + self, cores: float, memory: float, tmp_dir_size: float, out_dir_size: float + ): + self.cores = cores + self.memory = memory + self.tmp_dir_size = tmp_dir_size + self.out_dir_size = out_dir_size + # todo: add timeout class HardwareRequirement(ABC): @@ -98,7 +54,7 @@ async def _save_additional_params( ) -> MutableMapping[str, Any]: ... @abstractmethod - def eval(self, inputs: MutableMapping[str, Token]) -> Hardware: ... + def eval(self, inputs: MutableMapping[str, Token]) -> JobHardwareRequirement: ... @classmethod async def load( @@ -128,13 +84,13 @@ def __init__( target: Target, locations: MutableSequence[ExecutionLocation], status: Status, - hardware: Hardware, + hardware: JobHardwareRequirement, ): self.job: str = job self.target: Target = target self.locations: MutableSequence[ExecutionLocation] = locations self.status: Status = status - self.hardware: Hardware = hardware + self.hardware: JobHardwareRequirement = hardware class AvailableLocation: @@ -152,10 +108,10 @@ def __init__( hostname: str, service: str | None = None, slots: int = 1, - hardware: Hardware | None = None, + hardware: DeploymentHardware | None = None, wraps: AvailableLocation | None = None, ): - self.hardware: Hardware | None = hardware + self.hardware: DeploymentHardware | None = hardware self.location: ExecutionLocation = ExecutionLocation( deployment=deployment, hostname=hostname, @@ -198,17 +154,31 @@ async def save(self, context: StreamFlowContext): return self.config +class JobContext: + __slots__ = ("job", "event", "targets", "hardware_requirement") + + def __init__( + self, + job: Job, + targets: MutableSequence[Target], + hardware_requirement: JobHardwareRequirement, + ) -> None: + self.job: 
Job = job + self.event: asyncio.Event = asyncio.Event() + self.targets: MutableSequence[Target] = targets + self.hardware_requirement: JobHardwareRequirement = hardware_requirement + + class Policy(SchemaEntity): @abstractmethod async def get_location( self, context: StreamFlowContext, - job: Job, - hardware_requirement: Hardware, + pending_jobs: MutableSequence[JobContext], available_locations: MutableMapping[str, AvailableLocation], - jobs: MutableMapping[str, JobAllocation], + scheduled_jobs: MutableSequence[JobAllocation], locations: MutableMapping[str, MutableMapping[str, LocationAllocation]], - ) -> AvailableLocation | None: ... + ) -> MutableMapping[str, AvailableLocation]: ... class Scheduler(SchemaEntity): @@ -225,7 +195,7 @@ async def close(self) -> None: ... def get_allocation(self, job_name: str) -> JobAllocation | None: return self.job_allocations.get(job_name) - def get_hardware(self, job_name: str) -> Hardware | None: + def get_hardware(self, job_name: str) -> DeploymentHardware | None: allocation = self.get_allocation(job_name) return allocation.hardware if allocation else None @@ -259,5 +229,59 @@ async def notify_status(self, job_name: str, status: Status) -> None: ... @abstractmethod async def schedule( - self, job: Job, binding_config: BindingConfig, hardware_requirement: Hardware + self, + job: Job, + binding_config: BindingConfig, + hardware_requirement: JobHardwareRequirement, ) -> None: ... + + +def diff_hw( + deployment_hw: DeploymentHardware, hw_requirement: JobHardwareRequirement +) -> DeploymentHardware: + # job = job_context.job + # hw_requirement = job_context.hardware_requirement + # storage = {} + # for path, size in deployment_hw.storage.items(): + # # todo: it is not really correct. They can have the same starts but be in different volumes + # # e.g. 
/ -> volume1 and /mnt/data -> volume2 + # if job.tmp_directory.startswith(path): + # size -= hw_requirement.tmp_dir_size + # elif job.output_directory.startswith(path): + # size -= hw_requirement.out_dir_size + # storage[path] = size + return DeploymentHardware( + cores=deployment_hw.cores - hw_requirement.cores, + memory=deployment_hw.memory - hw_requirement.memory, + storage=deployment_hw.storage, + ) + + +def sum_job_req( + job_hw_reqs: MutableSequence[JobHardwareRequirement], +) -> JobHardwareRequirement: + cores = 0 + memory = 0 + tmp_dir_size = 0 + out_dir_size = 0 + for job_hw_req in job_hw_reqs: + cores += job_hw_req.cores + memory += job_hw_req.memory + tmp_dir_size += job_hw_req.tmp_dir_size + out_dir_size += job_hw_req.out_dir_size + return JobHardwareRequirement( + cores=cores, + memory=memory, + tmp_dir_size=tmp_dir_size, + out_dir_size=out_dir_size, + ) + + +def greater_eq_hw( + deployment_hw: DeploymentHardware, job_hw_req: JobHardwareRequirement +) -> bool: + # todo: dirs comparison + return ( + deployment_hw.cores >= job_hw_req.cores + and deployment_hw.memory >= job_hw_req.memory + ) diff --git a/streamflow/core/utils.py b/streamflow/core/utils.py index 1e3d59199..1e539dcaa 100644 --- a/streamflow/core/utils.py +++ b/streamflow/core/utils.py @@ -200,6 +200,20 @@ def get_option( raise TypeError("Unsupported value type") +def get_disks_usage(directories: MutableSequence[str]): + return [ + "df", + *directories, + "|", + "tail", + "-n", + str(len(directories)), + "|", + "awk", + "'{{print $6, $2}}'", + ] + + async def get_remote_to_remote_write_command( src_connector: Connector, src_location: ExecutionLocation, diff --git a/streamflow/cwl/hardware.py b/streamflow/cwl/hardware.py index 205b57582..f72c33f54 100644 --- a/streamflow/cwl/hardware.py +++ b/streamflow/cwl/hardware.py @@ -5,7 +5,10 @@ from streamflow.core.context import StreamFlowContext from streamflow.core.persistence import DatabaseLoadingContext -from streamflow.core.scheduling import 
Hardware, HardwareRequirement +from streamflow.core.scheduling import ( + HardwareRequirement, + JobHardwareRequirement, +) from streamflow.core.workflow import Token from streamflow.cwl.utils import eval_expression from streamflow.workflow.utils import get_token_value @@ -18,7 +21,7 @@ def __init__( cores: str | float | None = None, memory: str | float | None = None, tmpdir: str | float | None = None, - outdir: str | float | None = None, + outdir: str | float | None = None, # todo: add timeout full_js: bool = False, expression_lib: MutableSequence[str] | None = None, ): @@ -70,11 +73,11 @@ def _process_requirement( ) ) - def eval(self, inputs: MutableMapping[str, Token]) -> Hardware: + def eval(self, inputs: MutableMapping[str, Token]) -> JobHardwareRequirement: context = {"inputs": {name: get_token_value(t) for name, t in inputs.items()}} - return Hardware( + return JobHardwareRequirement( cores=self._process_requirement(self.cores, context), memory=self._process_requirement(self.memory, context), - tmp_directory=self._process_requirement(self.tmpdir, context), - output_directory=self._process_requirement(self.outdir, context), + tmp_dir_size=self._process_requirement(self.tmpdir, context), + out_dir_size=self._process_requirement(self.outdir, context), ) diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index 0f7d57a93..5462a1d2a 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -30,7 +30,7 @@ WorkflowDefinitionException, WorkflowExecutionException, ) -from streamflow.core.scheduling import Hardware +from streamflow.core.scheduling import JobHardwareRequirement from streamflow.core.utils import random_name from streamflow.core.workflow import Job, Token, Workflow from streamflow.cwl.expression import DependencyResolver @@ -221,7 +221,7 @@ def build_context( inputs: MutableMapping[str, Token], output_directory: str | None = None, tmp_directory: str | None = None, - hardware: Hardware | None = None, + hardware: 
JobHardwareRequirement | None = None, ) -> MutableMapping[str, Any]: context = {"inputs": {}, "self": None, "runtime": {}} for name, token in inputs.items(): @@ -234,9 +234,9 @@ def build_context( context["runtime"]["cores"] = hardware.cores context["runtime"]["ram"] = hardware.memory # noinspection PyUnresolvedReferences - context["runtime"]["tmpdirSize"] = hardware.tmp_directory + context["runtime"]["tmpdirSize"] = hardware.tmp_dir_size # noinspection PyUnresolvedReferences - context["runtime"]["outdirSize"] = hardware.output_directory + context["runtime"]["outdirSize"] = hardware.out_dir_size return context diff --git a/streamflow/deployment/connector/container.py b/streamflow/deployment/connector/container.py index cedfdf59a..92b2c23f9 100644 --- a/streamflow/deployment/connector/container.py +++ b/streamflow/deployment/connector/container.py @@ -672,9 +672,7 @@ async def deploy(self, external: bool) -> None: async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: return { container_id: await self._get_location(container_id) @@ -867,9 +865,7 @@ async def deploy(self, external: bool) -> None: async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: ps_command = (await self._get_base_command()) + "".join( ["ps ", "--format ", "json ", service or ""] @@ -1209,9 +1205,7 @@ async def deploy(self, external: bool) -> None: async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: 
MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: location_tasks = {} for instance_name in self.instanceNames: diff --git a/streamflow/deployment/connector/kubernetes.py b/streamflow/deployment/connector/kubernetes.py index c9537e1fa..913ea0325 100644 --- a/streamflow/deployment/connector/kubernetes.py +++ b/streamflow/deployment/connector/kubernetes.py @@ -471,9 +471,7 @@ async def deploy(self, external: bool): async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: pods = await self._get_running_pods() valid_targets = {} diff --git a/streamflow/deployment/connector/local.py b/streamflow/deployment/connector/local.py index 8814a458c..96340d20a 100644 --- a/streamflow/deployment/connector/local.py +++ b/streamflow/deployment/connector/local.py @@ -15,14 +15,24 @@ ExecutionLocation, LOCAL_LOCATION, ) -from streamflow.core.scheduling import AvailableLocation, Hardware +from streamflow.core.scheduling import AvailableLocation, DeploymentHardware from streamflow.deployment.connector.base import BaseConnector -def _get_disk_usage(path: Path): - while not os.path.exists(path): - path = path.parent - return float(shutil.disk_usage(path).free / 2**20) +def _get_disks_usage(directories: MutableSequence[str]) -> MutableMapping[str, float]: + volumes = {} + for directory in directories: + path = Path(directory) + while not os.path.exists(path): + path = path.parent + volume_name = os.sep + for partition in psutil.disk_partitions(all=False): + if path.name.startswith(partition.mountpoint) and len( + partition.mountpoint + ) > len(volume_name): + volume_name = path.name + volumes[volume_name] = float(shutil.disk_usage(path).free / 2**20) + return volumes class LocalConnector(BaseConnector): @@ -84,9 +94,7 @@ async def 
deploy(self, external: bool) -> None: async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: return { LOCAL_LOCATION: AvailableLocation( @@ -95,23 +103,15 @@ async def get_available_locations( service=service, hostname="localhost", slots=1, - hardware=Hardware( + hardware=DeploymentHardware( cores=self.cores, memory=self.memory, - input_directory=( - _get_disk_usage(Path(input_directory)) - if input_directory - else float("inf") - ), - output_directory=( - _get_disk_usage(Path(output_directory)) - if output_directory - else float("inf") - ), - tmp_directory=( - _get_disk_usage(Path(tmp_directory)) - if tmp_directory - else float("inf") + storage=( + _get_disks_usage(directories) + if directories + else {} + # todo: Set float("inf") in the volumes not defined. + # Probably in the comparison methods of Hardware ), ), ) diff --git a/streamflow/deployment/connector/occam.py b/streamflow/deployment/connector/occam.py index 8d0a0e189..9dfefa613 100644 --- a/streamflow/deployment/connector/occam.py +++ b/streamflow/deployment/connector/occam.py @@ -373,9 +373,7 @@ async def deploy(self, external: bool) -> None: async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: nodes = ( self.jobs_table.get(service, []) diff --git a/streamflow/deployment/connector/queue_manager.py b/streamflow/deployment/connector/queue_manager.py index 1bde9ac7c..42116e41c 100644 --- a/streamflow/deployment/connector/queue_manager.py +++ b/streamflow/deployment/connector/queue_manager.py @@ -457,9 +457,7 @@ def _service_class(self) -> type[QueueManagerService]: 
... async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: if service is not None and service not in self.services: raise WorkflowDefinitionException( diff --git a/streamflow/deployment/connector/ssh.py b/streamflow/deployment/connector/ssh.py index 814ac2338..89d56af0a 100644 --- a/streamflow/deployment/connector/ssh.py +++ b/streamflow/deployment/connector/ssh.py @@ -19,7 +19,8 @@ from streamflow.core.data import StreamWrapperContextManager from streamflow.core.deployment import Connector, ExecutionLocation from streamflow.core.exception import WorkflowExecutionException -from streamflow.core.scheduling import AvailableLocation, Hardware +from streamflow.core.scheduling import AvailableLocation, DeploymentHardware +from streamflow.core.utils import get_disks_usage from streamflow.deployment import aiotarstream from streamflow.deployment.connector.base import BaseConnector, extract_tar_stream from streamflow.deployment.stream import StreamReaderWrapper, StreamWriterWrapper @@ -596,20 +597,25 @@ def _get_data_transfer_process( environment=environment, ) - async def _get_disk_usage(self, location: str, directory: str) -> float: + async def _get_disks_usage( + self, location: str, directories: MutableSequence[str] + ) -> MutableMapping[str, float]: + if not directories: + return {} async with self._get_ssh_client_process( location=location, - command=f"df {directory} | tail -n 1 | awk '{{print $2}}'", + command=" ".join(get_disks_usage(directories)), stderr=asyncio.subprocess.STDOUT, ) as proc: - if directory: - result = await proc.wait() - if result.returncode == 0: - return float(result.stdout.strip()) / 2**10 - else: - raise WorkflowExecutionException(result.returncode) + result = await proc.wait() + if result.returncode == 0: + volumes = {} + 
+                for line in result.stdout.strip().split("\n"):
+                    volume_name, size = line.split(" ")
+                    volumes[volume_name] = float(size) / 2**10
( + await asyncio.gather( + *( + asyncio.create_task( + self._get_existing_parent(location_obj.hostname, directory) + ) + for directory in directories + ) + ) + if directories + else [] ) hardware = await self._get_location_hardware( location=location_obj.hostname, - input_directory=inpdir, - output_directory=outdir, - tmp_directory=tmpdir, + directories=[str(posix_dir) for posix_dir in dir_values], ) locations[location_obj.hostname] = AvailableLocation( name=location_obj.hostname, diff --git a/streamflow/deployment/future.py b/streamflow/deployment/future.py index be07fe421..da1b75fd6 100644 --- a/streamflow/deployment/future.py +++ b/streamflow/deployment/future.py @@ -126,9 +126,7 @@ async def deploy(self, external: bool) -> None: async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: if self.connector is None: if not self.deploying: @@ -137,10 +135,7 @@ async def get_available_locations( else: await self._safe_deploy_event_wait() return await self.connector.get_available_locations( - service=service, - input_directory=input_directory, - output_directory=output_directory, - tmp_directory=tmp_directory, + service=service, directories=directories ) async def get_stream_reader( diff --git a/streamflow/deployment/wrapper.py b/streamflow/deployment/wrapper.py index ea83bc92e..61eee74f0 100644 --- a/streamflow/deployment/wrapper.py +++ b/streamflow/deployment/wrapper.py @@ -76,15 +76,11 @@ async def deploy(self, external: bool) -> None: async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: return await 
self.connector.get_available_locations( service=service, - input_directory=input_directory, - output_directory=output_directory, - tmp_directory=tmp_directory, + directories=directories, ) async def get_stream_reader( diff --git a/streamflow/scheduling/policy/data_locality.py b/streamflow/scheduling/policy/data_locality.py index dae75ff8e..d82254fcd 100644 --- a/streamflow/scheduling/policy/data_locality.py +++ b/streamflow/scheduling/policy/data_locality.py @@ -1,77 +1,130 @@ from __future__ import annotations import asyncio -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, MutableSequence from importlib_resources import files from streamflow.core.context import StreamFlowContext from streamflow.core.data import DataType from streamflow.core.exception import WorkflowExecutionException -from streamflow.core.scheduling import Hardware, JobAllocation, Policy +from streamflow.core.scheduling import ( + JobAllocation, + Policy, + JobContext, + sum_job_req, + diff_hw, + greater_eq_hw, + JobHardwareRequirement, +) +from streamflow.core.workflow import Status from streamflow.workflow.token import FileToken if TYPE_CHECKING: from streamflow.core.scheduling import AvailableLocation, LocationAllocation - from streamflow.core.workflow import Job from typing import MutableMapping class DataLocalityPolicy(Policy): + + def _is_valid( + self, + location: AvailableLocation, + job_context: JobContext, + used_hardware: JobHardwareRequirement, + num_scheduled_jobs: int, + ) -> bool: + # If location is segmentable and job provides requirements, compute the used amount of locations + if ( + location.hardware is not None + and job_context.hardware_requirement is not None + ): + available_hardware = diff_hw(location.hardware, used_hardware) + return greater_eq_hw(available_hardware, used_hardware) + # If location is segmentable but job does not provide requirements, treat it as null-weighted + elif location.hardware is not None: + return True + # Otherwise, simply 
compute the number of allocated slots + else: + return num_scheduled_jobs < location.slots + async def get_location( self, context: StreamFlowContext, - job: Job, - hardware_requirement: Hardware, + pending_jobs: MutableSequence[JobContext], available_locations: MutableMapping[str, AvailableLocation], - jobs: MutableMapping[str, JobAllocation], + scheduled_jobs: MutableSequence[JobAllocation], locations: MutableMapping[str, MutableMapping[str, LocationAllocation]], - ) -> AvailableLocation | None: - valid_locations = list(available_locations.keys()) - deployments = {loc.deployment for loc in available_locations.values()} - if len(deployments) > 1: - raise WorkflowExecutionException( - f"Available locations coming from multiple deployments: {deployments}" + ) -> MutableMapping[str, AvailableLocation]: + job_candidates = {} + running_jobs = list( + filter( + lambda x: (x.status == Status.RUNNING or x.status == Status.FIREABLE), + scheduled_jobs, ) - # For each input token sorted by weight - weights = { - k: v - for k, v in zip( - job.inputs, - await asyncio.gather( - *( - asyncio.create_task(t.get_weight(context)) - for t in job.inputs.values() - ) - ), - ) - } - for _, token in sorted( - job.inputs.items(), key=lambda item: weights[item[0]], reverse=True - ): - related_locations = set() - # For FileTokens, retrieve related locations - if isinstance(token, FileToken): - for path in await token.get_paths(context): - related_locations.update( - [ - loc.name - for loc in context.data_manager.get_data_locations( - path=path, - deployment=next(iter(deployments)), - data_type=DataType.PRIMARY, - ) - ] - ) - # Check if one of the related locations is free - for current_location in related_locations: - if current_location in valid_locations: - return available_locations[current_location] - # If a data-related allocation is not possible, assign a location among the remaining free ones - for location in valid_locations: - return available_locations[location] - # If there are 
no available locations, return None - return None + ) + used_hardware = sum_job_req(j.hardware for j in running_jobs if j.hardware) + num_scheduled_jobs = len(running_jobs) + for job_context in pending_jobs: + job = job_context.job + locations = {} + for k, loc in available_locations.items(): + if self._is_valid(loc, job_context, used_hardware, num_scheduled_jobs): + locations[k] = loc + num_scheduled_jobs += 1 + if job_context.hardware_requirement: + used_hardware = sum_job_req( + [used_hardware, job_context.hardware_requirement] + ) + valid_locations = list(locations.keys()) + deployments = {loc.deployment for loc in locations.values()} + if len(deployments) > 1: + raise WorkflowExecutionException( + f"Available locations coming from multiple deployments: {deployments}" + ) + # For each input token sorted by weight + weights = { + k: v + for k, v in zip( + job.inputs, + await asyncio.gather( + *( + asyncio.create_task(t.get_weight(context)) + for t in job.inputs.values() + ) + ), + ) + } + for _, token in sorted( + job.inputs.items(), key=lambda item: weights[item[0]], reverse=True + ): + related_locations = set() + # For FileTokens, retrieve related locations + if isinstance(token, FileToken): + for path in await token.get_paths(context): + related_locations.update( + [ + loc.name + for loc in context.data_manager.get_data_locations( + path=path, + deployment=next(iter(deployments)), + data_type=DataType.PRIMARY, + ) + ] + ) + # Check if one of the related locations is free + for current_location in related_locations: + if current_location in valid_locations: + job_candidates[job.name] = available_locations[current_location] + break + if job.name in job_candidates: + break + # If a data-related allocation is not possible, assign a location among the remaining free ones + for location in valid_locations: + job_candidates[job.name] = available_locations[location] + break + # If there are no available locations, return None + return job_candidates @classmethod def 
get_schema(cls) -> str: diff --git a/streamflow/scheduling/scheduler.py b/streamflow/scheduling/scheduler.py index cb1c0b544..a8c98ac7b 100644 --- a/streamflow/scheduling/scheduler.py +++ b/streamflow/scheduling/scheduler.py @@ -15,11 +15,12 @@ Target, ) from streamflow.core.scheduling import ( - Hardware, JobAllocation, LocationAllocation, Policy, Scheduler, + JobContext, + JobHardwareRequirement, ) from streamflow.core.workflow import Job, Status from streamflow.deployment.connector import LocalConnector @@ -33,31 +34,26 @@ from typing import MutableMapping -class JobContext: - __slots__ = ("job", "lock", "scheduled") - - def __init__(self, job: Job): - self.job: Job = job - self.lock: asyncio.Lock = asyncio.Lock() - self.scheduled: bool = False - - class DefaultScheduler(Scheduler): def __init__( self, context: StreamFlowContext, retry_delay: int | None = None ) -> None: super().__init__(context) - self.allocation_groups: MutableMapping[str, MutableSequence[Job]] = {} self.binding_filter_map: MutableMapping[str, BindingFilter] = {} + self.pending_jobs: MutableMapping[str, MutableSequence[JobContext]] = {} + self.pending_jobs_conditional = asyncio.Condition() + self.pending_job_event = asyncio.Event() self.policy_map: MutableMapping[str, Policy] = {} self.retry_interval: int | None = retry_delay if retry_delay != 0 else None self.scheduling_groups: MutableMapping[str, MutableSequence[str]] = {} - self.wait_queues: MutableMapping[str, asyncio.Condition] = {} + self.scheduling_task: asyncio.Task = asyncio.create_task( + self._scheduling_task() + ) def _allocate_job( self, job: Job, - hardware: Hardware, + hardware: JobHardwareRequirement, selected_locations: MutableSequence[ExecutionLocation], target: Target, ): @@ -89,7 +85,7 @@ def _allocate_job( target=target, locations=selected_locations, status=Status.FIREABLE, - hardware=hardware or Hardware(), + hardware=hardware or None, ) for loc in selected_locations: if loc not in self.location_allocations: @@ -133,222 
+129,157 @@ def _get_binding_filter(self, config: FilterConfig): ) return self.binding_filter_map[config.name] - async def _get_locations( + async def _get_jobs_to_schedule( self, - job: Job, - hardware_requirement: Hardware, - locations: int, scheduling_policy: Policy, - available_locations: MutableMapping[str, AvailableLocation], - ) -> MutableSequence[ExecutionLocation] | None: - selected_locations = [] - for _ in range(locations): - selected_location = await scheduling_policy.get_location( - context=self.context, - job=job, - hardware_requirement=hardware_requirement, - available_locations=available_locations, - jobs=self.job_allocations, - locations=self.location_allocations, - ) - if selected_location is not None: - selected_locations.append(selected_location) - available_locations = { - k: v - for k, v in available_locations.items() - if v != selected_location - } - else: - return None - return [loc.location for loc in selected_locations] + job_contexts: MutableSequence[JobContext], + valid_locations: MutableMapping[str, AvailableLocation], + ) -> MutableMapping[str, MutableSequence[ExecutionLocation]]: + scheduled_jobs = [] + for valid_location in valid_locations.values(): + if valid_location.name in self.location_allocations.get( + valid_location.deployment, {} + ): + for job_name in self.location_allocations[valid_location.deployment][ + valid_location.name + ].jobs: + scheduled_jobs.append(self.job_allocations[job_name]) + + jobs_to_schedule = await scheduling_policy.get_location( + context=self.context, + pending_jobs=job_contexts, + available_locations=valid_locations, + scheduled_jobs=scheduled_jobs, + locations=self.location_allocations, + ) + return { + job_name: [available_location.location] + for job_name, available_location in jobs_to_schedule.items() + } def _get_policy(self, config: Config): if config.name not in self.policy_map: self.policy_map[config.name] = policy_classes[config.type](**config.config) return self.policy_map[config.name] - 
def _is_valid( - self, location: AvailableLocation, hardware_requirement: Hardware - ) -> bool: - if location.name in self.location_allocations.get(location.deployment, {}): - running_jobs = list( - filter( - lambda x: ( - self.job_allocations[x].status == Status.RUNNING - or self.job_allocations[x].status == Status.FIREABLE - ), - self.location_allocations[location.deployment][location.name].jobs, - ) + async def _get_available_locations( + self, target: Target, job_contexts: MutableSequence[JobContext] + ) -> MutableMapping[str, AvailableLocation]: + available_locations = {} + for job_context in job_contexts: + directories = { + job_context.job.input_directory or target.workdir, + job_context.job.tmp_directory or target.workdir, + job_context.job.output_directory or target.workdir, + } + connector = self.context.deployment_manager.get_connector( + target.deployment.name ) - else: - running_jobs = [] - # If location is segmentable and job provides requirements, compute the used amount of locations - if location.hardware is not None and hardware_requirement is not None: - used_hardware = sum( - (self.job_allocations[j].hardware for j in running_jobs), - start=hardware_requirement.__class__(), + available_locations = await connector.get_available_locations( + service=target.service, directories=list(directories) ) - if (location.hardware - used_hardware) >= hardware_requirement: - return True - else: - return False - # If location is segmentable but job does not provide requirements, treat it as null-weighted - elif location.hardware is not None: - return True - # Otherwise, simply compute the number of allocated slots - else: - return len(running_jobs) < location.slots + return available_locations - async def _process_target( - self, target: Target, job_context: JobContext, hardware_requirement: Hardware - ): - deployment = target.deployment.name - if deployment not in self.wait_queues: - self.wait_queues[deployment] = asyncio.Condition() - async with 
self.wait_queues[deployment]: + async def _scheduling_task(self): + try: while True: - async with job_context.lock: - if job_context.scheduled: - return - connector = self.context.deployment_manager.get_connector( - deployment - ) + # Events that awake the scheduling task: + # - there is a new job to schedule + # - some resources are released + await self.pending_job_event.wait() + async with self.pending_jobs_conditional: + self.pending_job_event.clear() + for deployment_name, job_contexts in self.pending_jobs.items(): if logger.isEnabledFor(logging.DEBUG): logger.debug( - "Retrieving available locations for job {} on {}.".format( - job_context.job.name, - ( - posixpath.join(deployment, target.service) - if target.service - else deployment - ), - ) - ) - available_locations = dict( - await connector.get_available_locations( - service=target.service, - input_directory=job_context.job.input_directory - or target.workdir, - output_directory=job_context.job.output_directory - or target.workdir, - tmp_directory=job_context.job.tmp_directory - or target.workdir, + f"Checking jobs scheduling on deployment {deployment_name}" ) + + # deployments = { + # target.deployment + # for job_context in self.pending_jobs + # for target in job_context.targets + # } + # for deployment in deployments: + # targets = { + # target + # for job_context in self.pending_jobs + # for target in job_context.targets + # if target.deployment == deployment + # } + + if not job_contexts: + continue + target = next( + target + for job_context in job_contexts + for target in job_context.targets + if target.deployment.name == deployment_name ) - valid_locations = { - k: loc - for k, loc in available_locations.items() - if self._is_valid( - location=loc, hardware_requirement=hardware_requirement - ) - } - if valid_locations: - if logger.isEnabledFor(logging.DEBUG): - logger.debug( - "Available locations for job {} on {} are {}.".format( - job_context.job.name, - ( - posixpath.join(deployment, 
target.service) - if target.service - else deployment - ), - list(valid_locations.keys()), - ) - ) - if target.scheduling_group is not None: - if target.scheduling_group not in self.allocation_groups: - self.allocation_groups[target.scheduling_group] = [] - self.allocation_groups[target.scheduling_group].append( - job_context.job - ) - group_size = len( - self.scheduling_groups[target.scheduling_group] - ) - if ( - len( - self.allocation_groups.get( - target.scheduling_group, [] - ) - ) - == group_size - ): - allocated_jobs = [] - for j in self.allocation_groups[ - target.scheduling_group - ]: - selected_locations = await self._get_locations( - job=job_context.job, - hardware_requirement=hardware_requirement, - locations=target.locations, - scheduling_policy=self._get_policy( - target.scheduling_policy + valid_locations = await self._get_available_locations( + target, job_contexts + ) + + scheduling_policy = self._get_policy(target.deployment.policy) + jobs_to_schedule = await self._get_jobs_to_schedule( + scheduling_policy, + job_contexts, + valid_locations, + ) + if logger.isEnabledFor(logging.DEBUG): + if jobs_to_schedule: + for job_name, locs in jobs_to_schedule.items(): + logger.debug( + "Available locations for job {} on {} are {}.".format( + job_name, + ( + posixpath.join( + deployment_name, target.service + ) + if target.service + else deployment_name ), - available_locations=valid_locations, - ) - if selected_locations is None: - break - self._allocate_job( - job=j, - hardware=hardware_requirement, - selected_locations=selected_locations, - target=target, + {str(loc) for loc in locs}, ) - allocated_jobs.append(j) - if len(allocated_jobs) < group_size: - for j in allocated_jobs: - self._deallocate_job(j.name) - else: - job_context.scheduled = True - return - else: - selected_locations = await self._get_locations( - job=job_context.job, - hardware_requirement=hardware_requirement, - locations=target.locations, - scheduling_policy=self._get_policy( - 
target.scheduling_policy - ), - available_locations=valid_locations, - ) - if selected_locations is not None: - self._allocate_job( - job=job_context.job, - hardware=hardware_requirement, - selected_locations=selected_locations, - target=target, ) - job_context.scheduled = True - return - else: - if logger.isEnabledFor(logging.DEBUG): + else: logger.debug( "No location available for job {} on deployment {}.".format( - job_context.job.name, + [ + job_context.job.name + for job_context in job_contexts + ], ( - posixpath.join(deployment, target.service) + posixpath.join(deployment_name, target.service) if target.service - else deployment + else deployment_name ), ) ) - try: - await asyncio.wait_for( - self.wait_queues[deployment].wait(), timeout=self.retry_interval - ) - except TimeoutError: - if logger.isEnabledFor(logging.DEBUG): - target_name = ( - "/".join([target.deployment.name, target.service]) - if target.service is not None - else target.deployment.name + + for job_name, locs in jobs_to_schedule.items(): + job_context = next( + job_context + for job_context in job_contexts + if job_context.job.name == job_name ) - logger.debug( - f"No locations available for job {job_context.job.name} " - f"in target {target_name}. Waiting {self.retry_interval} seconds." + self.pending_jobs[deployment_name].remove(job_context) + self._allocate_job( + job_context.job, + job_context.hardware_requirement, + locs, + job_context.targets[0], ) + job_context.event.set() + # todo: fix job with multi-targets case + except Exception as e: + # todo: propagate error to context (?) 
+ logger.exception(f"Scheduler failed: {e}") + await self.context.close() + raise async def close(self): - pass + self.scheduling_task.cancel() @classmethod def get_schema(cls) -> str: @@ -360,42 +291,35 @@ def get_schema(cls) -> str: ) async def notify_status(self, job_name: str, status: Status) -> None: - connector = self.get_connector(job_name) - if connector: - if connector.deployment_name in self.wait_queues: - async with self.wait_queues[connector.deployment_name]: - if job_name in self.job_allocations: - if status != self.job_allocations[job_name].status: - self.job_allocations[job_name].status = status - if logger.isEnabledFor(logging.DEBUG): - logger.debug( - f"Job {job_name} changed status to {status.name}" - ) - if status in [Status.COMPLETED, Status.FAILED]: - self.wait_queues[connector.deployment_name].notify_all() + if job_name in self.job_allocations: + if status != self.job_allocations[job_name].status: + self.job_allocations[job_name].status = status + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Job {job_name} changed status to {status.name}") + + # Notify scheduling task: there are free resources + async with self.pending_jobs_conditional: + self.pending_job_event.set() async def schedule( - self, job: Job, binding_config: BindingConfig, hardware_requirement: Hardware + self, + job: Job, + binding_config: BindingConfig, + hardware_requirement: JobHardwareRequirement, ) -> None: - job_context = JobContext(job) + logger.info(f"Adding job {job.name} in pending jobs to schedule") targets = list(binding_config.targets) for f in (self._get_binding_filter(f) for f in binding_config.filters): targets = await f.get_targets(job, targets) - wait_tasks = [ - asyncio.create_task( - self._process_target( - target=target, - job_context=job_context, - hardware_requirement=hardware_requirement, - ) - ) - for target in targets - ] - # Capture finished tasks and call result() to check for exceptions - finished, _ = await asyncio.wait( - wait_tasks, 
return_when=asyncio.FIRST_COMPLETED - ) - for task in finished: - if task.cancelled(): - continue - task.result() + job_context = JobContext(job, targets, hardware_requirement) + for target in targets: + deployment = target.deployment + self.pending_jobs.setdefault(deployment.name, []).append(job_context) + + # Notify scheduling task: there is a job to schedule + async with self.pending_jobs_conditional: + self.pending_job_event.set() + + # Wait the job is scheduled + await job_context.event.wait() + logger.info(f"Job {job.name} scheduled") diff --git a/tests/conftest.py b/tests/conftest.py index b7ff35b3c..756fd707a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -99,8 +99,7 @@ async def context(chosen_deployment_types) -> StreamFlowContext: await _context.deployment_manager.deploy(config) yield _context await _context.deployment_manager.undeploy_all() - # Close the database connection - await _context.database.close() + await _context.close() @pytest.fixture(scope="session") diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index ed7290bbd..ec098bb73 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -6,7 +6,7 @@ import pytest_asyncio from streamflow.core import utils -from streamflow.core.config import BindingConfig, Config +from streamflow.core.config import BindingConfig # Config from streamflow.core.context import StreamFlowContext from streamflow.core.deployment import ( DeploymentConfig, @@ -14,14 +14,14 @@ LocalTarget, Target, ) -from streamflow.core.scheduling import Hardware +from streamflow.core.scheduling import JobHardwareRequirement, DeploymentHardware from streamflow.core.workflow import Job, Status from tests.utils.connector import ParameterizableHardwareConnector from tests.utils.deployment import ( get_docker_deployment_config, get_service, get_deployment_config, - ReverseTargetsBindingFilter, + # ReverseTargetsBindingFilter, ) @@ -51,7 +51,9 @@ async def test_scheduling( 
output_directory=utils.random_name(), tmp_directory=utils.random_name(), ) - hardware_requirement = Hardware(cores=1) + hardware_requirement = JobHardwareRequirement( + cores=1, memory=100, tmp_dir_size=0, out_dir_size=0 + ) target = Target( deployment=deployment_config, service=service, @@ -68,12 +70,10 @@ async def test_scheduling( async def test_single_env_few_resources(context: StreamFlowContext): """Test scheduling two jobs on single environment but with resources for one job at a time.""" - machine_hardware = Hardware( + machine_hardware = DeploymentHardware( cores=1, memory=100, - input_directory=100, - output_directory=100, - tmp_directory=100, + storage={}, ) # inject custom connector to manipulate available resources @@ -100,7 +100,9 @@ async def test_single_env_few_resources(context: StreamFlowContext): tmp_directory=utils.random_name(), ) ) - hardware_requirement = Hardware(cores=1) + hardware_requirement = JobHardwareRequirement( + cores=1, memory=100, tmp_dir_size=0, out_dir_size=0 + ) local_target = LocalTarget() binding_config = BindingConfig(targets=[local_target]) task_pending = [ @@ -145,13 +147,7 @@ async def test_single_env_few_resources(context: StreamFlowContext): async def test_single_env_enough_resources(context: StreamFlowContext): """Test scheduling two jobs on a single environment with resources for all jobs together.""" - machine_hardware = Hardware( - cores=2, - memory=100, - input_directory=100, - output_directory=100, - tmp_directory=100, - ) + machine_hardware = DeploymentHardware(cores=2, memory=200, storage={}) # Inject custom connector to manipulate available resources conn = context.deployment_manager.get_connector(LOCAL_LOCATION) @@ -177,7 +173,9 @@ async def test_single_env_enough_resources(context: StreamFlowContext): tmp_directory=utils.random_name(), ) ) - hardware_requirement = Hardware(cores=1) + hardware_requirement = JobHardwareRequirement( + cores=1, memory=100, tmp_dir_size=0, out_dir_size=0 + ) local_target = 
LocalTarget() binding_config = BindingConfig(targets=[local_target]) task_pending = [ @@ -212,7 +210,7 @@ async def test_multi_env(context: StreamFlowContext): """Test scheduling two jobs on two different environments.""" # Inject custom connector to manipulate available resources - machine_hardware = Hardware(cores=1) + machine_hardware = DeploymentHardware(cores=10, memory=200, storage={}) conn = context.deployment_manager.get_connector(LOCAL_LOCATION) context.deployment_manager.deployments_map[LOCAL_LOCATION] = ( ParameterizableHardwareConnector( @@ -245,7 +243,9 @@ async def test_multi_env(context: StreamFlowContext): BindingConfig(targets=[local_target] if i == 0 else [docker_target]), ) ) - hardware_requirement = Hardware(cores=1) + hardware_requirement = JobHardwareRequirement( + cores=1, memory=100, tmp_dir_size=0, out_dir_size=0 + ) task_pending = [ asyncio.create_task( context.scheduler.schedule(job, binding_config, hardware_requirement) @@ -273,198 +273,203 @@ async def test_multi_env(context: StreamFlowContext): assert context.scheduler.job_allocations[j.name].status == Status.COMPLETED -@pytest.mark.asyncio -async def test_multi_targets_one_job(context: StreamFlowContext): - """Test scheduling one jobs with two targets: Local and Docker Image. 
The job will be scheduled in the first""" - - # Inject custom connector to manipulate available resources - machine_hardware = Hardware(cores=1) - conn = context.deployment_manager.get_connector(LOCAL_LOCATION) - context.deployment_manager.deployments_map[LOCAL_LOCATION] = ( - ParameterizableHardwareConnector( - deployment_name=conn.deployment_name, - config_dir=conn.config_dir, - transferBufferSize=conn.transferBufferSize, - hardware=machine_hardware, - ) - ) - - # Create fake job with two targets and schedule it - job = Job( - name=utils.random_name(), - workflow_id=0, - inputs={}, - input_directory=utils.random_name(), - output_directory=utils.random_name(), - tmp_directory=utils.random_name(), - ) - local_target = LocalTarget() - docker_target = Target( - deployment=get_docker_deployment_config(), - service="test-multi-targ-1", - workdir=utils.random_name(), - ) - binding_config = BindingConfig(targets=[local_target, docker_target]) - - hardware_requirement = Hardware(cores=1) - task_pending = [ - asyncio.create_task( - context.scheduler.schedule(job, binding_config, hardware_requirement) - ) - ] - assert len(task_pending) == 1 - - # Available resources to schedule the job on the first target (timeout parameter useful if a deadlock occurs) - _, task_pending = await asyncio.wait( - task_pending, return_when=asyncio.FIRST_COMPLETED, timeout=60 - ) - assert len(task_pending) == 0 - assert context.scheduler.job_allocations[job.name].status == Status.FIREABLE - - # Check if it has been scheduled into the first target - assert ( - context.scheduler.job_allocations[job.name].target.deployment.name - == LOCAL_LOCATION - ) - - # Job changes status to RUNNING - await context.scheduler.notify_status(job.name, Status.RUNNING) - assert context.scheduler.job_allocations[job.name].status == Status.RUNNING - - # Job changes status to COMPLETED - await context.scheduler.notify_status(job.name, Status.COMPLETED) - assert context.scheduler.job_allocations[job.name].status == 
Status.COMPLETED - - -@pytest.mark.asyncio -async def test_multi_targets_two_jobs(context: StreamFlowContext): - """ - Test scheduling two jobs with two same targets: Local and Docker Image. - The first job will be scheduled in the local target and the second job in the docker target because the local resources will be full. - """ - - # Inject custom connector to manipulate available resources - machine_hardware = Hardware(cores=1) - conn = context.deployment_manager.get_connector(LOCAL_LOCATION) - context.deployment_manager.deployments_map[LOCAL_LOCATION] = ( - ParameterizableHardwareConnector( - deployment_name=conn.deployment_name, - config_dir=conn.config_dir, - transferBufferSize=conn.transferBufferSize, - hardware=machine_hardware, - ) - ) - - # Create fake jobs with two same targets and schedule them - jobs = [] - for _ in range(2): - jobs.append( - Job( - name=utils.random_name(), - workflow_id=0, - inputs={}, - input_directory=utils.random_name(), - output_directory=utils.random_name(), - tmp_directory=utils.random_name(), - ) - ) - local_target = LocalTarget() - docker_target = Target( - deployment=get_docker_deployment_config(), - service="test-multi-targ-2", - workdir=utils.random_name(), - ) - binding_config = BindingConfig(targets=[local_target, docker_target]) - - hardware_requirement = Hardware(cores=1) - task_pending = [ - asyncio.create_task( - context.scheduler.schedule(job, binding_config, hardware_requirement) - ) - for job in jobs - ] - assert len(task_pending) == 2 - - # Available resources to schedule the jobs on the two targets (timeout parameter useful if a deadlock occurs) - _, task_pending = await asyncio.wait( - task_pending, return_when=asyncio.ALL_COMPLETED, timeout=60 - ) - assert len(task_pending) == 0 - for j in jobs: - assert context.scheduler.job_allocations[j.name].status == Status.FIREABLE - - # Check if they have been scheduled into the right targets - assert ( - 
context.scheduler.job_allocations[jobs[0].name].target.deployment.name - == LOCAL_LOCATION - ) - assert ( - context.scheduler.job_allocations[jobs[1].name].target.deployment.name - == get_docker_deployment_config().name - ) - - # Jobs change status to RUNNING - for j in jobs: - await context.scheduler.notify_status(j.name, Status.RUNNING) - assert context.scheduler.job_allocations[j.name].status == Status.RUNNING - - # Jobs change status to COMPLETED - for j in jobs: - await context.scheduler.notify_status(j.name, Status.COMPLETED) - assert context.scheduler.job_allocations[j.name].status == Status.COMPLETED - - -@pytest.mark.asyncio -async def test_binding_filter(context: StreamFlowContext): - """Test Binding Filter using a job with two targets both free. With the CustomBindingFilter the scheduling will choose the second target""" - job = Job( - name=utils.random_name(), - workflow_id=0, - inputs={}, - input_directory=utils.random_name(), - output_directory=utils.random_name(), - tmp_directory=utils.random_name(), - ) - local_target = LocalTarget() - docker_target = Target( - deployment=get_docker_deployment_config(), - service="test-binding-target", - workdir=utils.random_name(), - ) - - # Inject custom filter that returns the targets list backwards - filter_config_name = "custom" - filter_config = Config(name=filter_config_name, type="shuffle", config={}) - binding_config = BindingConfig( - targets=[local_target, docker_target], filters=[filter_config] - ) - context.scheduler.binding_filter_map[filter_config_name] = ( - ReverseTargetsBindingFilter() - ) - - # Schedule the job - task_pending = [ - asyncio.create_task(context.scheduler.schedule(job, binding_config, None)) - ] - assert len(task_pending) == 1 - - # Both targets are available for scheduling (timeout parameter useful if a deadlock occurs) - _, task_pending = await asyncio.wait( - task_pending, return_when=asyncio.FIRST_COMPLETED, timeout=60 - ) - assert len(task_pending) == 0 - assert 
context.scheduler.job_allocations[job.name].status == Status.FIREABLE - - # Check if the job has been scheduled into the second target - assert ( - context.scheduler.job_allocations[job.name].target.deployment.name - == get_docker_deployment_config().name - ) - - # Job changes status to RUNNING - await context.scheduler.notify_status(job.name, Status.RUNNING) - assert context.scheduler.job_allocations[job.name].status == Status.RUNNING - - # Job changes status to COMPLETED - await context.scheduler.notify_status(job.name, Status.COMPLETED) - assert context.scheduler.job_allocations[job.name].status == Status.COMPLETED +# todo: decomment when the multi targets feature works +# @pytest.mark.asyncio +# async def test_multi_targets_one_job(context: StreamFlowContext): +# """Test scheduling one jobs with two targets: Local and Docker Image. The job will be scheduled in the first""" +# +# # Inject custom connector to manipulate available resources +# machine_hardware = DeploymentHardware(cores=1, memory=100, storage={}) +# conn = context.deployment_manager.get_connector(LOCAL_LOCATION) +# context.deployment_manager.deployments_map[LOCAL_LOCATION] = ( +# ParameterizableHardwareConnector( +# deployment_name=conn.deployment_name, +# config_dir=conn.config_dir, +# transferBufferSize=conn.transferBufferSize, +# hardware=machine_hardware, +# ) +# ) +# +# # Create fake job with two targets and schedule it +# job = Job( +# name=utils.random_name(), +# workflow_id=0, +# inputs={}, +# input_directory=utils.random_name(), +# output_directory=utils.random_name(), +# tmp_directory=utils.random_name(), +# ) +# local_target = LocalTarget() +# docker_target = Target( +# deployment=get_docker_deployment_config(), +# service="test-multi-targ-1", +# workdir=utils.random_name(), +# ) +# binding_config = BindingConfig(targets=[local_target, docker_target]) +# +# hardware_requirement = JobHardwareRequirement( +# cores=1, memory=100, tmp_dir_size=0, out_dir_size=0 +# ) +# task_pending = [ +# 
asyncio.create_task( +# context.scheduler.schedule(job, binding_config, hardware_requirement) +# ) +# ] +# assert len(task_pending) == 1 +# +# # Available resources to schedule the job on the first target (timeout parameter useful if a deadlock occurs) +# _, task_pending = await asyncio.wait( +# task_pending, return_when=asyncio.FIRST_COMPLETED, timeout=60 +# ) +# assert len(task_pending) == 0 +# assert context.scheduler.job_allocations[job.name].status == Status.FIREABLE +# +# # Check if it has been scheduled into the first target +# assert ( +# context.scheduler.job_allocations[job.name].target.deployment.name +# == LOCAL_LOCATION +# ) +# +# # Job changes status to RUNNING +# await context.scheduler.notify_status(job.name, Status.RUNNING) +# assert context.scheduler.job_allocations[job.name].status == Status.RUNNING +# +# # Job changes status to COMPLETED +# await context.scheduler.notify_status(job.name, Status.COMPLETED) +# assert context.scheduler.job_allocations[job.name].status == Status.COMPLETED +# +# +# @pytest.mark.asyncio +# async def test_multi_targets_two_jobs(context: StreamFlowContext): +# """ +# Test scheduling two jobs with two same targets: Local and Docker Image. +# The first job will be scheduled in the local target and the second job in the docker target because the local resources will be full. 
+# """ +# +# # Inject custom connector to manipulate available resources +# machine_hardware = DeploymentHardware(cores=1, memory=100, storage={}) +# conn = context.deployment_manager.get_connector(LOCAL_LOCATION) +# context.deployment_manager.deployments_map[LOCAL_LOCATION] = ( +# ParameterizableHardwareConnector( +# deployment_name=conn.deployment_name, +# config_dir=conn.config_dir, +# transferBufferSize=conn.transferBufferSize, +# hardware=machine_hardware, +# ) +# ) +# +# # Create fake jobs with two same targets and schedule them +# jobs = [] +# for _ in range(2): +# jobs.append( +# Job( +# name=utils.random_name(), +# workflow_id=0, +# inputs={}, +# input_directory=utils.random_name(), +# output_directory=utils.random_name(), +# tmp_directory=utils.random_name(), +# ) +# ) +# local_target = LocalTarget() +# docker_target = Target( +# deployment=get_docker_deployment_config(), +# service="test-multi-targ-2", +# workdir=utils.random_name(), +# ) +# binding_config = BindingConfig(targets=[local_target, docker_target]) +# +# hardware_requirement = JobHardwareRequirement( +# cores=1, memory=100, tmp_dir_size=0, out_dir_size=0 +# ) +# task_pending = [ +# asyncio.create_task( +# context.scheduler.schedule(job, binding_config, hardware_requirement) +# ) +# for job in jobs +# ] +# assert len(task_pending) == 2 +# +# # Available resources to schedule the jobs on the two targets (timeout parameter useful if a deadlock occurs) +# _, task_pending = await asyncio.wait( +# task_pending, return_when=asyncio.ALL_COMPLETED, timeout=60 +# ) +# assert len(task_pending) == 0 +# for j in jobs: +# assert context.scheduler.job_allocations[j.name].status == Status.FIREABLE +# +# # Check if they have been scheduled into the right targets +# assert ( +# context.scheduler.job_allocations[jobs[0].name].target.deployment.name +# == LOCAL_LOCATION +# ) +# assert ( +# context.scheduler.job_allocations[jobs[1].name].target.deployment.name +# == get_docker_deployment_config().name +# ) +# +# 
# Jobs change status to RUNNING +# for j in jobs: +# await context.scheduler.notify_status(j.name, Status.RUNNING) +# assert context.scheduler.job_allocations[j.name].status == Status.RUNNING +# +# # Jobs change status to COMPLETED +# for j in jobs: +# await context.scheduler.notify_status(j.name, Status.COMPLETED) +# assert context.scheduler.job_allocations[j.name].status == Status.COMPLETED +# +# +# @pytest.mark.asyncio +# async def test_binding_filter(context: StreamFlowContext): +# """Test Binding Filter using a job with two targets both free. With the CustomBindingFilter the scheduling will choose the second target""" +# job = Job( +# name=utils.random_name(), +# workflow_id=0, +# inputs={}, +# input_directory=utils.random_name(), +# output_directory=utils.random_name(), +# tmp_directory=utils.random_name(), +# ) +# local_target = LocalTarget() +# docker_target = Target( +# deployment=get_docker_deployment_config(), +# service="test-binding-target", +# workdir=utils.random_name(), +# ) +# +# # Inject custom filter that returns the targets list backwards +# filter_config_name = "custom" +# filter_config = Config(name=filter_config_name, type="shuffle", config={}) +# binding_config = BindingConfig( +# targets=[local_target, docker_target], filters=[filter_config] +# ) +# context.scheduler.binding_filter_map[filter_config_name] = ( +# ReverseTargetsBindingFilter() +# ) +# +# # Schedule the job +# task_pending = [ +# asyncio.create_task(context.scheduler.schedule(job, binding_config, None)) +# ] +# assert len(task_pending) == 1 +# +# # Both targets are available for scheduling (timeout parameter useful if a deadlock occurs) +# _, task_pending = await asyncio.wait( +# task_pending, return_when=asyncio.FIRST_COMPLETED, timeout=60 +# ) +# assert len(task_pending) == 0 +# assert context.scheduler.job_allocations[job.name].status == Status.FIREABLE +# +# # Check if the job has been scheduled into the second target +# assert ( +# 
context.scheduler.job_allocations[job.name].target.deployment.name +# == get_docker_deployment_config().name +# ) +# +# # Job changes status to RUNNING +# await context.scheduler.notify_status(job.name, Status.RUNNING) +# assert context.scheduler.job_allocations[job.name].status == Status.RUNNING +# +# # Job changes status to COMPLETED +# await context.scheduler.notify_status(job.name, Status.COMPLETED) +# assert context.scheduler.job_allocations[job.name].status == Status.COMPLETED diff --git a/tests/test_schema.py b/tests/test_schema.py index 4ebfdefa5..c008d7ba0 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -83,7 +83,6 @@ def test_ext_support(): "deployment": "example", "service": "example", "locations": 2, - "policy": "data_locality", "workdir": "/path/to/workdir", }, } diff --git a/tests/utils/connector.py b/tests/utils/connector.py index 55f54266a..b5013a394 100644 --- a/tests/utils/connector.py +++ b/tests/utils/connector.py @@ -9,7 +9,7 @@ LOCAL_LOCATION, ExecutionLocation, ) -from streamflow.core.scheduling import AvailableLocation, Hardware +from streamflow.core.scheduling import AvailableLocation, DeploymentHardware from streamflow.deployment.connector import LocalConnector from streamflow.log_handler import logger @@ -60,9 +60,7 @@ async def deploy(self, external: bool) -> None: async def get_available_locations( self, service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: raise FailureConnectorException("FailureConnector get_available_locations") @@ -95,7 +93,7 @@ def __init__( self, deployment_name: str, config_dir: str, - hardware: Hardware, + hardware: DeploymentHardware, transferBufferSize: int = 2**16, ): super().__init__(deployment_name, config_dir, transferBufferSize) @@ -104,9 +102,7 @@ def __init__( async def get_available_locations( self, 
service: str | None = None, - input_directory: str | None = None, - output_directory: str | None = None, - tmp_directory: str | None = None, + directories: MutableSequence[str] | None = None, ) -> MutableMapping[str, AvailableLocation]: return { LOCAL_LOCATION: AvailableLocation(