Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion streamflow/cwl/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,6 @@ async def execute(self, job: Job) -> CWLCommandOutput:
inputs=job.inputs,
streamflow_context=self.step.workflow.context,
token_value=context["inputs"][key],
job=job,
)
)
for key in job.inputs.keys()
Expand Down
9 changes: 6 additions & 3 deletions streamflow/cwl/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ async def _update_listing(
deployment=src_location.deployment,
location_name=src_location.name,
):
# adjust the path
# Adjust the path
existing.append(
utils.remap_token_value(
path_processor=get_path_processor(
Expand Down Expand Up @@ -570,7 +570,10 @@ async def _update_file_token(
token_value: MutableMapping[str, Any],
dst_path: StreamFlowPath | None = None,
) -> MutableMapping[str, Any]:
token_class = utils.get_token_class(token_value)
if (token_class := utils.get_token_class(token_value)) is None:
raise WorkflowExecutionException(
f"Step {self.name} (job {job.name}) is processing an invalid token."
)
# Get destination coordinates
dst_connector = self.workflow.context.scheduler.get_connector(job.name)
dst_locations = self.workflow.context.scheduler.get_locations(job.name)
Expand Down Expand Up @@ -626,7 +629,7 @@ async def _update_file_token(
except FileExistsError:
pass
# Transform token value
new_token_value = {
new_token_value: MutableMapping[str, Any] = {
"class": token_class,
"path": str(filepath),
"location": "file://" + str(filepath),
Expand Down
17 changes: 10 additions & 7 deletions streamflow/cwl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async def _get_listing(
loc_path = StreamFlowPath(dirpath, context=context, location=location)
async for dirpath, dirnames, filenames in loc_path.walk(follow_symlinks=True):
for dirname in dirnames:
directory = dirpath / dirname
directory = loc_path / dirname
if str(directory) not in listing_tokens:
load_listing = (
LoadListing.deep_listing
Expand All @@ -158,7 +158,7 @@ async def _get_listing(
)
)
for filename in filenames:
file = dirpath / filename
file = loc_path / filename
if str(file) not in listing_tokens:
listing_tokens[str(file)] = asyncio.create_task(
get_file_token( # nosec
Expand Down Expand Up @@ -198,8 +198,8 @@ async def _process_secondary_file(
case MutableMapping():
filepath = get_path_from_token(secondary_file)
for location in locations:
if await (
path := StreamFlowPath(filepath, context=context, location=location)
if await StreamFlowPath(
filepath, context=context, location=location
).exists():
return await get_file_token(
context=context,
Expand Down Expand Up @@ -1218,6 +1218,8 @@ def remap_token_value(
old_dir=old_dir,
new_dir=new_dir,
)
if "dirname" in value:
value["dirname"] = new_dir
if "secondaryFiles" in value:
value["secondaryFiles"] = [
remap_token_value(path_processor, old_dir, new_dir, sf)
Expand Down Expand Up @@ -1387,10 +1389,11 @@ async def update_file_token(
elif not load_contents and "contents" in new_token_value:
del new_token_value["contents"]
# Process listings
if get_token_class(new_token_value) == "Directory" and load_listing is not None:
if get_token_class(new_token_value) == "Directory":
# If load listing is set to `no_listing`, remove the listing entries in present
if load_listing == LoadListing.no_listing and "listing" in new_token_value:
del new_token_value["listing"]
if load_listing is None or load_listing == LoadListing.no_listing:
if "listing" in new_token_value:
del new_token_value["listing"]
# If listing is not present or if the token needs a deep listing, process directory contents
elif (
"listing" not in new_token_value
Expand Down
Loading