Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/job_attachments_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

Job attachments uses your configured S3 bucket as a [content-addressable storage](https://en.wikipedia.org/wiki/Content-addressable_storage), which creates a snapshot of the files used in your job submission in [asset manifests](#asset-manifests), only uploading files that aren't already in S3. This saves you time and bandwidth when iterating on jobs. When an [AWS Deadline Cloud worker agent][worker-agent] starts working on a job with job attachments, it recreates the file system snapshot in the worker agent session directory, and uploads any outputs back to your S3 bucket.

You can then easily download your outputs with the [deadline job download-output] command, or using the [protocol handler](#protocol-handler) to download from a click of a button in the [AWS Deadline Cloud monitor][monitor].
You can then easily download your outputs with the [deadline job download-output] command, or using the [protocol handler](#protocol-handler) to download with the click of a button in the [AWS Deadline Cloud monitor][monitor]. The command supports `--include` glob patterns and relative paths for downloading specific files or directories.

Job attachments also works as an auxiliary storage when used with [AWS Deadline Cloud storage profiles][shared-storage], allowing you to flexibly upload files to your Amazon S3 bucket that aren't on your configured shared storage.

Expand Down
27 changes: 27 additions & 0 deletions src/deadline/client/cli/_groups/_job_download_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import posixpath
from configparser import ConfigParser
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional

import click
Expand Down Expand Up @@ -42,6 +43,13 @@
JSON_MSG_TYPE_PROGRESS = "progress"


class MatchPathsBy(str, Enum):
    """Selects which set of paths the --include filters are applied to.

    JOB:   match against the paths recorded at job submission time.
    LOCAL: match against the local (workstation) download paths.
    """

    JOB = "JOB"
    LOCAL = "LOCAL"


@dataclass
class ResolvedStorageProfiles:
"""The result of resolving storage profiles for a download operation."""
Expand Down Expand Up @@ -305,3 +313,22 @@ def _on_progress_json(download_metadata: ProgressReportMetadata) -> bool:
return True

return _do_download(on_downloading_files=_on_progress_json)


def _normalize_filters(filters: list[str]) -> list[str]:
"""
Normalizes path filter patterns.
- Converts backslashes to forward slashes (Windows compatibility)
- Strips leading './'
- Normalizes '//' to '/'
"""
normalized = []
for f in filters:
f = f.replace("\\", "/")
if f.startswith("./"):
f = f[2:]
while "//" in f:
f = f.replace("//", "/")
if f:
normalized.append(f)
return normalized
51 changes: 50 additions & 1 deletion src/deadline/client/cli/_groups/job_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,17 @@
)
from ._job_download_helpers import (
JSON_MSG_TYPE_PROGRESS,
MatchPathsBy,
_download_mapped_manifests,
_normalize_filters,
_resolve_conflict_resolution,
_resolve_storage_profiles,
_transform_manifests_to_absolute_paths,
)
from ....job_attachments._path_mapping import _generate_path_mapping_rules
from ....job_attachments.download import (
OutputDownloader,
_filter_manifests,
get_output_manifests_by_asset_root,
)

Expand Down Expand Up @@ -499,6 +502,8 @@ def _download_job_output(
task_id: Optional[str],
is_json_format: bool = False,
ignore_storage_profiles: bool = False,
include_patterns: Optional[list[str]] = None,
match_paths_by: MatchPathsBy = MatchPathsBy.LOCAL,
):
"""
Starts the download of job output and handles the progress reporting callback.
Expand Down Expand Up @@ -548,6 +553,8 @@ def _download_job_output(
for manifest in job_attachments_manifests:
root_path_format_mapping[manifest["rootPath"]] = manifest["rootPathFormat"]

# When --match-paths-by JOB is set, filter against the job paths.
# Otherwise, filtering happens later against workstation paths.
job_output_downloader = OutputDownloader(
s3_settings=JobAttachmentS3Settings(**queue["jobAttachmentSettings"]),
farm_id=farm_id,
Expand All @@ -557,6 +564,7 @@ def _download_job_output(
task_id=task_id,
session_action_id=session_action_id,
session=queue_role_session,
include_filters=include_patterns if match_paths_by == MatchPathsBy.JOB else None,
)

def _check_and_warn_long_output_paths(
Expand Down Expand Up @@ -603,9 +611,13 @@ def _check_and_warn_long_output_paths(
session_action_id=session_action_id,
session=queue_role_session,
)
if include_patterns and match_paths_by == MatchPathsBy.JOB:
manifests_by_root = _filter_manifests(manifests_by_root, include_patterns)
mapped_manifests = _transform_manifests_to_absolute_paths(
manifests_by_root, rules, resolved.job_profile.osFamily
)
if include_patterns and match_paths_by != MatchPathsBy.JOB:
mapped_manifests = _filter_manifests(mapped_manifests, include_patterns)
if mapped_manifests:
download_summary = _download_mapped_manifests(
mapped_manifests=mapped_manifests,
Expand Down Expand Up @@ -713,6 +725,15 @@ def _check_and_warn_long_output_paths(
output_paths_by_root = job_output_downloader.get_output_paths_by_root()
_check_and_warn_long_output_paths(output_paths_by_root)

# Apply include filters against workstation paths (default behavior).
# When --match-paths-by JOB is set, filtering was already applied at the job level.
if include_patterns and match_paths_by != MatchPathsBy.JOB:
job_output_downloader.apply_include_filters(include_patterns)
output_paths_by_root = job_output_downloader.get_output_paths_by_root()
if output_paths_by_root == {}:
click.echo(_get_no_output_message(is_json_format))
return

if not is_json_format:
# Create and print a summary of all the paths to download
all_output_paths: set[str] = set()
Expand Down Expand Up @@ -966,6 +987,22 @@ def _assert_valid_path(path: str) -> None:
@click.option("--job-id", help="The job to use.")
@click.option("--step-id", help="The step to use.")
@click.option("--task-id", help="The task to use.")
@click.option(
"-i",
"--include",
multiple=True,
help="Glob pattern or relative path for files to include in download. Matched against "
"the full path (root + relative). Supports *, ?, [seq]. A trailing / matches all "
"files under that directory. Repeatable",
)
@click.option(
"--match-paths-by",
type=click.Choice(["JOB", "LOCAL"], case_sensitive=False),
default="LOCAL",
help="Control which paths --include filters are matched against. "
"JOB matches against the paths recorded at job submission. "
"LOCAL matches against the local download paths (the default).",
)
Comment thread
waninggibbon marked this conversation as resolved.
@click.option(
"--ignore-storage-profiles",
is_flag=True,
Expand Down Expand Up @@ -1007,7 +1044,15 @@ def _assert_valid_path(path: str) -> None:
"parsed/consumed by custom scripts.",
)
@_handle_error
def job_download_output(step_id, task_id, output, ignore_storage_profiles, **args):
def job_download_output(
step_id,
task_id,
output,
ignore_storage_profiles,
include,
match_paths_by,
**args,
):
"""
Download the output of a Deadline Cloud job that was saved as job
attachments.
Expand All @@ -1018,6 +1063,8 @@ def job_download_output(step_id, task_id, output, ignore_storage_profiles, **arg
if task_id and not step_id:
raise click.UsageError("Missing option '--step-id' required with '--task-id'")

include_patterns = _normalize_filters(list(include)) or None

# Get a temporary config object with the standard options handled
config = _apply_cli_options_to_config(
required_options={"farm_id", "queue_id", "job_id"}, **args
Expand All @@ -1038,6 +1085,8 @@ def job_download_output(step_id, task_id, output, ignore_storage_profiles, **arg
task_id=task_id,
is_json_format=is_json_format,
ignore_storage_profiles=ignore_storage_profiles,
include_patterns=include_patterns,
match_paths_by=MatchPathsBy(match_paths_by),
)
except Exception as e:
if is_json_format:
Expand Down
102 changes: 100 additions & 2 deletions src/deadline/job_attachments/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import concurrent.futures
import json
import os
import posixpath
import re
import time
from collections import defaultdict
from datetime import datetime
from fnmatch import fnmatch
from itertools import chain
from logging import Logger, LoggerAdapter, getLogger
from pathlib import Path
Expand Down Expand Up @@ -73,7 +75,6 @@
from ._utils import (
_get_long_path_compatible_path,
_is_relative_to,
_join_s3_paths,
)
from threading import Lock

Expand Down Expand Up @@ -325,7 +326,9 @@ def get_job_input_paths_by_asset_root(

for manifest_properties in attachments.manifests:
if manifest_properties.inputManifestPath:
key = _join_s3_paths(manifest_properties.inputManifestPath)
key = s3_settings.add_root_and_manifest_folder_prefix(
manifest_properties.inputManifestPath
)
_, asset_manifest = get_asset_root_and_manifest_from_s3(
manifest_key=key,
s3_bucket=s3_settings.s3BucketName,
Expand Down Expand Up @@ -1243,6 +1246,94 @@ def mount_vfs_from_manifests(
vfs_manager.start(session_dir=session_dir)


def _full_path(root: str, relative: str) -> str:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's other code that joins the roots and manifest paths. Can we use the same mechanism here to make sure we don't have differences?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads up, I'll raise a new PR against the new repo.

"""Join root and relative path, normalizing to forward slashes for consistent matching.

Uses posixpath.join for consistency with _transform_manifests_to_absolute_paths
in _job_download_helpers.py, which joins roots and manifest paths the same way.
"""
return posixpath.join(root.replace("\\", "/"), relative)


def _matches_any_filter(file_path: str, filters: list[str]) -> bool:
"""
Check if a file path matches any of the given filters using glob-style matching.
Uses fnmatch for pattern matching (supports *, ?, [seq], [!seq]).
A filter ending with '/' matches all files under that directory.
Relative filters (not starting with '/' or '*') are auto-prepended with '*/' so they
match anywhere under the root — e.g. 'renders/*.exr' matches '*/renders/*.exr'.
The file_path should be the full path (root + relative).
"""

def _is_absolute(p: str) -> bool:
return p.startswith(("/", "*")) or (len(p) >= 2 and p[1] == ":")

for f in filters:
if f.endswith("/"):
pattern = f + "*" if _is_absolute(f) else "*/" + f + "*"
if fnmatch(file_path, pattern):
return True
else:
pattern = f if _is_absolute(f) else "*/" + f
if fnmatch(file_path, pattern):
return True
return False


def _filter_paths(
    paths_by_root: dict[str, ManifestPathGroup],
    path_filters: list[str],
) -> dict[str, ManifestPathGroup]:
    """
    Keep only files whose full path (root + relative) matches at least one
    glob-style include pattern, e.g. '*/renders/*.png'.

    Builds and returns a new mapping; the input mapping is not modified.
    Roots (and hash-algorithm buckets) left with no matching files are
    omitted from the result.
    """
    result: dict[str, ManifestPathGroup] = {}
    for root, group in paths_by_root.items():
        new_group = ManifestPathGroup()
        for hash_alg, files in group.files_by_hash_alg.items():
            kept = [
                f
                for f in files
                if _matches_any_filter(_full_path(root, f.path), path_filters)
            ]
            if not kept:
                continue
            new_group.files_by_hash_alg.setdefault(hash_alg, []).extend(kept)
            new_group.total_bytes += sum(f.size for f in kept)
        if new_group.files_by_hash_alg:
            result[root] = new_group
    return result


def _filter_manifests(
    manifests_by_root: dict[str, list[BaseAssetManifest]],
    path_filters: list[str],
) -> dict[str, list[BaseAssetManifest]]:
    """
    Filter BaseAssetManifest objects using glob-style include patterns.
    Filters are matched against the full path (root + relative) to support
    patterns like '*/renders/*.png'.

    NOTE: surviving manifest objects are mutated in place — their ``paths``
    attribute is replaced with the filtered list. Only the returned dict
    itself is new. Manifests with no matching paths are omitted, and roots
    whose manifests are all empty after filtering are dropped entirely.
    """
    filtered: dict[str, list[BaseAssetManifest]] = {}
    for root, manifest_list in manifests_by_root.items():
        filtered_manifests = []
        for manifest in manifest_list:
            matching = [
                p
                for p in manifest.paths
                if _matches_any_filter(_full_path(root, p.path), path_filters)
            ]
            if matching:
                # In-place update: callers holding a reference to this
                # manifest will observe the filtered path list.
                manifest.paths = matching
                filtered_manifests.append(manifest)
        if filtered_manifests:
            filtered[root] = filtered_manifests
    return filtered


def _ensure_paths_within_directory(root_path: str, paths_relative_to_root: list[str]) -> None:
"""
Validates the given paths to ensure that they are within the given root path.
Expand Down Expand Up @@ -1281,6 +1372,7 @@ def __init__(
task_id: Optional[str] = None,
session_action_id: Optional[str] = None,
session: Optional[boto3.Session] = None,
include_filters: Optional[list[str]] = None,
) -> None:
self.s3_settings = s3_settings
self.session = session
Expand All @@ -1294,6 +1386,8 @@ def __init__(
session_action_id=session_action_id,
session=session,
)
if include_filters:
self.outputs_by_root = _filter_paths(self.outputs_by_root, include_filters)

def get_output_paths_by_root(self) -> dict[str, list[str]]:
"""
Expand All @@ -1305,6 +1399,10 @@ def get_output_paths_by_root(self) -> dict[str, list[str]]:
output_paths_by_root[root] = path_group.get_all_paths()
return output_paths_by_root

def apply_include_filters(self, include_filters: list[str]) -> None:
    """Filter the tracked output paths in place, keeping only files whose
    current workstation path matches at least one glob-style include
    pattern."""
    filtered = _filter_paths(self.outputs_by_root, include_filters)
    self.outputs_by_root = filtered

def set_root_path(self, original_root: str, new_root: str) -> None:
"""
Changes the root path for downloading output files, (which is the root path
Expand Down
Loading
Loading