diff --git a/streamflow/core/utils.py b/streamflow/core/utils.py index 54030a9f7..1664538b8 100644 --- a/streamflow/core/utils.py +++ b/streamflow/core/utils.py @@ -7,11 +7,14 @@ import os import posixpath import shlex +import sys import uuid from collections.abc import Iterable, MutableMapping, MutableSequence from pathlib import PurePosixPath from typing import TYPE_CHECKING, Any +import mslex + from streamflow.core.exception import ProcessorTypeError, WorkflowExecutionException from streamflow.core.persistence import PersistableEntity from streamflow.log_handler import logger @@ -126,6 +129,27 @@ def create_command( ) +if sys.platform == "win32": + + def create_shell_command(command: MutableSequence[str], local: bool) -> list[str]: + cmd = " ".join(command) + return ( + ["cmd", "/C", quote(value=cmd, local=local)] + if local + else ["sh", "-c", shlex.quote(cmd)] + ) + +else: + + def create_shell_command(command: MutableSequence[str], local: bool) -> list[str]: + cmd = " ".join(command) + return ( + [os.environ.get("SHELL", "sh"), "-c", quote(value=cmd, local=local)] + if local + else ["sh", "-c", shlex.quote(cmd)] + ) + + def get_job_step_name(job_name: str) -> str: return PurePosixPath(job_name).parent.as_posix() @@ -211,7 +235,7 @@ async def get_local_to_remote_destination( ) -> str: is_dst_dir, status = await dst_connector.run( location=dst_location, - command=[f'test -d "{dst}"'], + command=["test", "-d", shlex.quote(dst)], capture_output=True, ) if status > 1: @@ -254,21 +278,21 @@ async def get_remote_to_remote_write_command( ) -> MutableSequence[str]: is_dst_dir, status = await dst_connector.run( location=dst_locations[0], - command=[f'test -d "{dst}"'], + command=["test", "-d", shlex.quote(dst)], capture_output=True, ) if status > 1: raise WorkflowExecutionException(is_dst_dir) # If destination path exists and is a directory elif status == 0: - return ["tar", "xf", "-", "-C", dst] + return ["tar", "xf", "-", "-C", shlex.quote(dst)] # Otherwise, if destination path does not exist else: # If basename must be renamed during transfer if posixpath.basename(src) != posixpath.basename(dst): is_src_dir, status = await src_connector.run( location=src_location, - command=[f'test -d "{src}"'], + command=["test", "-d", shlex.quote(src)], capture_output=True, ) if status > 1: @@ -279,19 +303,44 @@ async def get_remote_to_remote_write_command( *( asyncio.create_task( dst_connector.run( - location=dst_location, command=["mkdir", "-p", dst] + location=dst_location, + command=["mkdir", "-p", shlex.quote(dst)], ) ) for dst_location in dst_locations ) ) - return ["tar", "xf", "-", "-C", dst, "--strip-components", "1"] + return [ + "tar", + "xf", + "-", + "-C", + shlex.quote(dst), + "--strip-components", + "1", + ] # Otherwise, if source path is a file else: - return ["tar", "xf", "-", "-O", "|", "tee", dst, ">", "/dev/null"] + return [ + "tar", + "xf", + "-", + "-O", + "|", + "tee", + shlex.quote(dst), + ">", + "/dev/null", + ] # Otherwise, if basename must be preserved else: - return ["tar", "xf", "-", "-C", posixpath.dirname(dst)] + return [ + "tar", + "xf", + "-", + "-C", + shlex.quote(posixpath.dirname(dst)), + ] def get_tag(tokens: Iterable[Token]) -> str: @@ -363,5 +412,16 @@ async def run_in_subprocess( return None +if sys.platform == "win32": + + def quote(value: str, local: bool) -> str: + return mslex.quote(value) if local else shlex.quote(value) + +else: + + def quote(value: str, local: bool) -> str: + return shlex.quote(value) + + def wrap_command(command: str) -> list[str]: return ["/bin/sh", "-c", f"{command}"] diff --git a/streamflow/cwl/command.py b/streamflow/cwl/command.py index 63fde0b9a..8bedf8ed4 100644 --- a/streamflow/cwl/command.py +++ b/streamflow/cwl/command.py @@ -1,11 +1,9 @@ from __future__ import annotations import asyncio -import base64 import json import logging import posixpath -import shlex import time from asyncio.subprocess import STDOUT from collections.abc import MutableMapping, MutableSequence @@ -30,7 +28,7 @@ MapCommandOutputProcessor, UnionCommandOutputProcessor, ) -from streamflow.core.utils import flatten_list +from streamflow.core.utils import create_shell_command, flatten_list, quote from streamflow.core.workflow import ( Command, CommandOptions, @@ -226,11 +224,11 @@ def _build_command_output_processor( ) -def _escape_value(value: Any) -> Any: +def _escape_value(value: Any, local: bool) -> Any: if isinstance(value, MutableSequence): - return [_escape_value(v) for v in value] + return [_escape_value(value=v, local=local) for v in value] else: - return shlex.quote(_get_value_repr(value)) + return quote(value=_get_value_repr(value), local=local) async def _get_source_location( @@ -710,17 +708,23 @@ async def _load( ) def _get_executable_command( - self, context: MutableMapping[str, Any], inputs: MutableMapping[str, Token] + self, + context: MutableMapping[str, Any], + inputs: MutableMapping[str, Token], + local: bool, ) -> MutableSequence[str]: - command = [] options = CWLCommandOptions( context=context, expression_lib=self.expression_lib, full_js=self.full_js, + local=local, ) # Process baseCommand - if self.base_command: - command.append(shlex.join(self.base_command)) + command = ( + [quote(cmd, local=options.local) for cmd in self.base_command] + if self.base_command + else [] + ) # Process tokens bindings = ListCommandToken(name=None, position=None, value=[]) for processor in self.processors: @@ -807,8 +811,14 @@ async def execute(self, job: Job) -> CWLCommandOutput: ) else: inputs = job.inputs + # Get execution target + connector = self.step.workflow.context.scheduler.get_connector(job.name) + locations = self.step.workflow.context.scheduler.get_locations(job.name) + local = all(loc.local for loc in locations) # Build command string - cmd = self._get_executable_command(context, inputs) + cmd = self._get_executable_command(context=context, inputs=inputs, local=local) + if self.is_shell_command: + cmd = create_shell_command(cmd, local=local) # Build environment variables parsed_env = { k: str( @@ -825,24 +835,14 @@ async def execute(self, job: Job) -> CWLCommandOutput: parsed_env["HOME"] = job.output_directory if "TMPDIR" not in parsed_env: parsed_env["TMPDIR"] = job.tmp_directory - # Get execution target - connector = self.step.workflow.context.scheduler.get_connector(job.name) - locations = self.step.workflow.context.scheduler.get_locations(job.name) - cmd_string = " \\\n\t".join( - ["/bin/sh", "-c", '"{cmd}"'.format(cmd=" ".join(cmd))] - if self.is_shell_command - else cmd - ) + # Log and persist command + cmd_string = " \\\n\t".join(cmd) if logger.isEnabledFor(logging.INFO): logger.info( "EXECUTING step {step} (job {job}) {location} into directory {outdir}:\n{command}".format( step=self.step.name, job=job.name, - location=( - "locally" - if locations[0].local - else f"on location {locations[0]}" - ), + location=("locally" if local else f"on location {locations[0]}"), outdir=job.output_directory, command=cmd_string, ) @@ -856,17 +856,6 @@ async def execute(self, job: Job) -> CWLCommandOutput: job_token_id=job_token.persistent_id, cmd=cmd_string, ) - # Escape shell command when needed - if self.is_shell_command: - cmd = [ - "/bin/sh", - "-c", - '"$(echo {command} | base64 -d)"'.format( - command=base64.b64encode(" ".join(cmd).encode("utf-8")).decode( - "utf-8" - ) - ), - ] # If step is assigned to multiple locations, add the STREAMFLOW_HOSTS environment variable if len(locations) > 1 and ( hostnames := [loc.hostname for loc in locations if loc.hostname is not None] @@ -979,17 +968,19 @@ async def execute(self, job: Job) -> CWLCommandOutput: class CWLCommandOptions(CommandOptions): - __slots__ = ("context", "expression_lib", "full_js") + __slots__ = ("context", "expression_lib", "full_js", "local") def __init__( self, context: MutableMapping[str, Any], expression_lib: MutableSequence[str] | None = None, full_js: bool = False, + local: bool = False, ): self.context: MutableMapping[str, Any] = context self.expression_lib: MutableSequence[str] | None = expression_lib self.full_js: bool = full_js + self.local: bool = local class CWLCommandTokenProcessor(CommandTokenProcessor): @@ -1075,7 +1066,7 @@ def bind( value = [value] # Process shell escape only on the single command token if not self.is_shell_command or self.shell_quote: - value = [_escape_value(v) for v in value] + value = [_escape_value(value=v, local=options.local) for v in value] # Obtain token position if isinstance(self.position, str) and not self.position.isnumeric(): position = utils.eval_expression( @@ -1219,6 +1210,7 @@ def _update_options( | {"inputs": {self.name: get_token_value(token)}}, expression_lib=options.expression_lib, full_js=options.full_js, + local=options.local, ) @@ -1236,6 +1228,7 @@ def _update_options( | {"inputs": {self.name: value}, "self": value}, expression_lib=options.expression_lib, full_js=options.full_js, + local=options.local, ) diff --git a/streamflow/cwl/processor.py b/streamflow/cwl/processor.py index 272019556..ec49e4f83 100644 --- a/streamflow/cwl/processor.py +++ b/streamflow/cwl/processor.py @@ -559,7 +559,7 @@ async def _process_command_output( if self.target else job.tmp_directory ), - path=cast(str, path), + path=path, ) ) for path in globpaths diff --git a/streamflow/data/remotepath.py b/streamflow/data/remotepath.py index 7590e9caf..6154f7e36 100644 --- a/streamflow/data/remotepath.py +++ b/streamflow/data/remotepath.py @@ -113,9 +113,9 @@ async def _size( [ "find -L ", ( - " ".join([f'"{p}"' for p in path]) + shlex.join(path) if isinstance(path, MutableSequence) - else f'"{path}"' + else shlex.quote(path) ), " -type f -exec ls -ln {} \\+ | ", "awk 'BEGIN {sum=0} {sum+=$5} END {print sum}'; ", @@ -651,7 +651,7 @@ async def glob( location=self.location, command=command, capture_output=True ) _check_status(command, self.location, result, status) - for path in result.split(): + for path in result.split("\n"): yield self.with_segments(path) async def is_dir(self) -> bool: @@ -678,7 +678,7 @@ async def mkdir( command = ["mkdir", "-m", f"{mode:o}"] if parents or exist_ok: command.append("-p") - command.append(self.__str__()) + command.append(shlex.quote(self.__str__())) result, status = await self.connector.run( location=self.location, command=command, capture_output=True ) @@ -735,7 +735,7 @@ async def rmtree(self) -> None: if (inner_path := await self._get_inner_path()) != self: await inner_path.rmtree() else: - command = ["rm", "-rf", self.__str__()] + command = ["rm", "-rf", shlex.quote(self.__str__())] result, status = await self.connector.run( location=self.location, command=command, capture_output=True ) @@ -746,14 +746,19 @@ async def size(self) -> int: return await inner_path.size() else: command = [ - "".join( - [ - "find -L ", - f'"{self.__str__()}"', - " -type f -exec ls -ln {} \\+ | ", - "awk 'BEGIN {sum=0} {sum+=$5} END {print sum}'; ", - ] - ) + "find", + "-L", + shlex.quote(self.__str__()), + "-type", + "f", + "-exec", + "ls", + "-ln", + "{}", + "\\+", + "|", + "awk", + "'BEGIN {sum=0} {sum+=$5} END {print sum}';", ] result, status = await self.connector.run( location=self.location, command=command, capture_output=True @@ -768,7 +773,12 @@ async def symlink_to( if (inner_path := await self._get_inner_path()) != self: await inner_path.symlink_to(target, target_is_directory=target_is_directory) else: - command = ["ln", "-snf", str(target), self.__str__()] + command = [ + "ln", + "-snf", + shlex.quote(str(target)), + shlex.quote(self.__str__()), + ] result, status = await self.connector.run( location=self.location, command=command, capture_output=True ) @@ -778,7 +788,12 @@ async def hardlink_to(self, target: str | os.PathLike[str]) -> None: if (inner_path := await self._get_inner_path()) != self: await inner_path.hardlink_to(target) else: - command = ["ln", "-nf", str(target), self.__str__()] + command = [ + "ln", + "-nf", + shlex.quote(str(target)), + shlex.quote(self.__str__()), + ] result, status = await self.connector.run( location=self.location, command=command, capture_output=True ) @@ -872,7 +887,8 @@ async def write_text(self, data: str, **kwargs: str | None) -> int: if not isinstance(data, str): raise TypeError("data must be str, not %s" % data.__class__.__name__) async with await self.connector.get_stream_writer( - command=["tee", str(self), ">", "/dev/null"], location=self.location + command=["tee", shlex.quote(str(self)), ">", "/dev/null"], + location=self.location, ) as writer: reader = io.BytesIO(data.encode("utf-8")) while content := reader.read(self.connector.transferBufferSize): @@ -918,8 +934,8 @@ async def download( await connector.run( location=location, command=[ - f'if [ command -v curl ]; then curl -L -o "{filepath}" "{url}"; ' - f'else wget -O "{filepath}" "{url}"; fi' + f"if [ command -v curl ]; then curl -L -o {shlex.quote(filepath)} {shlex.quote(url)}; " + f'else wget -O {shlex.quote(filepath)} "{shlex.quote(url)}"; fi' ], ) return StreamFlowPath(filepath, context=context, location=location) diff --git a/streamflow/deployment/connector/base.py b/streamflow/deployment/connector/base.py index 58134cf3f..46aa04b32 100644 --- a/streamflow/deployment/connector/base.py +++ b/streamflow/deployment/connector/base.py @@ -173,7 +173,13 @@ async def copy_remote_to_remote( ) # Build reader and writer commands if reader_command is None: - reader_command = ["tar", "chf", "-", "-C", *posixpath.split(src)] + reader_command = [ + "tar", + "chf", + "-", + "-C", + *(shlex.quote(path) for path in posixpath.split(src)), + ] if writer_command is None: writer_command = await utils.get_remote_to_remote_write_command( src_connector=source_connector, @@ -418,7 +424,13 @@ async def copy_remote_to_local( location=location, src=src, dst=dst, - reader_command=["tar", "chf", "-", "-C", *posixpath.split(src)], + reader_command=[ + "tar", + "chf", + "-", + "-C", + *(shlex.quote(path) for path in posixpath.split(src)), + ], ) async def copy_remote_to_remote( diff --git a/streamflow/deployment/connector/container.py b/streamflow/deployment/connector/container.py index 2a218e550..89cd6eef2 100644 --- a/streamflow/deployment/connector/container.py +++ b/streamflow/deployment/connector/container.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import base64 import json import logging import os @@ -349,7 +348,13 @@ async def copy_remote_to_local( location=location, src=src, dst=dst, - reader_command=["tar", "chf", "-", "-C", *posixpath.split(src)], + reader_command=[ + "tar", + "chf", + "-", + "-C", + *(shlex.quote(path) for path in posixpath.split(src)), + ], ) async def copy_remote_to_remote( @@ -589,12 +594,9 @@ async def get_stream_reader( async def get_stream_writer( self, command: MutableSequence[str], location: ExecutionLocation ) -> AsyncContextManager[StreamWrapper]: - encoded_command = base64.b64encode(" ".join(command).encode("utf-8")).decode( - "utf-8" - ) return await self.connector.get_stream_writer( command=self._get_run_command( - command=f"eval $(echo {encoded_command} | base64 -d)", + command=" ".join(command), location=location, interactive=True, ), diff --git a/streamflow/deployment/connector/kubernetes.py b/streamflow/deployment/connector/kubernetes.py index 6c8a4cb65..21aa54d51 100644 --- a/streamflow/deployment/connector/kubernetes.py +++ b/streamflow/deployment/connector/kubernetes.py @@ -67,7 +67,10 @@ def _check_helm_installed() -> None: async def _get_helm_version() -> str: proc = await asyncio.create_subprocess_exec( - *shlex.split("helm version --template '{{.Version}}'"), + "helm", + "version", + "--template", + "'{{.Version}}'", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, ) @@ -184,7 +187,8 @@ async def _close(self) -> None: break else: raise WorkflowExecutionException( - f"Kubernetes connection terminated with status {result['status']}." + f"Kubernetes connection terminated with status {result['status']}: " + f"{result.get('reason', result.get( 'message', '' ))}" ) async def read(self, size: int | None = None) -> bytes | None: @@ -555,7 +559,7 @@ async def run( ) command = ( ["sh", "-c"] - + [f"{k}={v}" for k, v in location.environment.items()] + + [f"{k}={shlex.quote(v)}" for k, v in location.environment.items()] + [command] ) pod, container = location.name.split(":") diff --git a/streamflow/deployment/connector/local.py b/streamflow/deployment/connector/local.py index f1bf86c6c..17c5d7218 100644 --- a/streamflow/deployment/connector/local.py +++ b/streamflow/deployment/connector/local.py @@ -4,19 +4,18 @@ import errno import logging import os -import shlex import shutil import sys import tempfile from collections.abc import MutableMapping, MutableSequence from importlib.resources import files -import mslex import psutil from streamflow.core import utils from streamflow.core.deployment import Connector, ExecutionLocation from streamflow.core.scheduling import AvailableLocation, Hardware, Storage +from streamflow.core.utils import create_shell_command from streamflow.deployment.connector.base import FS_TYPES_TO_SKIP, BaseConnector from streamflow.log_handler import logger @@ -75,15 +74,6 @@ def __init__( storage=storage, ) - def _get_shell(self) -> str: - match sys.platform: - case "win32": - return "cmd" - case "darwin": - return "bash" - case _: - return "sh" - async def copy_local_to_remote( self, src: str, @@ -183,15 +173,7 @@ async def run( ) return await utils.run_in_subprocess( location=location, - command=[ - self._get_shell(), - "/C" if sys.platform == "win32" else "-c", - ( - mslex.quote(command) - if sys.platform == "win32" - else shlex.quote(command) - ), - ], + command=create_shell_command(command=[command], local=True), capture_output=capture_output, timeout=timeout, ) diff --git a/streamflow/deployment/connector/queue_manager.py b/streamflow/deployment/connector/queue_manager.py index 4ee4c5361..43dc20802 100644 --- a/streamflow/deployment/connector/queue_manager.py +++ b/streamflow/deployment/connector/queue_manager.py @@ -801,7 +801,9 @@ async def _run_batch_command( "|", ] if environment: - batch_command.extend([f"{k}={v}" for k, v in environment.items()]) + batch_command.extend( + [f"{k}={shlex.quote(v)}" for k, v in environment.items()] + ) batch_command.extend( [ "sbatch", @@ -1012,7 +1014,9 @@ async def _run_batch_command( ] ) if environment: - batch_command.extend([f"{k}={v}" for k, v in environment.items()]) + batch_command.extend( + [f"{k}={shlex.quote(v)}" for k, v in environment.items()] + ) batch_command.extend( [ "qsub", @@ -1211,7 +1215,9 @@ async def _run_batch_command( "|", ] if environment: - batch_command.extend([f"{k}={v}" for k, v in environment.items()]) + batch_command.extend( + [f"{k}={shlex.quote(v)}" for k, v in environment.items()] + ) batch_command.extend( [ "flux", diff --git a/tests/test_remotepath.py b/tests/test_remotepath.py index d0d52b487..b742a7d8b 100644 --- a/tests/test_remotepath.py +++ b/tests/test_remotepath.py @@ -45,19 +45,22 @@ async def test_directory( # ./ # file1.txt # file2.csv + # file with spaces.txt # dir1/ # dir2/ await (path / "dir1").mkdir(mode=0o777) await (path / "dir2").mkdir(mode=0o777) await (path / "file1.txt").write_text("StreamFlow") await (path / "file2.csv").write_text("StreamFlow") + await (path / "file with spaces.txt").write_text("StreamFlow") async for dirpath, dirnames, filenames in path.walk(follow_symlinks=True): assert len(dirnames) == 2 assert "dir1" in dirnames assert "dir2" in dirnames - assert len(filenames) == 2 + assert len(filenames) == 3 assert "file1.txt" in filenames assert "file2.csv" in filenames + assert "file with spaces.txt" in filenames break await path.rmtree() assert not await path.exists() @@ -149,6 +152,7 @@ async def test_glob( # dir1/ # file1.txt # file2.csv + # file with spaces.txt # dir2/ # file1.txt # file2.csv @@ -157,6 +161,7 @@ async def test_glob( await (path / "dir1" / "dir2").mkdir(mode=0o777, parents=True) await (path / "dir1" / "file1.txt").write_text("StreamFlow") await (path / "dir1" / "file2.csv").write_text("StreamFlow") + await (path / "dir1" / "file with spaces.txt").write_text("StreamFlow") await (path / "dir1" / "dir2" / "file1.txt").write_text("StreamFlow") await (path / "dir1" / "dir2" / "file2.csv").write_text("StreamFlow") # Test *.txt @@ -170,8 +175,9 @@ async def test_glob( assert path / "file2.csv" in result # Test */*.txt result = [p async for p in path.glob("*/*.txt")] - assert len(result) == 1 + assert len(result) == 2 assert path / "dir1" / "file1.txt" in result + assert path / "dir1" / "file with spaces.txt" in result finally: await path.rmtree()