From 9ea156c36d4127fe18c6dd866f56a71b2ad177e0 Mon Sep 17 00:00:00 2001 From: GlassOfWhiskey Date: Tue, 9 Sep 2025 00:12:10 +0200 Subject: [PATCH] Check if glob contains wildcards This commit add an explicit check to see if a `glob` CWL option actually contains a wildcard (and needs to be resolved against the filesystem on the destination location) or if it is just a plain string (and only needs to check for existence in the target location). --- streamflow/core/utils.py | 42 ++++++++++++++++++++++ streamflow/cwl/processor.py | 28 ++++++++------- streamflow/cwl/utils.py | 2 +- streamflow/data/remotepath.py | 67 ++++++++++++++++++++--------------- tests/test_cwl_execution.py | 2 +- tests/test_remotepath.py | 20 +++++++++-- tests/test_translator.py | 4 +-- 7 files changed, 117 insertions(+), 48 deletions(-) diff --git a/streamflow/core/utils.py b/streamflow/core/utils.py index ad8ee1eda..116f8952c 100644 --- a/streamflow/core/utils.py +++ b/streamflow/core/utils.py @@ -308,6 +308,48 @@ def get_tag(tokens: Iterable[Token]) -> str: return output_tag +def is_glob( + path: str, +) -> bool: + """ + Check if the argument string is a glob with wildcards. + + :param path: the string to be checked + :return: `True` if the string contains any wildcards, `False` otherwise + """ + DEFAULT = 0 + BRACKET_FIRST = 1 + BRACKET = 2 + BACKSLASH = 3 + + i = 0 + stack = [DEFAULT] + wildcards = ["*", "?"] + while i < len(path): + state = stack[-1] + c = path[i] + + if state == DEFAULT: + if c in wildcards: + return True + elif c == "[": + stack.append(BRACKET_FIRST) + elif c == "\\": + stack.append(BACKSLASH) + elif state == BRACKET_FIRST: + stack.pop() + stack.append(BRACKET) + elif state == BRACKET: + if c == "]": + return True + elif c == "\\": + stack.append(BACKSLASH) + elif state == BACKSLASH: + stack.pop() + i += 1 + return False + + def make_future(obj: T) -> asyncio.Future[T]: future = asyncio.Future() future.set_result(obj) diff --git a/streamflow/cwl/processor.py b/streamflow/cwl/processor.py index 926cfceca..e9af58fdd 100644 --- a/streamflow/cwl/processor.py +++ b/streamflow/cwl/processor.py @@ -21,7 +21,13 @@ ObjectCommandOutputProcessor, TokenProcessor, ) -from streamflow.core.utils import eval_processors, flatten_list, get_tag, make_future +from streamflow.core.utils import ( + eval_processors, + flatten_list, + get_tag, + is_glob, + make_future, +) from streamflow.core.workflow import ( CommandOutput, Job, @@ -469,21 +475,19 @@ async def _process_command_output( for glob in ( self.glob if isinstance(self.glob, MutableSequence) else [self.glob] ): - globpath = ( - utils.eval_expression( - expression=glob, - context=context, - full_js=self.full_js, - expression_lib=self.expression_lib, - ) - if "$(" in glob or "${" in glob - else glob + globpath = utils.eval_expression( + expression=glob, + context=context, + full_js=self.full_js, + expression_lib=self.expression_lib, ) globpaths.extend( globpath if isinstance(globpath, MutableSequence) else [globpath] ) - # Wait for command to finish and resolve glob - await command_output + # If any glob path needs to be resolved, wait for command to finish + if not self.streamable or any(is_glob(globpath) for globpath in globpaths): + await command_output + # Resolve glob paths for location in locations: globpaths = dict( flatten_list( diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index d19c434ac..31025f65b 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -540,7 +540,7 @@ async def expand_glob( outdir = StreamFlowPath( output_directory, context=workflow.context, location=location ) - paths = sorted([p async for p in outdir.glob(path)]) + paths = sorted(await outdir.glob(path)) effective_paths = await asyncio.gather( *(asyncio.create_task(p.resolve()) for p in paths) ) diff --git a/streamflow/data/remotepath.py b/streamflow/data/remotepath.py index e7c94046d..9fde8df97 100644 --- a/streamflow/data/remotepath.py +++ b/streamflow/data/remotepath.py @@ -21,6 +21,7 @@ from streamflow.core.data import DataType from streamflow.core.exception import WorkflowExecutionException +from streamflow.core.utils import is_glob if TYPE_CHECKING: from streamflow.core.context import StreamFlowContext @@ -163,9 +164,10 @@ async def checksum(self) -> str | None: ... async def exists(self) -> bool: ... @abstractmethod - def glob( - self, pattern, *, case_sensitive=None - ) -> AsyncIterator[StreamFlowPath]: ... + async def glob(self, pattern) -> MutableSequence[StreamFlowPath]: ... + + @abstractmethod + def iglob(self, pattern) -> AsyncIterator[StreamFlowPath]: ... @abstractmethod async def is_dir(self) -> bool: ... @@ -325,10 +327,13 @@ async def checksum(self) -> str | None: async def exists(self) -> bool: return cast(Path, super()).exists() - async def glob( + async def glob(self, pattern) -> MutableSequence[LocalStreamFlowPath]: + return [p async for p in self.iglob(pattern)] + + async def iglob( self, pattern, *, case_sensitive=None ) -> AsyncIterator[LocalStreamFlowPath]: - for path in glob.glob(str(self / pattern)): + for path in glob.iglob(str(self / pattern)): yield self.with_segments(path) async def is_dir(self) -> bool: @@ -559,11 +564,12 @@ async def exists(self) -> bool: else: return await self._test(command=(["-e", f"'{self.__str__()}'"])) - async def glob( - self, pattern, *, case_sensitive=None - ) -> AsyncIterator[RemoteStreamFlowPath]: + async def glob(self, pattern) -> MutableSequence[RemoteStreamFlowPath]: + return [p async for p in self.iglob(pattern)] + + async def iglob(self, pattern) -> AsyncIterator[RemoteStreamFlowPath]: if (inner_path := await self._get_inner_path()) != self: - async for path in inner_path.glob(pattern, case_sensitive=case_sensitive): + async for path in inner_path.iglob(pattern): yield _get_outer_path( context=self.context, location=self.location, @@ -572,26 +578,29 @@ async def glob( else: if not pattern: raise ValueError(f"Unacceptable pattern: {pattern!r}") - command = [ - "printf", - '"%s\\0"', - str(self / pattern), - "|", - "xargs", - "-0", - "-I{}", - "sh", - "-c", - '"if [ -e \\"{}\\" ]; then echo \\"{}\\"; fi"', - "|", - "sort", - ] - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) - for path in result.split(): - yield self.with_segments(path) + if is_glob(pattern): + command = [ + "printf", + '"%s\\0"', + str(self / pattern), + "|", + "xargs", + "-0", + "-I{}", + "sh", + "-c", + '"if [ -e \\"{}\\" ]; then echo \\"{}\\"; fi"', + "|", + "sort", + ] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) + for path in result.split(): + yield self.with_segments(path) + elif await (self / pattern).exists(): + yield self.with_segments(self, pattern) async def is_dir(self) -> bool: if (inner_path := await self._get_inner_path()) != self: diff --git a/tests/test_cwl_execution.py b/tests/test_cwl_execution.py index d76aa1770..35cf731c5 100644 --- a/tests/test_cwl_execution.py +++ b/tests/test_cwl_execution.py @@ -187,7 +187,7 @@ async def test_initial_workdir( outdir = StreamFlowPath( job.output_directory, context=context, location=execution_location ) - files_found = [str(f) async for f in outdir.glob("*")] + files_found = [str(f) async for f in outdir.iglob("*")] for out_file, init_path in zip(token.value.split(" "), initial_paths): # Check whether the file has been copied to the job output directory files_found.remove(out_file) diff --git a/tests/test_remotepath.py b/tests/test_remotepath.py index 785758866..2776e5062 100644 --- a/tests/test_remotepath.py +++ b/tests/test_remotepath.py @@ -8,6 +8,7 @@ from streamflow.core import utils from streamflow.core.deployment import Connector, ExecutionLocation from streamflow.core.exception import WorkflowExecutionException +from streamflow.core.utils import is_glob from streamflow.data import remotepath from streamflow.data.remotepath import StreamFlowPath from tests.utils.deployment import get_docker_deployment_config, get_location @@ -148,22 +149,35 @@ async def test_glob(context, connector, location): await (path / "dir1" / "dir2" / "file1.txt").write_text("StreamFlow") await (path / "dir1" / "dir2" / "file2.csv").write_text("StreamFlow") # Test *.txt - result = [p async for p in path.glob("*.txt")] + result = await path.glob("*.txt") assert len(result) == 1 assert path / "file1.txt" in result # Test file* - result = [p async for p in path.glob("file*")] + result = await path.glob("file*") assert len(result) == 2 assert path / "file1.txt" in result assert path / "file2.csv" in result # Test */*.txt - result = [p async for p in path.glob("*/*.txt")] + result = await path.glob("*/*.txt") assert len(result) == 1 assert path / "dir1" / "file1.txt" in result finally: await path.rmtree() +def test_is_glob(): + assert is_glob("*.txt") is True + assert is_glob("?.txt") is True + assert is_glob("test*.txt") is True + assert is_glob("test?.txt") is True + assert is_glob("test[.txt") is False + assert is_glob("test[].txt") is False + assert is_glob("test[]].txt") is True + assert is_glob("test[abc].txt") is True + assert is_glob("test[1-9].txt") is True + assert is_glob("test.txt") is False + + @pytest.mark.asyncio async def test_mkdir_failure(context): """Test on `mkdir` function failure""" diff --git a/tests/test_translator.py b/tests/test_translator.py index f6d1a0abe..cf9f119a5 100644 --- a/tests/test_translator.py +++ b/tests/test_translator.py @@ -240,7 +240,7 @@ async def test_inject_remote_input(context: StreamFlowContext, config: str) -> N if file_type == "Directory": remote_files = sorted( - [p async for p in remote_path.glob("*")], + await remote_path.glob("*"), key=lambda x: os.path.basename(x), ) assert len(remote_files) == 1 @@ -251,7 +251,7 @@ async def test_inject_remote_input(context: StreamFlowContext, config: str) -> N assert len(wf_files) == 1 else: remote_files = sorted( - [p async for p in remote_path.parent.glob("*")], + await remote_path.parent.glob("*"), key=lambda x: os.path.basename(x), ) assert len(remote_files) == 2