Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions streamflow/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,48 @@ def get_tag(tokens: Iterable[Token]) -> str:
return output_tag


def is_glob(
path: str,
) -> bool:
"""
Check if the argument string is a glob with wildcards.

:param path: the string to be checked
:return: `True` if the string contains any wildcards, `False` otherwise
"""
DEFAULT = 0
BRACKET_FIRST = 1
BRACKET = 2
BACKSLASH = 3

i = 0
stack = [DEFAULT]
wildcards = ["*", "?"]
while i < len(path):
state = stack[-1]
c = path[i]

if state == DEFAULT:
if c in wildcards:
return True
elif c == "[":
stack.append(BRACKET_FIRST)
elif c == "\\":
stack.append(BACKSLASH)
elif state == BRACKET_FIRST:
stack.pop()
stack.append(BRACKET)
elif state == BRACKET:
if c == "]":
return True
elif c == "\\":
stack.append(BACKSLASH)
elif state == BACKSLASH:
stack.pop()
i += 1
return False


def make_future(obj: T) -> asyncio.Future[T]:
future = asyncio.Future()
future.set_result(obj)
Expand Down
28 changes: 16 additions & 12 deletions streamflow/cwl/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
ObjectCommandOutputProcessor,
TokenProcessor,
)
from streamflow.core.utils import eval_processors, flatten_list, get_tag, make_future
from streamflow.core.utils import (
eval_processors,
flatten_list,
get_tag,
is_glob,
make_future,
)
from streamflow.core.workflow import (
CommandOutput,
Job,
Expand Down Expand Up @@ -469,21 +475,19 @@ async def _process_command_output(
for glob in (
self.glob if isinstance(self.glob, MutableSequence) else [self.glob]
):
globpath = (
utils.eval_expression(
expression=glob,
context=context,
full_js=self.full_js,
expression_lib=self.expression_lib,
)
if "$(" in glob or "${" in glob
else glob
globpath = utils.eval_expression(
expression=glob,
context=context,
full_js=self.full_js,
expression_lib=self.expression_lib,
)
globpaths.extend(
globpath if isinstance(globpath, MutableSequence) else [globpath]
)
# Wait for command to finish and resolve glob
await command_output
# If any glob path needs to be resolved, wait for command to finish
if not self.streamable or any(is_glob(globpath) for globpath in globpaths):
await command_output
# Resolve glob paths
for location in locations:
globpaths = dict(
flatten_list(
Expand Down
2 changes: 1 addition & 1 deletion streamflow/cwl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ async def expand_glob(
outdir = StreamFlowPath(
output_directory, context=workflow.context, location=location
)
paths = sorted([p async for p in outdir.glob(path)])
paths = sorted(await outdir.glob(path))
effective_paths = await asyncio.gather(
*(asyncio.create_task(p.resolve()) for p in paths)
)
Expand Down
67 changes: 38 additions & 29 deletions streamflow/data/remotepath.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from streamflow.core.data import DataType
from streamflow.core.exception import WorkflowExecutionException
from streamflow.core.utils import is_glob

if TYPE_CHECKING:
from streamflow.core.context import StreamFlowContext
Expand Down Expand Up @@ -163,9 +164,10 @@ async def checksum(self) -> str | None: ...
async def exists(self) -> bool: ...

@abstractmethod
def glob(
self, pattern, *, case_sensitive=None
) -> AsyncIterator[StreamFlowPath]: ...
async def glob(self, pattern) -> MutableSequence[StreamFlowPath]: ...

@abstractmethod
def iglob(self, pattern) -> AsyncIterator[StreamFlowPath]: ...

@abstractmethod
async def is_dir(self) -> bool: ...
Expand Down Expand Up @@ -325,10 +327,13 @@ async def checksum(self) -> str | None:
async def exists(self) -> bool:
return cast(Path, super()).exists()

async def glob(
async def glob(self, pattern) -> MutableSequence[LocalStreamFlowPath]:
return [p async for p in self.iglob(pattern)]

async def iglob(
self, pattern, *, case_sensitive=None
) -> AsyncIterator[LocalStreamFlowPath]:
for path in glob.glob(str(self / pattern)):
for path in glob.iglob(str(self / pattern)):
yield self.with_segments(path)

async def is_dir(self) -> bool:
Expand Down Expand Up @@ -559,11 +564,12 @@ async def exists(self) -> bool:
else:
return await self._test(command=(["-e", f"'{self.__str__()}'"]))

async def glob(
self, pattern, *, case_sensitive=None
) -> AsyncIterator[RemoteStreamFlowPath]:
async def glob(self, pattern) -> MutableSequence[RemoteStreamFlowPath]:
return [p async for p in self.iglob(pattern)]

async def iglob(self, pattern) -> AsyncIterator[RemoteStreamFlowPath]:
if (inner_path := await self._get_inner_path()) != self:
async for path in inner_path.glob(pattern, case_sensitive=case_sensitive):
async for path in inner_path.iglob(pattern):
yield _get_outer_path(
context=self.context,
location=self.location,
Expand All @@ -572,26 +578,29 @@ async def glob(
else:
if not pattern:
raise ValueError(f"Unacceptable pattern: {pattern!r}")
command = [
"printf",
'"%s\\0"',
str(self / pattern),
"|",
"xargs",
"-0",
"-I{}",
"sh",
"-c",
'"if [ -e \\"{}\\" ]; then echo \\"{}\\"; fi"',
"|",
"sort",
]
result, status = await self.connector.run(
location=self.location, command=command, capture_output=True
)
_check_status(command, self.location, result, status)
for path in result.split():
yield self.with_segments(path)
if is_glob(pattern):
command = [
"printf",
'"%s\\0"',
str(self / pattern),
"|",
"xargs",
"-0",
"-I{}",
"sh",
"-c",
'"if [ -e \\"{}\\" ]; then echo \\"{}\\"; fi"',
"|",
"sort",
]
result, status = await self.connector.run(
location=self.location, command=command, capture_output=True
)
_check_status(command, self.location, result, status)
for path in result.split():
yield self.with_segments(path)
elif await (self / pattern).exists():
yield self.with_segments(self, pattern)

async def is_dir(self) -> bool:
if (inner_path := await self._get_inner_path()) != self:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cwl_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async def test_initial_workdir(
outdir = StreamFlowPath(
job.output_directory, context=context, location=execution_location
)
files_found = [str(f) async for f in outdir.glob("*")]
files_found = [str(f) async for f in outdir.iglob("*")]
for out_file, init_path in zip(token.value.split(" "), initial_paths):
# Check whether the file has been copied to the job output directory
files_found.remove(out_file)
Expand Down
20 changes: 17 additions & 3 deletions tests/test_remotepath.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from streamflow.core import utils
from streamflow.core.deployment import Connector, ExecutionLocation
from streamflow.core.exception import WorkflowExecutionException
from streamflow.core.utils import is_glob
from streamflow.data import remotepath
from streamflow.data.remotepath import StreamFlowPath
from tests.utils.deployment import get_docker_deployment_config, get_location
Expand Down Expand Up @@ -148,22 +149,35 @@ async def test_glob(context, connector, location):
await (path / "dir1" / "dir2" / "file1.txt").write_text("StreamFlow")
await (path / "dir1" / "dir2" / "file2.csv").write_text("StreamFlow")
# Test *.txt
result = [p async for p in path.glob("*.txt")]
result = await path.glob("*.txt")
assert len(result) == 1
assert path / "file1.txt" in result
# Test file*
result = [p async for p in path.glob("file*")]
result = await path.glob("file*")
assert len(result) == 2
assert path / "file1.txt" in result
assert path / "file2.csv" in result
# Test */*.txt
result = [p async for p in path.glob("*/*.txt")]
result = await path.glob("*/*.txt")
assert len(result) == 1
assert path / "dir1" / "file1.txt" in result
finally:
await path.rmtree()


def test_is_glob():
assert is_glob("*.txt") is True
assert is_glob("?.txt") is True
assert is_glob("test*.txt") is True
assert is_glob("test?.txt") is True
assert is_glob("test[.txt") is False
assert is_glob("test[].txt") is False
assert is_glob("test[]].txt") is True
assert is_glob("test[abc].txt") is True
assert is_glob("test[1-9].txt") is True
assert is_glob("test.txt") is False


@pytest.mark.asyncio
async def test_mkdir_failure(context):
"""Test on `mkdir` function failure"""
Expand Down
4 changes: 2 additions & 2 deletions tests/test_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ async def test_inject_remote_input(context: StreamFlowContext, config: str) -> N

if file_type == "Directory":
remote_files = sorted(
[p async for p in remote_path.glob("*")],
await remote_path.glob("*"),
key=lambda x: os.path.basename(x),
)
assert len(remote_files) == 1
Expand All @@ -251,7 +251,7 @@ async def test_inject_remote_input(context: StreamFlowContext, config: str) -> N
assert len(wf_files) == 1
else:
remote_files = sorted(
[p async for p in remote_path.parent.glob("*")],
await remote_path.parent.glob("*"),
key=lambda x: os.path.basename(x),
)
assert len(remote_files) == 2
Expand Down
Loading