diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index a18f44b97..981e33f31 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -157,7 +157,7 @@ jobs: python -m pip install -r docs/requirements.txt - name: "Build documentation and check for consistency" env: - CHECKSUM: "198f61804843130d3cae0675c67cad121d980c4648cf27c7541d87219afa3d6e" + CHECKSUM: "a827c3b92177f5ba8f4c83be1dc104651a20e086e33e7221ac1107f5ad19a89f" run: | cd docs HASH="$(make checksum | tail -n1)" diff --git a/docs/source/connector/container.rst b/docs/source/connector/container.rst index 41cd71685..5d2880b94 100644 --- a/docs/source/connector/container.rst +++ b/docs/source/connector/container.rst @@ -2,4 +2,4 @@ ContainerConnector ================== -The ``ContainerConnector`` is an abstract connector that serves as a base class to implement software container connectors (e.g., :ref:`Docker `, :ref:`Docker Compose `, and :ref:`Singularity `). It extends the abstract :ref:`ConnectorWrapper ` interface, allowing users to spawn software containers on top of local or remote execution environments using the :ref:`stacked locations ` mechanism. Plus, it prevents :ref:`BatchConnector ` instances to be wrapped as inner connectors. \ No newline at end of file +The ``ContainerConnector`` is an abstract connector that serves as a base class to implement software container connectors (e.g., :ref:`Docker `, :ref:`Docker Compose `, and :ref:`Singularity `). It extends the abstract :ref:`ConnectorWrapper ` interface, allowing users to spawn software containers on top of local or remote execution environments using the :ref:`stacked locations ` mechanism. Plus, it prevents :ref:`BatchConnector ` instances to be wrapped as inner connectors, unless the ``ContainerConnector`` is marked as ``ephemeral``. 
\ No newline at end of file diff --git a/docs/source/connector/docker.rst b/docs/source/connector/docker.rst index 3e0c151ea..ddf478cc8 100644 --- a/docs/source/connector/docker.rst +++ b/docs/source/connector/docker.rst @@ -2,7 +2,7 @@ DockerConnector =============== -The `Docker `_ connector can spawn one or more instances of a Docker container locally on the StreamFlow node. The units of deployment and binding for this connector correspond to the set of homogeneous container instances, while the unit of scheduling is the single instance. It extends the :ref:`ContainerConnector `, which inherits from the :ref:`ConnectorWrapper ` interface, allowing users to spawn Docker containers on top of local or remote execution environments using the :ref:`stacked locations ` mechanism. +The `Docker `_ connector can spawn one or more instances of a Docker container locally on the StreamFlow node. It extends the :ref:`ContainerConnector `, which inherits from the :ref:`ConnectorWrapper ` interface, allowing users to spawn Docker containers on top of local or remote execution environments using the :ref:`stacked locations ` mechanism. Normally, a single Docker container is reused for multiple workflow commands, reducing cold start overhead. However, when the ``ephemeral`` option is set to ``True``, a fresh container instance is spawned for each command to prevent internal state contamination. In addition, a ``ContainerConnector`` marked as ``ephemeral`` can successfully wrap :ref:`BatchConnector ` instances. .. 
jsonschema:: ../../../streamflow/deployment/connector/schemas/docker.json :lift_description: true \ No newline at end of file diff --git a/docs/source/connector/singularity.rst b/docs/source/connector/singularity.rst index e31e13ac0..657f4247e 100644 --- a/docs/source/connector/singularity.rst +++ b/docs/source/connector/singularity.rst @@ -2,7 +2,7 @@ SingularityConnector ===================== -The `Singularity `_ connector can spawn one or more instances of a Singularity container locally on the StreamFlow node. The units of deployment and binding for this connector correspond to the set of homogeneous container instances, while the unit of scheduling is the single instance. It extends the :ref:`ContainerConnector `, which inherits from the :ref:`ConnectorWrapper ` interface, allowing users to spawn Singularity containers on top of local or remote execution environments using the :ref:`stacked locations ` mechanism. +The `Singularity `_ connector can spawn one or more instances of a Singularity container locally on the StreamFlow node. It extends the :ref:`ContainerConnector `, which inherits from the :ref:`ConnectorWrapper ` interface, allowing users to spawn Singularity containers on top of local or remote execution environments using the :ref:`stacked locations ` mechanism. Normally, a single Singularity instance is reused for multiple workflow commands, reducing cold start overhead. However, when the ``ephemeral`` option is set to ``True``, a fresh container instance is spawned for each command to prevent internal state contamination. In addition, a ``ContainerConnector`` marked as ``ephemeral`` can successfully wrap :ref:`BatchConnector ` instances. .. 
jsonschema:: ../../../streamflow/deployment/connector/schemas/singularity.json :lift_description: true \ No newline at end of file diff --git a/docs/source/cwl/docker-requirement.rst b/docs/source/cwl/docker-requirement.rst index df5e4b4f1..d91b4cf00 100644 --- a/docs/source/cwl/docker-requirement.rst +++ b/docs/source/cwl/docker-requirement.rst @@ -43,6 +43,7 @@ By default, StreamFlow automatically maps a step with the ``DockerRequirement`` type: docker config: image: node:slim + ephemeral: true StreamFlow also supports the possibility to map a CWL ``DockerRequirement`` onto different types of connectors through the :ref:`CWLDockerTranslator ` extension point. In particular, the ``docker`` section of a workflow configuration can bind each step or subworkflow to a specific translator type, making it possible to convert a pure CWL workflow with ``DockerRequirement`` features into a hybrid workflow. The available translator types are: ``docker``, ``kubernetes``, ``none`` and ``singularity``. diff --git a/docs/source/cwl/docker/docker.rst b/docs/source/cwl/docker/docker.rst index 3d7b43917..7ab466480 100644 --- a/docs/source/cwl/docker/docker.rst +++ b/docs/source/cwl/docker/docker.rst @@ -2,7 +2,7 @@ DockerCWLDockerTranslator ========================= -The Docker :ref:`CWLDockerTranslator ` instantiates a :ref:`DockerConnector ` instance with the given configuration for every CWL :ref:`DockerRequirement ` specification in the selected subworkflow. +The Docker :ref:`CWLDockerTranslator ` instantiates a :ref:`DockerConnector ` instance with the given configuration for every CWL :ref:`DockerRequirement ` specification in the selected subworkflow. Note that the resulting ``DockerConnector`` instance spawns ``ephemeral`` containers, making it able to wrap also :ref:`BatchConnector ` instances for HPC deployments. .. 
jsonschema:: ../../../../streamflow/cwl/requirement/docker/schemas/docker.json :lift_description: true diff --git a/docs/source/cwl/docker/kubernetes.rst b/docs/source/cwl/docker/kubernetes.rst index 0812b75a4..a6c9db390 100644 --- a/docs/source/cwl/docker/kubernetes.rst +++ b/docs/source/cwl/docker/kubernetes.rst @@ -2,7 +2,7 @@ KubernetesCWLDockerTranslator ============================= -The Kubernetes :ref:`CWLDockerTranslator ` instantiates a :ref:`KubernetesConnector ` instance with the given configuration for every CWL :ref:`DockerRequirement ` specification in the selected subworkflow. +The Kubernetes :ref:`CWLDockerTranslator ` instantiates a :ref:`KubernetesConnector ` instance with the given configuration for every CWL :ref:`DockerRequirement ` specification in the selected subworkflow. Note that, unlike other ``CWLDockerTranslator`` classes, the ``KubernetesConnector`` does not support ``ephemeral`` containers. .. jsonschema:: ../../../../streamflow/cwl/requirement/docker/schemas/kubernetes.json :lift_description: true diff --git a/docs/source/cwl/docker/singularity.rst b/docs/source/cwl/docker/singularity.rst index f0f129d01..2f0f76ad3 100644 --- a/docs/source/cwl/docker/singularity.rst +++ b/docs/source/cwl/docker/singularity.rst @@ -2,7 +2,7 @@ SingularityCWLDockerTranslator ============================== -The Singularity :ref:`CWLDockerTranslator ` instantiates a :ref:`SingularityConnector ` instance with the given configuration for every CWL :ref:`DockerRequirement ` specification in the selected subworkflow. +The Singularity :ref:`CWLDockerTranslator ` instantiates a :ref:`SingularityConnector ` instance with the given configuration for every CWL :ref:`DockerRequirement ` specification in the selected subworkflow. Note that the resulting ``SingularityConnector`` instance spawns ``ephemeral`` containers, making it able to wrap also :ref:`BatchConnector ` instances for HPC deployments. .. 
jsonschema:: ../../../../streamflow/cwl/requirement/docker/schemas/docker.json :lift_description: true diff --git a/docs/source/ext/cwl-docker-translator.rst b/docs/source/ext/cwl-docker-translator.rst index 7712f669b..6906c204a 100644 --- a/docs/source/ext/cwl-docker-translator.rst +++ b/docs/source/ext/cwl-docker-translator.rst @@ -28,7 +28,8 @@ Implementations =================================================== ================================================================ Type Class =================================================== ================================================================ -:ref:`docker ` streamflow.cwl.requirement.docker.DockerCWLDockerTranslator -:ref:`kubernetes ` streamflow.cwl.requirement.docker.KubernetesCWLDockerTranslator -:ref:`singularity ` streamflow.cwl.requirement.docker.SingularityCWLDockerTranslator +:ref:`docker ` streamflow.cwl.requirement.docker.docker.DockerCWLDockerTranslator +:ref:`kubernetes ` streamflow.cwl.requirement.docker.kubernetes.KubernetesCWLDockerTranslator +:ref:`none ` streamflow.cwl.requirement.docker.nocontainer.NoContainerCWLDockerTranslator +:ref:`singularity ` streamflow.cwl.requirement.docker.singularity.SingularityCWLDockerTranslator =================================================== ================================================================ diff --git a/streamflow/cwl/requirement/docker/docker.py b/streamflow/cwl/requirement/docker/docker.py index 5450d887c..8482440ca 100644 --- a/streamflow/cwl/requirement/docker/docker.py +++ b/streamflow/cwl/requirement/docker/docker.py @@ -47,6 +47,7 @@ def __init__( entrypoint: str | None = None, env: MutableSequence[str] | None = None, envFile: MutableSequence[str] | None = None, + ephemeral: bool = True, expose: MutableSequence[str] | None = None, gpus: MutableSequence[str] | None = None, groupAdd: MutableSequence[str] | None = None, @@ -139,6 +140,7 @@ def __init__( self.entrypoint: str | None = entrypoint self.env: MutableSequence[str] | None = 
env self.envFile: MutableSequence[str] | None = envFile + self.ephemeral: bool = ephemeral self.expose: MutableSequence[str] | None = expose self.gpus: MutableSequence[str] | None = gpus self.groupAdd: MutableSequence[str] | None = groupAdd @@ -217,14 +219,10 @@ def get_target( volume = list(self.volume) if self.volume else [] volume.append(f"{target.workdir}:/tmp/streamflow") if output_directory is not None: - if target.deployment.type == "local": - volume.append( - f"{os.path.join(target.workdir, utils.random_name())}:{output_directory}" - ) - else: - volume.append( - f"{posixpath.join(target.workdir, utils.random_name())}:{output_directory}" - ) + path_processor = os.path if target.deployment.type == "local" else posixpath + volume.append( + f"{path_processor.join(target.workdir, utils.random_name())}:{output_directory}" + ) return Target( deployment=DeploymentConfig( name=utils.random_name(), @@ -263,6 +261,7 @@ def get_target( "entrypoint": self.entrypoint, "env": self.env, "envFile": self.envFile, + "ephemeral": self.ephemeral, "expose": self.expose, "gpus": self.gpus, "groupAdd": self.groupAdd, @@ -320,6 +319,7 @@ def get_target( "volume": volume, "volumeDriver": self.volumeDriver, "volumesFrom": self.volumesFrom, + "workdir": self.workdir, }, workdir="/tmp/streamflow", # nosec wraps=( diff --git a/streamflow/cwl/requirement/docker/schemas/docker.json b/streamflow/cwl/requirement/docker/schemas/docker.json index 379ae1634..4361644d0 100644 --- a/streamflow/cwl/requirement/docker/schemas/docker.json +++ b/streamflow/cwl/requirement/docker/schemas/docker.json @@ -60,7 +60,7 @@ "items": { "type": "string" }, - "description": "Command to run when deploying the container" + "description": "Command to run when deploying the container. 
Not supported for ephemeral containers" }, "cpuPeriod": { "type": "integer", @@ -199,6 +199,11 @@ "uniqueItems": true, "description": "Read in a file of environment variables" }, + "ephemeral": { + "type": "boolean", + "default": true, + "description": "Invoke a fresh container instance for every command in the workflow" + }, "expose": { "type": "array", "items": { diff --git a/streamflow/cwl/requirement/docker/schemas/singularity.json b/streamflow/cwl/requirement/docker/schemas/singularity.json index 0d7c195d8..2bd56258e 100644 --- a/streamflow/cwl/requirement/docker/schemas/singularity.json +++ b/streamflow/cwl/requirement/docker/schemas/singularity.json @@ -15,6 +15,10 @@ "type": "string", "description": "Apply cgroups from file for container processes (root only)" }, + "arch": { + "type": "string", + "description": "architecture to pull from library" + }, "bind": { "type": "array", "items": { @@ -36,7 +40,7 @@ }, "boot": { "type": "boolean", - "description": "Execute /sbin/init to boot container (root only)" + "description": "Execute /sbin/init to boot container (root only). Not supported for ephemeral containers" }, "cleanenv": { "type": "boolean", @@ -48,7 +52,7 @@ "items": { "type": "string" }, - "description": "Command to run when deploying the container" + "description": "Command to run when deploying the container. Not supported for ephemeral containers" }, "compat": { "type": "boolean", @@ -65,7 +69,8 @@ }, "cpuShares": { "type": "integer", - "description": "CPU shares for container (default -1)" + "description": "CPU shares for container", + "default": -1 }, "cpus": { "type": "string", @@ -85,7 +90,7 @@ }, "dns": { "type": "string", - "description": "List of DNS server separated by commas to add in resolv.conf" + "description": "List of DNS server separated by commas to add in resolv.conf. 
Not supported for ephemeral containers" }, "dockerHost": { "type": "string", @@ -107,6 +112,11 @@ "type": "string", "description": "Pass environment variables from file to contained process" }, + "ephemeral": { + "type": "boolean", + "default": true, + "description": "Invoke a fresh container instance for every command in the workflow" + }, "fakeroot": { "type": "boolean", "description": "Run container in new user namespace as uid 0" @@ -230,12 +240,17 @@ }, "pidFile": { "type": "string", - "description": "Write instance PID to the file with the given name" + "description": "Write instance PID to the file with the given name. Not supported for ephemeral containers" }, "pidsLimit": { "type": "integer", "description": "Limit number of container PIDs, use -1 for unlimited" }, + "pullDir": { + "type": "string", + "description": "Download images to the specific directory", + "default": "The target workdir" + }, "rocm": { "type": "boolean", "description": "Enable experimental ROCM support" diff --git a/streamflow/cwl/requirement/docker/singularity.py b/streamflow/cwl/requirement/docker/singularity.py index 6d2193602..ff041668d 100644 --- a/streamflow/cwl/requirement/docker/singularity.py +++ b/streamflow/cwl/requirement/docker/singularity.py @@ -19,6 +19,7 @@ def __init__( addCaps: str | None = None, allowSetuid: bool = False, applyCgroups: str | None = None, + arch: str | None = None, bind: MutableSequence[str] | None = None, blkioWeight: int | None = None, blkioWeightDevice: MutableSequence[str] | None = None, @@ -38,6 +39,7 @@ def __init__( dropCaps: str | None = None, env: MutableSequence[str] | None = None, envFile: str | None = None, + ephemeral: bool = True, fakeroot: bool = False, fusemount: MutableSequence[str] | None = None, home: str | None = None, @@ -65,6 +67,7 @@ def __init__( pemPath: str | None = None, pidFile: str | None = None, pidsLimit: int | None = None, + pullDir: str | None = None, rocm: bool = False, scratch: MutableSequence[str] | None = None, 
security: MutableSequence[str] | None = None, @@ -78,6 +81,7 @@ def __init__( self.addCaps: str | None = addCaps self.allowSetuid: bool = allowSetuid self.applyCgroups: str | None = applyCgroups + self.arch: str | None = arch self.bind: MutableSequence[str] | None = bind self.blkioWeight: int | None = blkioWeight self.blkioWeightDevice: MutableSequence[str] | None = blkioWeightDevice @@ -97,6 +101,7 @@ def __init__( self.dockerHost: str | None = dockerHost self.env: MutableSequence[str] | None = env self.envFile: str | None = envFile + self.ephemeral: bool = ephemeral self.fakeroot: bool = fakeroot self.fusemount: MutableSequence[str] | None = fusemount self.home: str | None = home @@ -124,6 +129,7 @@ def __init__( self.pemPath: str | None = pemPath self.pidFile: str | None = pidFile self.pidsLimit: int | None = pidsLimit + self.pullDir: str | None = pullDir self.rocm: bool = rocm self.scratch: MutableSequence[str] | None = scratch self.security: MutableSequence[str] | None = security @@ -153,14 +159,10 @@ def get_target( bind = list(self.bind) if self.bind else [] bind.append(f"{target.workdir}:/tmp/streamflow") if output_directory is not None: - if target.deployment.type == "local": - bind.append( - f"{os.path.join(target.workdir, utils.random_name())}:{output_directory}" - ) - else: - bind.append( - f"{posixpath.join(target.workdir, utils.random_name())}:{output_directory}" - ) + path_processor = os.path if target.deployment.type == "local" else posixpath + bind.append( + f"{path_processor.join(target.workdir, utils.random_name())}:{output_directory}" + ) return Target( deployment=DeploymentConfig( name=utils.random_name(), @@ -170,6 +172,7 @@ def get_target( "addCaps": self.addCaps, "allowSetuid": self.allowSetuid, "applyCgroups": self.applyCgroups, + "arch": self.arch, "bind": bind, "blkioWeight": self.blkioWeight, "blkioWeightDevice": self.blkioWeightDevice, @@ -189,6 +192,7 @@ def get_target( "dockerHost": self.dockerHost, "env": self.env, "envFile": 
self.envFile, + "ephemeral": self.ephemeral, "fakeroot": self.fakeroot, "fusemount": self.fusemount, "home": self.home, @@ -216,12 +220,16 @@ def get_target( "pemPath": self.pemPath, "pidFile": self.pidFile, "pidsLimit": self.pidsLimit, + "pullDir": ( + self.pullDir if self.pullDir is not None else target.workdir + ), "rocm": self.rocm, "scratch": self.scratch, "security": self.security, "transferBufferSize": self.transferBufferSize, "userns": self.userns, "uts": self.uts, + "workdir": self.workdir, "writable": self.writable, "writableTmpfs": self.writableTmpfs, }, diff --git a/streamflow/deployment/connector/container.py b/streamflow/deployment/connector/container.py index 87d6c13d3..fed35c527 100644 --- a/streamflow/deployment/connector/container.py +++ b/streamflow/deployment/connector/container.py @@ -2,6 +2,7 @@ import asyncio import base64 +import hashlib import json import logging import os @@ -35,7 +36,11 @@ copy_remote_to_remote, copy_same_connector, ) -from streamflow.deployment.wrapper import ConnectorWrapper, get_inner_location +from streamflow.deployment.wrapper import ( + ConnectorWrapper, + get_inner_location, + get_inner_locations, +) from streamflow.log_handler import logger @@ -97,6 +102,15 @@ def _parse_mount(mount: str) -> tuple[str, str]: return source, destination +class BindMount: + __slots__ = ("src", "dst", "read_only") + + def __init__(self, src: str, dst: str, read_only: bool): + self.src: str = src + self.dst: str = dst + self.read_only: bool = read_only + + class ContainerInstance: __slots__ = ("address", "cores", "current_user", "memory", "volumes") @@ -121,10 +135,11 @@ def __init__( deployment_name: str, config_dir: str, connector: Connector, + ephemeral: bool, service: str | None, transferBufferSize: int, ): - if isinstance(connector, BatchConnector): + if not ephemeral and isinstance(connector, BatchConnector): raise WorkflowDefinitionException( f"Deployment {self.deployment_name} of type {self.__class__.__name__} " f"cannot wrap 
deployment {connector.deployment_name} " @@ -137,8 +152,10 @@ def __init__( service=service, transferBufferSize=transferBufferSize, ) + self.ephemeral: bool = ephemeral self._inner_location: AvailableLocation | None = None self._instances: MutableMapping[str, ContainerInstance] = {} + self._mounts: MutableSequence[BindMount] = [] async def _check_effective_location( self, @@ -194,275 +211,24 @@ async def _local_copy( if logger.isEnabledFor(logging.INFO): logger.info(f"COMPLETED copy from {src} to {dst} on location {location}") - async def copy_local_to_remote( - self, - src: str, - dst: str, - locations: MutableSequence[ExecutionLocation], - read_only: bool = False, - ) -> None: - bind_locations = {} - copy_tasks = [] - dst = await get_local_to_remote_destination(self, locations[0], src, dst) - for location in await self._get_effective_locations(locations, dst): - instance = await self._get_instance(location.name) - # Check if the container user is the current host user - # and if the destination path is on a mounted volume - if ( - instance.current_user - and (adjusted_dst := self._get_host_path(instance, dst)) is not None - ): - # If data is read_only, check if the source path is bound to a mounted volume, too - if ( - read_only - and (adjusted_src := self._get_container_path(instance, src)) - is not None - ): - # If yes, then create a symbolic link - copy_tasks.append( - asyncio.create_task( - self._local_copy( - src=adjusted_src, - dst=dst, - location=location, - read_only=read_only, - ) - ) - ) - # Otherwise, delegate transfer to the inner connector - else: - if logger.isEnabledFor(logging.DEBUG): - logger.debug( - f"Delegating the transfer of {src} " - f"to {adjusted_dst} from local file-system " - f"to location {location.name} to the inner connector." 
- ) - bind_locations.setdefault(adjusted_dst, []).append(location) - # Otherwise, check if the source path is bound to a mounted volume - elif (adjusted_src := self._get_container_path(instance, src)) is not None: - # If yes, perform a copy command through the container connector - copy_tasks.append( - asyncio.create_task( - self._local_copy( - src=adjusted_src, - dst=dst, - location=location, - read_only=read_only, - ) - ) - ) - else: - # If not, perform a transfer through the container connector - copy_tasks.append( - asyncio.create_task( - copy_local_to_remote( - connector=self, - location=location, - src=src, - dst=dst, - writer_command=["tar", "xf", "-", "-C", "/"], - ) - ) - ) - # Delegate bind transfers to the inner connector, preventing symbolic links - for dst, locations in bind_locations.items(): - copy_tasks.append( - asyncio.create_task( - self.connector.copy_local_to_remote( - src=src, dst=dst, locations=locations, read_only=False - ) - ) - ) - await asyncio.gather(*copy_tasks) - - async def copy_remote_to_local( - self, - src: str, - dst: str, - location: ExecutionLocation, - read_only: bool = False, - ) -> None: - instance = await self._get_instance(location.name) - # Check if the container user is the current host user - # and the source path is on a mounted volume in the source location - if ( - instance.current_user - and (adjusted_src := self._get_host_path(instance, src)) is not None - ): - # If data is read_only, check if the destination path is bound to a mounted volume, too - if ( - read_only - and self._wraps_local() - and not os.path.exists(dst) - and self._get_container_path(instance, dst) is not None - ): - # If yes, then create a symbolic link - if logger.isEnabledFor(logging.INFO): - logger.info( - f"COPYING from {adjusted_src} to {dst} on local file-system" - ) - os.symlink(adjusted_src, dst) - if logger.isEnabledFor(logging.INFO): - logger.info( - f"COMPLETED copy from {adjusted_src} to {dst} on local file-system" - ) - # Otherwise, 
delegate transfer to the inner connector - else: - logger.debug( - f"Delegating the transfer of {adjusted_src} " - f"to {dst} from location {location.name} " - f"to the local file-system {location.name} to the inner " - f"{self.connector.__class__.__name__} connector." - ) - await self.connector.copy_remote_to_local( - src=adjusted_src, dst=dst, location=location, read_only=read_only - ) - # Otherwise, check if the destination path is bound to a mounted volume - elif (adjusted_dst := self._get_container_path(instance, dst)) is not None: - # If yes, perform a copy command through the container connector - await self._local_copy( - src=src, dst=adjusted_dst, location=location, read_only=read_only - ) - else: - # If not, perform a transfer through the container connector - await copy_remote_to_local( - connector=self, - location=location, - src=src, - dst=dst, - reader_command=["tar", "chf", "-", "-C", *posixpath.split(src)], - ) - - async def copy_remote_to_remote( - self, - src: str, - dst: str, - locations: MutableSequence[ExecutionLocation], - source_location: ExecutionLocation, - source_connector: Connector | None = None, - read_only: bool = False, - ) -> None: - source_connector = source_connector or self - # Check if the source path is on a mounted volume on the source location - if ( - source_connector == self - and ( - host_src := self._get_host_path( - await self._get_instance(source_location.name), src - ) - ) - is not None - ): - # If performing a transfer between two containers that wrap a LocalConnector, - # then perform a local-to-remote copy - if self._wraps_local(): - if logger.isEnabledFor(logging.DEBUG): - logger.debug( - f"Performing a local-to-remote copy of {host_src} " - f"to {dst} from location {source_location.name} to " - f"to locations {', '.join(loc.name for loc in locations)} " - f"through the {self.__class__.__name__} copy strategy." 
- ) - await self.copy_local_to_remote( - src=host_src, dst=dst, locations=locations, read_only=read_only - ) - # Otherwise, check for optimizations - else: - effective_locations = await self._get_effective_locations( - locations, dst - ) - bind_locations = {} - unbound_locations = [] - copy_tasks = [] - for location in effective_locations: - instance = await self._get_instance(location.name) - # Check if the source path is on a mounted volume - if ( - adjusted_src := self._get_container_path(instance, host_src) - ) is not None: - # If yes, perform a copy command through the container connector - copy_tasks.append( - asyncio.create_task( - self._local_copy( - src=adjusted_src, - dst=dst, - location=location, - read_only=read_only, - ) - ) - ) - # Otherwise, check if the container user is the current host user - # and the destination path is on a mounted volume - elif ( - instance.current_user - and (adjusted_dst := self._get_host_path(instance, dst)) - is not None - ): - # If yes, then delegate transfer to the inner connector - if logger.isEnabledFor(logging.DEBUG): - logger.debug( - f"Delegating the transfer of {src} " - f"to {adjusted_dst} from location {source_location.name} " - f"to location {location.name} to the inner " - f"{self.connector.__class__.__name__} connector." - ) - bind_locations.setdefault(adjusted_dst, []).append(location) - # Otherwise, mask the location as not bound - else: - if logger.isEnabledFor(logging.DEBUG): - logger.debug( - f"Copying from {src} to {dst} from location {source_location.name} to " - f"location {location.name} through the " - f"{self.__class__.__name__} copy strategy." 
- ) - unbound_locations.append(location) - # Delegate bind transfers to the inner connector, preventing symbolic links - for host_dst, locs in bind_locations.items(): - copy_tasks.append( - asyncio.create_task( - self.connector.copy_local_to_remote( - src=host_src, - dst=host_dst, - locations=locs, - read_only=False, - ) - ) - ) - # Perform a standard remote-to-remote copy for unbound locations - copy_tasks.append( - asyncio.create_task( - copy_remote_to_remote( - connector=self, - locations=unbound_locations, - src=src, - dst=dst, - source_connector=source_connector, - source_location=source_location, - ) - ) - ) - # Wait for all copy tasks to finish - await asyncio.gather(*copy_tasks) - # Otherwise, perform a standard remote-to-remote copy - else: - if locations := await copy_same_connector( - connector=self, - locations=await self._get_effective_locations( - locations, dst, source_location + async def _get_available_location_from_instance( + self, instanceId: str | None = None + ) -> MutableMapping[str, AvailableLocation]: + instance = self._instances[instanceId] + return { + instanceId: AvailableLocation( + name=instanceId, + deployment=self.deployment_name, + hostname=instance.address, + hardware=Hardware( + cores=instance.cores, + memory=instance.memory, + storage=instance.volumes, ), - source_location=source_location, - src=src, - dst=dst, - read_only=read_only, - ): - await copy_remote_to_remote( - connector=self, - locations=locations, - src=src, - dst=dst, - source_connector=source_connector, - source_location=source_location, - ) + stacked=True, + wraps=self._inner_location, + ) + } async def _get_effective_locations( self, @@ -539,6 +305,341 @@ async def _prepare_volumes( def _wraps_local(self) -> bool: return self._inner_location.local + async def copy_local_to_remote( + self, + src: str, + dst: str, + locations: MutableSequence[ExecutionLocation], + read_only: bool = False, + ) -> None: + # If container is ephemeral + if self.ephemeral: + # Delegate 
the copy to the inner connector + await super().copy_local_to_remote( + src=src, + dst=dst, + locations=get_inner_locations(locations=locations), + read_only=read_only, + ) + # Add the source path to the mount points + self._mounts.append( + BindMount( + src=dst, + dst=os.path.realpath(dst) if self._wraps_local() else dst, + read_only=read_only, + ) + ) + # Otherwise, try to optimise the data movement + else: + bind_locations = {} + copy_tasks = [] + dst = await get_local_to_remote_destination(self, locations[0], src, dst) + for location in await self._get_effective_locations(locations, dst): + instance = await self._get_instance(location.name) + # Check if the container user is the current host user + # and if the destination path is on a mounted volume + if ( + instance.current_user + and (adjusted_dst := self._get_host_path(instance, dst)) is not None + ): + # If data is read_only, check if the source path is bound to a mounted volume, too + if ( + read_only + and (adjusted_src := self._get_container_path(instance, src)) + is not None + ): + # If yes, then create a symbolic link + copy_tasks.append( + asyncio.create_task( + self._local_copy( + src=adjusted_src, + dst=dst, + location=location, + read_only=read_only, + ) + ) + ) + # Otherwise, delegate transfer to the inner connector + else: + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + f"Delegating the transfer of {src} " + f"to {adjusted_dst} from local file-system " + f"to location {location.name} to the inner connector." 
+ ) + bind_locations.setdefault(adjusted_dst, []).append(location) + # Otherwise, check if the source path is bound to a mounted volume + elif ( + adjusted_src := self._get_container_path(instance, src) + ) is not None: + # If yes, perform a copy command through the container connector + copy_tasks.append( + asyncio.create_task( + self._local_copy( + src=adjusted_src, + dst=dst, + location=location, + read_only=read_only, + ) + ) + ) + else: + # If not, perform a transfer through the container connector + copy_tasks.append( + asyncio.create_task( + copy_local_to_remote( + connector=self, + location=location, + src=src, + dst=dst, + writer_command=["tar", "xf", "-", "-C", "/"], + ) + ) + ) + # Delegate bind transfers to the inner connector, preventing symbolic links + for dst, locations in bind_locations.items(): + copy_tasks.append( + asyncio.create_task( + self.connector.copy_local_to_remote( + src=src, dst=dst, locations=locations, read_only=False + ) + ) + ) + await asyncio.gather(*copy_tasks) + + async def copy_remote_to_local( + self, + src: str, + dst: str, + location: ExecutionLocation, + read_only: bool = False, + ) -> None: + # If container is ephemeral, delegate the copy to the inner connector + if self.ephemeral: + await super().copy_remote_to_local( + src=src, + dst=dst, + location=get_inner_location(location=location), + read_only=read_only, + ) + # Otherwise, try to optimise the data movement + else: + instance = await self._get_instance(location.name) + # Check if the container user is the current host user + # and the source path is on a mounted volume in the source location + if ( + instance.current_user + and (adjusted_src := self._get_host_path(instance, src)) is not None + ): + # If data is read_only, check if the destination path is bound to a mounted volume, too + if ( + read_only + and self._wraps_local() + and not os.path.exists(dst) + and self._get_container_path(instance, dst) is not None + ): + # If yes, then create a symbolic link + if 
logger.isEnabledFor(logging.INFO): + logger.info( + f"COPYING from {adjusted_src} to {dst} on local file-system" + ) + os.symlink(adjusted_src, dst) + if logger.isEnabledFor(logging.INFO): + logger.info( + f"COMPLETED copy from {adjusted_src} to {dst} on local file-system" + ) + # Otherwise, delegate transfer to the inner connector + else: + logger.debug( + f"Delegating the transfer of {adjusted_src} " + f"to {dst} from location {location.name} " + f"to the local file-system {location.name} to the inner " + f"{self.connector.__class__.__name__} connector." + ) + await self.connector.copy_remote_to_local( + src=adjusted_src, + dst=dst, + location=location, + read_only=read_only, + ) + # Otherwise, check if the destination path is bound to a mounted volume + elif (adjusted_dst := self._get_container_path(instance, dst)) is not None: + # If yes, perform a copy command through the container connector + await self._local_copy( + src=src, dst=adjusted_dst, location=location, read_only=read_only + ) + else: + # If not, perform a transfer through the container connector + await copy_remote_to_local( + connector=self, + location=location, + src=src, + dst=dst, + reader_command=["tar", "chf", "-", "-C", *posixpath.split(src)], + ) + + async def copy_remote_to_remote( + self, + src: str, + dst: str, + locations: MutableSequence[ExecutionLocation], + source_location: ExecutionLocation, + source_connector: Connector | None = None, + read_only: bool = False, + ) -> None: + # If container is ephemeral + if self.ephemeral: + # Delegate the copy to the inner connector + await super().copy_remote_to_remote( + src=src, + dst=dst, + locations=get_inner_locations(locations=locations), + source_location=source_location, + source_connector=source_connector, + read_only=read_only, + ) + # Follow symbolic link in the destination path + if self._wraps_local(): + src = os.path.realpath(dst) + else: + result, returncode = await self.connector.run( + location=self._inner_location.location, + 
command=[f'readlink -f "{dst}"'], + capture_output=True, + ) + if returncode == 0: + src = result.strip() + else: + raise WorkflowExecutionException( + f"FAILED copy from {src} to {dst} on deployment " + f"{self.deployment_name}: [{returncode}] {result}" + ) + # Add the destination path to the mount points + self._mounts.append(BindMount(src=src, dst=dst, read_only=read_only)) + # Otherwise, try to optimise the data movement + else: + source_connector = source_connector or self + # Check if the source path is on a mounted volume on the source location + if ( + source_connector == self + and ( + host_src := self._get_host_path( + await self._get_instance(source_location.name), src + ) + ) + is not None + ): + # If performing a transfer between two containers that wrap a LocalConnector, + # then perform a local-to-remote copy + if self._wraps_local(): + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + f"Performing a local-to-remote copy of {host_src} " + f"to {dst} from location {source_location.name} to " + f"to locations {', '.join(loc.name for loc in locations)} " + f"through the {self.__class__.__name__} copy strategy." 
+ ) + await self.copy_local_to_remote( + src=host_src, dst=dst, locations=locations, read_only=read_only + ) + # Otherwise, check for optimizations + else: + effective_locations = await self._get_effective_locations( + locations, dst + ) + bind_locations = {} + unbound_locations = [] + copy_tasks = [] + for location in effective_locations: + instance = await self._get_instance(location.name) + # Check if the source path is on a mounted volume + if ( + adjusted_src := self._get_container_path(instance, host_src) + ) is not None: + # If yes, perform a copy command through the container connector + copy_tasks.append( + asyncio.create_task( + self._local_copy( + src=adjusted_src, + dst=dst, + location=location, + read_only=read_only, + ) + ) + ) + # Otherwise, check if the container user is the current host user + # and the destination path is on a mounted volume + elif ( + instance.current_user + and (adjusted_dst := self._get_host_path(instance, dst)) + is not None + ): + # If yes, then delegate transfer to the inner connector + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + f"Delegating the transfer of {src} " + f"to {adjusted_dst} from location {source_location.name} " + f"to location {location.name} to the inner " + f"{self.connector.__class__.__name__} connector." + ) + bind_locations.setdefault(adjusted_dst, []).append(location) + # Otherwise, mask the location as not bound + else: + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + f"Copying from {src} to {dst} from location {source_location.name} to " + f"location {location.name} through the " + f"{self.__class__.__name__} copy strategy." 
+ ) + unbound_locations.append(location) + # Delegate bind transfers to the inner connector, preventing symbolic links + for host_dst, locs in bind_locations.items(): + copy_tasks.append( + asyncio.create_task( + self.connector.copy_local_to_remote( + src=host_src, + dst=host_dst, + locations=locs, + read_only=False, + ) + ) + ) + # Perform a standard remote-to-remote copy for unbound locations + copy_tasks.append( + asyncio.create_task( + copy_remote_to_remote( + connector=self, + locations=unbound_locations, + src=src, + dst=dst, + source_connector=source_connector, + source_location=source_location, + ) + ) + ) + # Wait for all copy tasks to finish + await asyncio.gather(*copy_tasks) + # Otherwise, perform a standard remote-to-remote copy + else: + if locations := await copy_same_connector( + connector=self, + locations=await self._get_effective_locations( + locations, dst, source_location + ), + source_location=source_location, + src=src, + dst=dst, + read_only=read_only, + ): + await copy_remote_to_remote( + connector=self, + locations=locations, + src=src, + dst=dst, + source_connector=source_connector, + source_location=source_location, + ) + async def deploy(self, external: bool) -> None: # Retrieve the underlying location locations = await self.connector.get_available_locations(service=self.service) @@ -553,29 +654,41 @@ async def deploy(self, external: bool) -> None: async def get_stream_reader( self, command: MutableSequence[str], location: ExecutionLocation ) -> StreamWrapperContextManager: - return await self.connector.get_stream_reader( - command=self._get_run_command( - command=utils.encode_command(" ".join(command), "sh"), - location=location, - interactive=False, - ), - location=get_inner_location(location), - ) + if self.ephemeral: + return await super().get_stream_reader( + command=command, + location=get_inner_location(location=location), + ) + else: + return await self.connector.get_stream_reader( + command=self._get_run_command( + 
command=utils.encode_command(" ".join(command), "sh"), + location=location, + interactive=False, + ), + location=get_inner_location(location), + ) async def get_stream_writer( self, command: MutableSequence[str], location: ExecutionLocation ) -> StreamWrapperContextManager: - encoded_command = base64.b64encode(" ".join(command).encode("utf-8")).decode( - "utf-8" - ) - return await self.connector.get_stream_writer( - command=self._get_run_command( - command=f"eval $(echo {encoded_command} | base64 -d)", - location=location, - interactive=True, - ), - location=get_inner_location(location), - ) + if self.ephemeral: + return await super().get_stream_writer( + command=command, + location=get_inner_location(location=location), + ) + else: + encoded_command = base64.b64encode( + " ".join(command).encode("utf-8") + ).decode("utf-8") + return await self.connector.get_stream_writer( + command=self._get_run_command( + command=f"eval $(echo {encoded_command} | base64 -d)", + location=location, + interactive=True, + ), + location=get_inner_location(location), + ) async def run( self, @@ -590,33 +703,47 @@ async def run( timeout: int | None = None, job_name: str | None = None, ) -> tuple[Any | None, int] | None: - command = utils.create_command( - self.__class__.__name__, - command, - environment, - workdir, - stdin, - stdout, - stderr, - ) - if logger.isEnabledFor(logging.DEBUG): - logger.debug( - "EXECUTING command {command} on {location} {job}".format( - command=command, - location=location, - job=f"for job {job_name}" if job_name else "", + if not self.ephemeral or job_name is not None: + command = utils.create_command( + self.__class__.__name__, + command, + environment, + workdir, + stdin, + stdout, + stderr, + ) + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "EXECUTING command {command} on {location} {job}".format( + command=command, + location=location, + job=f"for job {job_name}" if job_name else "", + ) ) + return await self.connector.run( + 
location=get_inner_location(location), + command=self._get_run_command( + command=utils.encode_command(command, "sh"), + location=location, + ), + capture_output=capture_output, + timeout=timeout, + job_name=job_name, + ) + else: + return await super().run( + location=get_inner_location(location=location), + command=command, + environment=environment, + workdir=workdir, + stdin=stdin, + stdout=stdout, + stderr=stderr, + capture_output=capture_output, + timeout=timeout, + job_name=job_name, + ) - return await self.connector.run( - location=get_inner_location(location), - command=self._get_run_command( - command=utils.encode_command(command, "sh"), - location=location, - ), - capture_output=capture_output, - timeout=timeout, - job_name=job_name, - ) class DockerBaseConnector(ContainerConnector, ABC): @@ -625,6 +752,7 @@ def __init__( deployment_name: str, config_dir: str, connector: Connector, + ephemeral: bool, service: str | None, transferBufferSize: int, ): @@ -632,6 +760,7 @@ def __init__( deployment_name=deployment_name, config_dir=config_dir, connector=connector, + ephemeral=ephemeral, service=service, transferBufferSize=transferBufferSize, ) @@ -883,6 +1012,7 @@ def __init__( entrypoint: str | None = None, env: MutableSequence[str] | None = None, envFile: MutableSequence[str] | None = None, + ephemeral: bool = False, expose: MutableSequence[str] | None = None, gpus: MutableSequence[str] | None = None, groupAdd: MutableSequence[str] | None = None, @@ -942,10 +1072,16 @@ def __init__( volumesFrom: MutableSequence[str] | None = None, workdir: str | None = None, ): + if ephemeral and command: + raise WorkflowDefinitionException( + f"Invalid configuration for connector {deployment_name}: " + "the `command` option is not supported for ephemeral Docker containers." 
+ ) super().__init__( deployment_name=deployment_name, config_dir=config_dir, connector=connector, + ephemeral=ephemeral, service=service, transferBufferSize=transferBufferSize, ) @@ -1041,6 +1177,113 @@ def __init__( self.volumesFrom: MutableSequence[str] | None = volumesFrom self.workdir: str | None = workdir + def _build_run_command( + self, command: MutableSequence[str], detach: bool, interactive: bool + ) -> MutableSequence[str]: + run_command = [ + "docker", + "run", + get_option("detach", detach), + get_option("interactive", interactive), + get_option("add-host", self.addHost), + get_option("blkio-weight", self.blkioWeight), + get_option("blkio-weight-device", self.blkioWeightDevice), + get_option("cap-add", self.capAdd), + get_option("cap-drop", self.capDrop), + get_option("cgroup-parent", self.cgroupParent), + get_option("cgroupns", self.cgroupns), + get_option("cidfile", self.cidfile), + get_option("cpu-period", self.cpuPeriod), + get_option("cpu-quota", self.cpuQuota), + get_option("cpu-rt-period", self.cpuRTPeriod), + get_option("cpu-rt-runtime", self.cpuRTRuntime), + get_option("cpu-shares", self.cpuShares), + get_option("cpus", self.cpus), + get_option("cpuset-cpus", self.cpusetCpus), + get_option("cpuset-mems", self.cpusetMems), + get_option("detach-keys", self.detachKeys), + get_option("device", self.device), + get_option("device-cgroup-rule", self.deviceCgroupRule), + get_option("device-read-bps", self.deviceReadBps), + get_option("device-read-iops", self.deviceReadIops), + get_option("device-write-bps", self.deviceWriteBps), + get_option("device-write-iops", self.deviceWriteIops), + f"--disable-content-trust={'true' if self.disableContentTrust else 'false'}", + get_option("dns", self.dns), + get_option("dns-option", self.dnsOptions), + get_option("dns-search", self.dnsSearch), + get_option("domainname", self.domainname), + get_option("entrypoint", self.entrypoint), + get_option("env", self.env), + get_option("env-file", self.envFile), + 
get_option("expose", self.expose), + get_option("gpus", self.gpus), + get_option("group-add", self.groupAdd), + get_option("health-cmd", self.healthCmd), + get_option("health-interval", self.healthInterval), + get_option("health-retries", self.healthRetries), + get_option("health-start-period", self.healthStartPeriod), + get_option("health-timeout", self.healthTimeout), + get_option("hostname", self.hostname), + get_option("init", self.init), + get_option("ip", self.ip), + get_option("ip6", self.ip6), + get_option("ipc", self.ipc), + get_option("isolation", self.isolation), + get_option("kernel-memory", self.kernelMemory), + get_option("label", self.label), + get_option("label-file", self.labelFile), + get_option("link", self.link), + get_option("link-local-ip", self.linkLocalIP), + get_option("log-driver", self.logDriver), + get_option("log-opt", self.logOpts), + get_option("mac-address", self.macAddress), + get_option("memory", self.memory), + get_option("memory-reservation", self.memoryReservation), + get_option("memory-swap", self.memorySwap), + get_option("memory-swappiness", self.memorySwappiness), + get_option( + "mount", + (self.mount or []) + + [ + f"type=bind,src={mount.src},dst={mount.dst}{',readonly' if mount.read_only else ''}" + for mount in self._mounts + ], + ), + get_option("network", self.network), + get_option("network-alias", self.networkAlias), + get_option("no-healthcheck", self.noHealthcheck), + get_option("oom-kill-disable", self.oomKillDisable), + get_option("oom-score-adj", self.oomScoreAdj), + get_option("pid", self.pid), + get_option("pids-limit", self.pidsLimit), + get_option("privileged", self.privileged), + get_option("publish", self.publish), + get_option("publish-all", self.publishAll), + get_option("read-only", self.readOnly), + get_option("restart", self.restart), + get_option("rm", self.rm), + get_option("runtime", self.runtime), + get_option("security-opt", self.securityOpts), + get_option("shm-size", self.shmSize), + 
f"--sig-proxy={'true' if self.sigProxy else 'false'}", + get_option("stop-signal", self.stopSignal), + get_option("stop-timeout", self.stopTimeout), + get_option("storage-opt", self.storageOpts), + get_option("sysctl", self.sysctl), + get_option("tmpfs", self.tmpfs), + get_option("ulimit", self.ulimit), + get_option("user", self.user), + get_option("userns", self.userns), + get_option("uts", self.uts), + get_option("volume", self.volume), + get_option("volume-driver", self.volumeDriver), + get_option("volumes-from", self.volumesFrom), + get_option("workdir", self.workdir), + self.image, + ] + return (run_command + list(command)) if command else run_command + async def _get_instance(self, location: str) -> ContainerInstance: if location not in self._instances: raise WorkflowExecutionException( @@ -1049,161 +1292,89 @@ async def _get_instance(self, location: str) -> ContainerInstance: ) return self._instances[location] + def _get_run_command( + self, command: str, location: ExecutionLocation, interactive: bool = False + ) -> MutableSequence[str]: + if self.ephemeral: + return self._build_run_command( + detach=False, + interactive=False, + command=["sh", "-c", f"'{command}'"], + ) + else: + return super()._get_run_command( + command=command, location=location, interactive=interactive + ) + async def deploy(self, external: bool) -> None: await super().deploy(external) # Check if Docker is installed in the wrapped connector await self._check_docker_installed() - # If the deployment is not external, deploy the container - if not external: - await self._prepare_volumes(self.volume, self.mount) - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Using Docker {await self._get_docker_version()}.") - # Pull image if it doesn't exist - _, returncode = await self.connector.run( + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Using Docker {await self._get_docker_version()}.") + # Prepare volumes + await self._prepare_volumes(self.volume, self.mount) + # Pull 
image if it doesn't exist + _, returncode = await self.connector.run( + location=self._inner_location.location, + command=["docker", "image", "inspect", self.image], + capture_output=True, + ) + if returncode != 0: + await self.connector.run( location=self._inner_location.location, - command=["docker", "image", "inspect", self.image], - capture_output=True, + command=[ + "docker", + "pull", + "--quiet", + f"--disable-content-trust={'true' if self.disableContentTrust else 'false'}", + self.image, + ], ) - if returncode != 0: - await self.connector.run( + # If the deployment is not ephemeral + if not self.ephemeral: + # If the deployment is not external, deploy the container + if not external: + # Deploy the Docker container + deploy_command = self._build_run_command( + command=self.command, detach=True, interactive=True + ) + stdout, returncode = await self.connector.run( location=self._inner_location.location, - command=["docker", "pull", "--quiet", self.image], + command=deploy_command, + capture_output=True, ) - # Deploy the Docker container - deploy_command = [ - "docker", - "run", - "--detach", - "--interactive", - get_option("add-host", self.addHost), - get_option("blkio-weight", self.addHost), - get_option("blkio-weight-device", self.blkioWeightDevice), - get_option("cap-add", self.capAdd), - get_option("cap-drop", self.capDrop), - get_option("cgroup-parent", self.cgroupParent), - get_option("cgroupns", self.cgroupns), - get_option("cidfile", self.cidfile), - get_option("cpu-period", self.cpuPeriod), - get_option("cpu-quota", self.cpuQuota), - get_option("cpu-rt-period", self.cpuRTPeriod), - get_option("cpu-rt-runtime", self.cpuRTRuntime), - get_option("cpu-shares", self.cpuShares), - get_option("cpus", self.cpus), - get_option("cpuset-cpus", self.cpusetCpus), - get_option("cpuset-mems", self.cpusetMems), - get_option("detach-keys", self.detachKeys), - get_option("device", self.device), - get_option("device-cgroup-rule", self.deviceCgroupRule), - 
get_option("device-read-bps", self.deviceReadBps), - get_option("device-read-iops", self.deviceReadIops), - get_option("device-write-bps", self.deviceWriteBps), - get_option("device-write-iops", self.deviceWriteIops), - f"--disable-content-trust={'true' if self.disableContentTrust else 'false'}", - get_option("dns", self.dns), - get_option("dns-option", self.dnsOptions), - get_option("dns-search", self.dnsSearch), - get_option("domainname", self.domainname), - get_option("entrypoint", self.entrypoint), - get_option("env", self.env), - get_option("env-file", self.envFile), - get_option("expose", self.expose), - get_option("gpus", self.gpus), - get_option("group-add", self.groupAdd), - get_option("health-cmd", self.healthCmd), - get_option("health-interval", self.healthInterval), - get_option("health-retries", self.healthRetries), - get_option("health-start-period", self.healthStartPeriod), - get_option("health-timeout", self.healthTimeout), - get_option("hostname", self.hostname), - get_option("init", self.init), - get_option("ip", self.ip), - get_option("ip6", self.ip6), - get_option("ipc", self.ipc), - get_option("isolation", self.isolation), - get_option("kernel-memory", self.kernelMemory), - get_option("label", self.label), - get_option("label-file", self.labelFile), - get_option("link", self.link), - get_option("link-local-ip", self.linkLocalIP), - get_option("log-driver", self.logDriver), - get_option("log-opt", self.logOpts), - get_option("mac-address", self.macAddress), - get_option("memory", self.memory), - get_option("memory-reservation", self.memoryReservation), - get_option("memory-swap", self.memorySwap), - get_option("memory-swappiness", self.memorySwappiness), - get_option("mount", self.mount), - get_option("network", self.network), - get_option("network-alias", self.networkAlias), - get_option("no-healthcheck", self.noHealthcheck), - get_option("oom-kill-disable", self.oomKillDisable), - get_option("oom-score-adj", self.oomScoreAdj), - 
get_option("pid", self.pid), - get_option("pids-limit", self.pidsLimit), - get_option("privileged", self.privileged), - get_option("publish", self.publish), - get_option("publish-all", self.publishAll), - get_option("read-only", self.readOnly), - get_option("restart", self.restart), - get_option("rm", self.rm), - get_option("runtime", self.runtime), - get_option("security-opt", self.securityOpts), - get_option("shm-size", self.shmSize), - f"--sig-proxy={'true' if self.sigProxy else 'false'}", - get_option("stop-signal", self.stopSignal), - get_option("stop-timeout", self.stopTimeout), - get_option("storage-opt", self.storageOpts), - get_option("sysctl", self.sysctl), - get_option("tmpfs", self.tmpfs), - get_option("ulimit", self.ulimit), - get_option("user", self.user), - get_option("userns", self.userns), - get_option("uts", self.uts), - get_option("volume", self.volume), - get_option("volume-driver", self.volumeDriver), - get_option("volumes-from", self.volumesFrom), - get_option("workdir", self.workdir), - self.image, - f"{' '.join(self.command) if self.command else ''}", - ] - stdout, returncode = await self.connector.run( - location=self._inner_location.location, - command=deploy_command, - capture_output=True, - ) - if returncode == 0: - self.containerId = stdout - else: - raise WorkflowExecutionException( - f"FAILED Deployment of {self.deployment_name} environment [{returncode}]:\n\t{stdout}" + if returncode == 0: + self.containerId = stdout + else: + raise WorkflowExecutionException( + f"FAILED Deployment of {self.deployment_name} environment [{returncode}]:\n\t{stdout}" + ) + # Otherwise, check if a containerId has been explicitly specified + elif self.containerId is None: + raise WorkflowDefinitionException( + f"FAILED Deployment of {self.deployment_name} environment:\n\t" + "external Docker deployments must specify the containerId of an existing Docker container" ) - # Otherwise, check if a containerId has been explicitly specified - elif 
self.containerId is None: - raise WorkflowDefinitionException( - f"FAILED Deployment of {self.deployment_name} environment:\n\t" - "external Docker deployments must specify the containerId of an existing Docker container" - ) - # Populate instance - await self._populate_instance(self.containerId) + # Populate instance + await self._populate_instance(self.containerId) async def get_available_locations( self, service: str | None = None ) -> MutableMapping[str, AvailableLocation]: - instance = self._instances[self.containerId] - return { - self.containerId: AvailableLocation( - name=self.containerId, - deployment=self.deployment_name, - hostname=instance.address, - hardware=Hardware( - cores=instance.cores, - memory=instance.memory, - storage=instance.volumes, - ), - stacked=True, - wraps=self._inner_location, - ) - } + if self.ephemeral: + name = f"{self._inner_location.name}/{self.deployment_name}" + return { + name: AvailableLocation( + name=name, + deployment=self.deployment_name, + hostname=self._inner_location.hostname, + stacked=True, + wraps=self._inner_location, + ) + } + else: + return await super()._get_available_location_from_instance(self.containerId) @classmethod def get_schema(cls) -> str: @@ -1215,7 +1386,7 @@ def get_schema(cls) -> str: ) async def undeploy(self, external: bool) -> None: - if not external: + if not (external or self.ephemeral): stdout, returncode = await self.connector.run( location=self._inner_location.location, command=["docker", "stop", self.containerId], @@ -1268,6 +1439,7 @@ def __init__( connector=connector, service=service, transferBufferSize=transferBufferSize, + ephemeral=False, ) self.files = [ file if os.path.isabs(file) else os.path.join(self.config_dir, file) @@ -1444,8 +1616,8 @@ async def get_available_locations( ) } return { - k: AvailableLocation( - name=k, + name: AvailableLocation( + name=name, deployment=self.deployment_name, hostname=instance.address, hardware=Hardware( @@ -1456,7 +1628,7 @@ async def 
get_available_locations( stacked=True, wraps=self._inner_location, ) - for k, instance in instances.items() + for name, instance in instances.items() } @classmethod @@ -1493,6 +1665,7 @@ def __init__( addCaps: str | None = None, allowSetuid: bool = False, applyCgroups: str | None = None, + arch: str | None = None, bind: MutableSequence[str] | None = None, blkioWeight: int | None = None, blkioWeightDevice: MutableSequence[str] | None = None, @@ -1512,6 +1685,7 @@ def __init__( dropCaps: str | None = None, env: MutableSequence[str] | None = None, envFile: str | None = None, + ephemeral: bool = False, fakeroot: bool = False, fusemount: MutableSequence[str] | None = None, home: str | None = None, @@ -1540,6 +1714,7 @@ def __init__( pemPath: str | None = None, pidFile: str | None = None, pidsLimit: int | None = None, + pullDir: str | None = None, rocm: bool = False, scratch: MutableSequence[str] | None = None, security: MutableSequence[str] | None = None, @@ -1550,10 +1725,32 @@ def __init__( writable: bool = False, writableTmpfs: bool = False, ): + if ephemeral: + if boot: + raise WorkflowDefinitionException( + f"Invalid configuration for connector {deployment_name}: " + "the `boot` option is not supported for ephemeral Singularity containers." + ) + if command: + raise WorkflowDefinitionException( + f"Invalid configuration for connector {deployment_name}: " + "the `command` option is not supported for ephemeral Singularity containers." + ) + if dns: + raise WorkflowDefinitionException( + f"Invalid configuration for connector {deployment_name}: " + "the `dns` option is not supported for ephemeral Singularity containers." + ) + if pidFile: + raise WorkflowDefinitionException( + f"Invalid configuration for connector {deployment_name}: " + "the `pidFile` option is not supported for ephemeral Singularity containers." 
+ ) super().__init__( deployment_name=deployment_name, config_dir=config_dir, connector=connector, + ephemeral=ephemeral, service=service, transferBufferSize=transferBufferSize, ) @@ -1561,6 +1758,7 @@ def __init__( self.addCaps: str | None = addCaps self.allowSetuid: bool = allowSetuid self.applyCgroups: str | None = applyCgroups + self.arch: str | None = arch self.bind: MutableSequence[str] | None = bind self.blkioWeight: int | None = blkioWeight self.blkioWeightDevice: MutableSequence[str] | None = blkioWeightDevice @@ -1608,6 +1806,7 @@ def __init__( self.pemPath: str | None = pemPath self.pidFile: str | None = pidFile self.pidsLimit: int | None = pidsLimit + self.pullDir: str | None = pullDir self.rocm: bool = rocm self.scratch: MutableSequence[str] | None = scratch self.security: MutableSequence[str] | None = security @@ -1616,6 +1815,81 @@ def __init__( self.workdir: str | None = workdir self.writable: bool = writable self.writableTmpfs: bool = writableTmpfs + self._image_file: str | None = None + + def _build_run_command( + self, + base_command: list[str], + command: MutableSequence[str], + name: str | None = None, + ) -> MutableSequence[str]: + run_command = base_command + [ + get_option("add-caps", self.addCaps), + get_option("allow-setuid", self.allowSetuid), + get_option("apply-cgroups", self.applyCgroups), + get_option("bind", self.bind), + get_option("blkio-weight", self.blkioWeight), + get_option("blkio-weight-device", self.blkioWeightDevice), + get_option("boot", self.boot), + get_option("cleanenv", self.cleanenv), + get_option("compat", self.compat), + get_option("contain", self.contain), + get_option("containall", self.containall), + get_option("cpu-shares", self.cpuShares), + get_option("cpus", self.cpus), + get_option("cpuset-cpus", self.cpusetCpus), + get_option("cpuset-mems", self.cpusetMems), + get_option("disable-cache", self.disableCache), + get_option("docker-host", self.dockerHost), + get_option("dns", self.dns), + get_option("drop-caps", 
self.dropCaps), + get_option("env-file", self.envFile), + get_option("fakeroot", self.fakeroot), + get_option("fusemount", self.fusemount), + get_option("home", self.home), + get_option("hostname", self.hostname), + get_option("ipc", self.ipc), + get_option("keep-privs", self.keepPrivs), + get_option("memory", self.memory), + get_option("memory-reservation", self.memoryReservation), + get_option("memory-swap", self.memorySwap), + get_option( + "mount", + (self.mount or []) + + [ + f"type=bind,src={mount.src},dst={mount.dst}{',readonly' if mount.read_only else ''}" + for mount in self._mounts + ], + ), + get_option("net", self.net), + get_option("network", self.network), + get_option("network-args", self.networkArgs), + get_option("no-eval", self.noEval), + get_option("no-home", self.noHome), + get_option("no-https", self.noHttps), + get_option("no-init", self.noInit), + get_option("no-mount", self.noMount), + get_option("no-privs", self.noPrivs), + get_option("no-umask", self.noUmask), + get_option("nv", self.nv), + get_option("nvccli", self.nvccli), + get_option("oom-kill-disable", self.oomKillDisable), + get_option("overlay", self.overlay), + get_option("pem-path", self.pemPath), + get_option("pid-file", self.pidFile), + get_option("pids-limit", self.pidsLimit), + get_option("rocm", self.rocm), + get_option("scratch", self.scratch), + get_option("security", self.security), + get_option("userns", self.userns), + get_option("uts", self.uts), + get_option("workdir", self.workdir), + get_option("writable", self.writable), + get_option("writable-tmpfs", self.writableTmpfs), + self._image_file, + name if name is not None else "", + ] + return (run_command + list(command)) if command else run_command async def _check_singularity_installed(self): if self._wraps_local(): @@ -1635,15 +1909,21 @@ async def _check_singularity_installed(self): def _get_run_command( self, command: str, location: ExecutionLocation, interactive: bool = False ) -> MutableSequence[str]: - return [ 
- "singularity", - "exec", - get_option("cleanenv", self.cleanenv), - f"instance://{location.name}", - "sh", - "-c", - f"'{command}'", - ] + if self.ephemeral: + return self._build_run_command( + base_command=["singularity", "exec"], + command=["sh", "-c", f"'{command}'"], + ) + else: + return [ + "singularity", + "exec", + get_option("cleanenv", self.cleanenv), + f"instance://{location.name}", + "sh", + "-c", + f"'{command}'", + ] async def _get_singularity_version(self) -> str: stdout, _ = await self.connector.run( @@ -1823,113 +2103,139 @@ async def deploy(self, external: bool) -> None: await super().deploy(external) # Check if Singularity is installed in the wrapped connector await self._check_singularity_installed() - # If the deployment is not external, deploy the container - if not external: - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Using {await self._get_singularity_version()}.") - await self._prepare_volumes(self.bind, self.mount) - instance_name = random_name() - deploy_command = [ - "singularity", - "instance", - "start", - get_option("add-caps", self.addCaps), - get_option("allow-setuid", self.allowSetuid), - get_option("apply-cgroups", self.applyCgroups), - get_option("bind", self.bind), - get_option("blkio-weight", self.blkioWeight), - get_option("blkio-weight-device", self.blkioWeightDevice), - get_option("boot", self.boot), - get_option("cleanenv", self.cleanenv), - get_option("compat", self.compat), - get_option("contain", self.contain), - get_option("containall", self.containall), - get_option("cpu-shares", self.cpuShares), - get_option("cpus", self.cpus), - get_option("cpuset-cpus", self.cpusetCpus), - get_option("cpuset-mems", self.cpusetMems), - get_option("disable-cache", self.disableCache), - get_option("docker-host", self.dockerHost), - get_option("dns", self.dns), - get_option("drop-caps", self.dropCaps), - get_option("env-file", self.envFile), - get_option("fakeroot", self.fakeroot), - get_option("fusemount", 
self.fusemount), - get_option("home", self.home), - get_option("hostname", self.hostname), - get_option("ipc", self.ipc), - get_option("keep-privs", self.keepPrivs), - get_option("memory", self.memory), - get_option("memory-reservation", self.memoryReservation), - get_option("memory-swap", self.memorySwap), - get_option("mount", self.mount), - get_option("net", self.net), - get_option("network", self.network), - get_option("network-args", self.networkArgs), - get_option("no-eval", self.noEval), - get_option("no-home", self.noHome), - get_option("no-https", self.noHttps), - get_option("no-init", self.noInit), - get_option("no-mount", self.noMount), - get_option("no-privs", self.noPrivs), - get_option("no-umask", self.noUmask), - get_option("nv", self.nv), - get_option("nvccli", self.nvccli), - get_option("oom-kill-disable", self.oomKillDisable), - get_option("overlay", self.overlay), - get_option("pem-path", self.pemPath), - get_option("pid-file", self.pidFile), - get_option("pids-limit", self.pidsLimit), - get_option("rocm", self.rocm), - get_option("scratch", self.scratch), - get_option("security", self.security), - get_option("userns", self.userns), - get_option("uts", self.uts), - get_option("workdir", self.workdir), - get_option("writable", self.writable), - get_option("writable-tmpfs", self.writableTmpfs), - self.image, - instance_name, - " ".join(self.command) if self.command else "", - ] + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Using {await self._get_singularity_version()}.") + # Prepare volumes + await self._prepare_volumes(self.bind, self.mount) + # Build remote file name + sif_name = ( + self.image + if self.image.endswith(".sif") + else f"{hashlib.sha256(self.image.encode('utf-8')).hexdigest()}.sif" + ) + if not self.pullDir: + if self._wraps_local(): + self._image_file = os.path.join(os.getcwd(), os.path.basename(sif_name)) + else: + stdout, returncode = await self.connector.run( + location=self._inner_location.location, + 
command=["pwd"], + capture_output=True, + ) + if returncode == 0: + self._image_file = posixpath.join( + stdout.strip(), os.path.basename(sif_name) + ) + else: + raise WorkflowExecutionException( + f"FAILED Deployment of {self.deployment_name} environment:[{returncode}] {stdout}" + ) + else: + self._image_file = (os.path if self._wraps_local() else posixpath).join( + self.pullDir, os.path.basename(sif_name) + ) + # Check if file is present on the remote connector + if self._wraps_local(): + sif_exists = os.path.exists(self._image_file) + else: stdout, returncode = await self.connector.run( location=self._inner_location.location, - command=deploy_command, + command=[ + "test", + "-e", + self._image_file, + ], capture_output=True, ) - if returncode == 0: - self.instanceName = instance_name - else: + if returncode > 1: raise WorkflowExecutionException( - f"FAILED Deployment of {self.deployment_name} environment:" - f"[{returncode}] {stdout}" + f"FAILED Deployment of {self.deployment_name} environment:[{returncode}] {stdout}" ) - elif self.instanceName is None: - raise WorkflowDefinitionException( - f"FAILED Deployment of {self.deployment_name} environment: " - "external Singularity deployments must specify the instanceName of an existing Singularity container" - ) - # Populate instance - await self._populate_instance(self.instanceName) + sif_exists = returncode == 0 + # If the file does not exist, transfer it + if not sif_exists: + # If the image is a .sif file, transfer it + if self.image.endswith(".sif"): + if self._wraps_local(): + os.symlink(self.image, self._image_file) + else: + await self.connector.copy_local_to_remote( + src=self.image, + dst=self._image_file, + locations=[self._inner_location.location], + read_only=True, + ) + # Otherwise, if the image refers to a remote library, pull it + else: + stdout, returncode = await self.connector.run( + location=self._inner_location.location, + command=[ + "singularity", + "pull", + get_option("arch", self.arch), + 
get_option("dir", self.pullDir), + sif_name, + self.image, + ], + capture_output=True, + ) + if returncode != 0: + raise WorkflowExecutionException( + f"FAILED Deployment of {self.deployment_name} environment:[{returncode}] {stdout}" + ) + # If the deployment is not ephemeral or external + if not self.ephemeral: + # If the deployment is not external, deploy the container + if not external: + instance_name = random_name() + deploy_command = self._build_run_command( + base_command=[ + "singularity", + "instance", + "start", + ], + command=self.command, + name=instance_name, + ) + stdout, returncode = await self.connector.run( + location=self._inner_location.location, + command=deploy_command, + capture_output=True, + ) + if returncode == 0: + self.instanceName = instance_name + else: + raise WorkflowExecutionException( + f"FAILED Deployment of {self.deployment_name} environment:" + f"[{returncode}] {stdout}" + ) + elif self.instanceName is None: + raise WorkflowDefinitionException( + f"FAILED Deployment of {self.deployment_name} environment: " + "external Singularity deployments must specify the instanceName of " + "an existing Singularity container" + ) + # Populate instance + await self._populate_instance(self.instanceName) async def get_available_locations( self, service: str | None = None ) -> MutableMapping[str, AvailableLocation]: - instance = self._instances[self.instanceName] - return { - self.instanceName: AvailableLocation( - name=self.instanceName, - deployment=self.deployment_name, - hostname=instance.address, - hardware=Hardware( - cores=instance.cores, - memory=instance.memory, - storage=instance.volumes, - ), - stacked=True, - wraps=self._inner_location, + if self.ephemeral: + name = f"{self._inner_location.name}/{self.deployment_name}" + return { + name: AvailableLocation( + name=name, + deployment=self.deployment_name, + hostname=self._inner_location.hostname, + stacked=True, + wraps=self._inner_location, + ) + } + else: + return await 
super()._get_available_location_from_instance( + self.instanceName ) - } @classmethod def get_schema(cls) -> str: @@ -1941,7 +2247,7 @@ def get_schema(cls) -> str: ) async def undeploy(self, external: bool) -> None: - if not external: + if not (external or self.ephemeral): stdout, returncode = await self.connector.run( location=self._inner_location.location, command=["singularity", "instance", "stop", self.instanceName], diff --git a/streamflow/deployment/connector/schemas/docker.json b/streamflow/deployment/connector/schemas/docker.json index 958315fef..bbb9f23bf 100644 --- a/streamflow/deployment/connector/schemas/docker.json +++ b/streamflow/deployment/connector/schemas/docker.json @@ -64,7 +64,7 @@ "items": { "type": "string" }, - "description": "Command to run when deploying the container" + "description": "Command to run when deploying the container. Not supported for ephemeral containers" }, "containerId": { "type": "string", @@ -207,6 +207,11 @@ "uniqueItems": true, "description": "Read in a file of environment variables" }, + "ephemeral": { + "type": "boolean", + "default": false, + "description": "Invoke a fresh container instance for every command in the workflow" + }, "expose": { "type": "array", "items": { diff --git a/streamflow/deployment/connector/schemas/singularity.json b/streamflow/deployment/connector/schemas/singularity.json index 4caa91b1a..ccd670776 100644 --- a/streamflow/deployment/connector/schemas/singularity.json +++ b/streamflow/deployment/connector/schemas/singularity.json @@ -19,6 +19,10 @@ "type": "string", "description": "Apply cgroups from file for container processes (root only)" }, + "arch": { + "type": "string", + "description": "architecture to pull from library" + }, "bind": { "type": "array", "items": { @@ -40,7 +44,7 @@ }, "boot": { "type": "boolean", - "description": "Execute /sbin/init to boot container (root only)" + "description": "Execute /sbin/init to boot container (root only). 
Not supported for ephemeral containers" }, "cleanenv": { "type": "boolean", @@ -51,7 +55,7 @@ "items": { "type": "string" }, - "description": "Command to run when deploying the container" + "description": "Command to run when deploying the container. Not supported for ephemeral containers" }, "compat": { "type": "boolean", @@ -87,7 +91,7 @@ }, "dns": { "type": "string", - "description": "List of DNS server separated by commas to add in resolv.conf" + "description": "List of DNS server separated by commas to add in resolv.conf. Not supported for ephemeral containers" }, "dockerHost": { "type": "string", @@ -109,6 +113,11 @@ "type": "string", "description": "Pass environment variables from file to contained process" }, + "ephemeral": { + "type": "boolean", + "default": false, + "description": "Invoke a fresh container instance for every command in the workflow" + }, "fakeroot": { "type": "boolean", "description": "Run container in new user namespace as uid 0" @@ -236,12 +245,16 @@ }, "pidFile": { "type": "string", - "description": "Write instance PID to the file with the given name" + "description": "Write instance PID to the file with the given name. 
Not supported for ephemeral containers" }, "pidsLimit": { "type": "integer", "description": "Limit number of container PIDs, use -1 for unlimited" }, + "pullDir": { + "type": "string", + "description": "Download images to the specific directory" + }, "rocm": { "type": "boolean", "description": "Enable experimental ROCM support" diff --git a/streamflow/workflow/step.py b/streamflow/workflow/step.py index e871034c2..80e3decda 100644 --- a/streamflow/workflow/step.py +++ b/streamflow/workflow/step.py @@ -1485,13 +1485,12 @@ async def run(self): if name.startswith("__connector__") }, ) - connectors = await asyncio.gather( + await asyncio.gather( *( asyncio.create_task(port.get_connector(self.name)) for port in connector_ports.values() ) ) - connectors = {c.deployment_name: c for c in connectors} # If there are input ports input_ports = { k: v diff --git a/tests/test_schema.py b/tests/test_schema.py index b66b175c3..956c7f37a 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -181,11 +181,11 @@ def test_schema_generation(): """Check that the `streamflow schema` command generates a correct JSON Schema.""" assert ( hashlib.sha256(SfSchema().dump("v1.0", False).encode()).hexdigest() - == "c60eabe4335124cb1a241496ac370667a1525b8ab5847584f8dbf3877419282e" + == "05cbe170d935209aad8c84ea35fd750efeb00caa6c1750e57f253b3669f327fd" ) assert ( hashlib.sha256(SfSchema().dump("v1.0", True).encode()).hexdigest() - == "d62e2dc7b71778c6aa278ee28562bbc1b0a534e286296825162ce56ecd4aeb3c" + == "a5ebdaba98f3ec9dab6f9c74f5681f242026fd7c9c45ccb4d972aefb0803be28" )