Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/job_attachments_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

Job attachments uses your configured S3 bucket as a [content-addressable storage](https://en.wikipedia.org/wiki/Content-addressable_storage), which creates a snapshot of the files used in your job submission in [asset manifests](#asset-manifests), only uploading files that aren't already in S3. This saves you time and bandwidth when iterating on jobs. When an [AWS Deadline Cloud worker agent][worker-agent] starts working on a job with job attachments, it recreates the file system snapshot in the worker agent session directory, and uploads any outputs back to your S3 bucket.

You can then easily download your outputs with the [deadline job download-output] command, or using the [protocol handler](#protocol-handler) to download from a click of a button in the [AWS Deadline Cloud monitor][monitor].
You can then easily download your outputs with the [deadline job download-output] command, or your inputs with the [deadline job download-input] command. You can also use the [protocol handler](#protocol-handler) to download with the click of a button in the [AWS Deadline Cloud monitor][monitor]. Both commands support `--include-path` for downloading specific files or directories.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR looks like a duplicate of https://github.com/aws-deadline/deadline-cloud/pull/1108/changes, should we review the other one?


Job attachments also works as an auxiliary storage when used with [AWS Deadline Cloud storage profiles][shared-storage], allowing you to flexibly upload files to your Amazon S3 bucket that aren't on your configured shared storage.

Expand All @@ -18,6 +18,7 @@ See the [`examples`](https://github.com/aws-deadline/deadline-cloud/tree/mainlin
[worker-agent]: https://github.com/aws-deadline/deadline-cloud-worker-agent/blob/release/docs/
[developer-guide]: https://docs.aws.amazon.com/deadline-cloud/latest/developerguide/what-job-attachments-uploads-to-amazon-s3.html
[deadline job download-output]: cli_reference/deadline_job.md#download-output
[deadline job download-input]: cli_reference/deadline_job.md#download-input

## Job Attachments Bucket Structure

Expand Down
457 changes: 337 additions & 120 deletions src/deadline/client/cli/_groups/job_group.py

Large diffs are not rendered by default.

104 changes: 92 additions & 12 deletions src/deadline/job_attachments/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
from ._utils import (
_get_long_path_compatible_path,
_is_relative_to,
_join_s3_paths,
)
from threading import Lock

Expand Down Expand Up @@ -325,7 +324,9 @@ def get_job_input_paths_by_asset_root(

for manifest_properties in attachments.manifests:
if manifest_properties.inputManifestPath:
key = _join_s3_paths(manifest_properties.inputManifestPath)
key = s3_settings.add_root_and_manifest_folder_prefix(
manifest_properties.inputManifestPath
)
_, asset_manifest = get_asset_root_and_manifest_from_s3(
manifest_key=key,
s3_bucket=s3_settings.s3BucketName,
Expand Down Expand Up @@ -1243,6 +1244,47 @@ def mount_vfs_from_manifests(
vfs_manager.start(session_dir=session_dir)


def _matches_any_filter(file_path: str, filters: list[str]) -> bool:
"""
Check if a file path matches any of the given filters.
- Exact match: filter "renders/frame_001.exr" matches only that path
- Directory prefix: filter "renders/" matches all paths starting with "renders/"
"""
for f in filters:
if f.endswith("/"):
if file_path.startswith(f):
return True
else:
if file_path == f:
return True
return False


def _filter_paths(
    paths_by_root: dict[str, ManifestPathGroup],
    path_filters: list[str],
) -> dict[str, ManifestPathGroup]:
    """
    Return a copy of *paths_by_root* restricted to files matching *path_filters*.

    Each filter is an exact relative file path or a directory prefix (ending
    with '/'); filters are matched against file paths across all asset roots.
    Roots whose groups end up empty are dropped from the result.
    """
    result: dict[str, ManifestPathGroup] = {}
    for asset_root, path_group in paths_by_root.items():
        new_group = ManifestPathGroup()
        for hash_alg, manifest_files in path_group.files_by_hash_alg.items():
            kept = [
                mf for mf in manifest_files if _matches_any_filter(mf.path, path_filters)
            ]
            if not kept:
                continue
            existing = new_group.files_by_hash_alg.get(hash_alg)
            if existing is None:
                new_group.files_by_hash_alg[hash_alg] = kept
            else:
                existing.extend(kept)
            new_group.total_bytes += sum(mf.size for mf in kept)
        if new_group.files_by_hash_alg:
            result[asset_root] = new_group
    return result


def _ensure_paths_within_directory(root_path: str, paths_relative_to_root: list[str]) -> None:
"""
Validates the given paths to ensure that they are within the given root path.
Expand Down Expand Up @@ -1282,19 +1324,26 @@ def __init__(
task_id: Optional[str] = None,
session_action_id: Optional[str] = None,
session: Optional[boto3.Session] = None,
path_filters: Optional[list[str]] = None,
_outputs_by_root: Optional[dict[str, ManifestPathGroup]] = None,
) -> None:
self.s3_settings = s3_settings
self.session = session
self.outputs_by_root = get_job_output_paths_by_asset_root(
s3_settings=s3_settings,
farm_id=farm_id,
queue_id=queue_id,
job_id=job_id,
step_id=step_id,
task_id=task_id,
session_action_id=session_action_id,
session=session,
)
if _outputs_by_root is not None:
self.outputs_by_root = _outputs_by_root
else:
self.outputs_by_root = get_job_output_paths_by_asset_root(
s3_settings=s3_settings,
farm_id=farm_id,
queue_id=queue_id,
job_id=job_id,
step_id=step_id,
task_id=task_id,
session_action_id=session_action_id,
session=session,
)
if path_filters:
self.outputs_by_root = _filter_paths(self.outputs_by_root, path_filters)

def get_output_paths_by_root(self) -> dict[str, list[str]]:
"""
Expand Down Expand Up @@ -1410,6 +1459,37 @@ def download_job_output(
return progress_tracker.get_download_summary_statistics(downloaded_files_paths_by_root)


class InputDownloader(OutputDownloader):
    """
    Downloads the input attachment files of a job.

    All download mechanics (path resolution, conflict handling, progress
    tracking) are inherited from OutputDownloader. The only difference is the
    manifest source: inputs are resolved from the job's attachments metadata
    instead of listing output manifests in S3.
    """

    def __init__(
        self,
        s3_settings: JobAttachmentS3Settings,
        attachments: Attachments,
        session: Optional[boto3.Session] = None,
        path_filters: Optional[list[str]] = None,
    ) -> None:
        # Resolve the input manifests up front, then hand them to the parent
        # through the private _outputs_by_root hook so that the parent skips
        # its S3 output-manifest listing entirely.
        paths_by_root = get_job_input_paths_by_asset_root(
            s3_settings=s3_settings,
            attachments=attachments,
            session=session,
        )
        # farm/queue/job ids go unused when _outputs_by_root is supplied,
        # so empty strings are passed as placeholders.
        super().__init__(
            s3_settings=s3_settings,
            farm_id="",
            queue_id="",
            job_id="",
            session=session,
            path_filters=path_filters,
            _outputs_by_root=paths_by_root,
        )


def _get_manifests_by_session_action_id(
s3_settings: JobAttachmentS3Settings,
farm_id: str,
Expand Down
2 changes: 2 additions & 0 deletions test/unit/deadline_client/cli/test_cli_handle_web_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ def test_cli_handle_web_url_download_output_only_required_input(fresh_deadline_c
task_id=None,
session_action_id=None,
session=ANY,
path_filters=None,
)
mock_download.assert_called_once_with(
file_conflict_resolution=FileConflictResolution.CREATE_COPY,
Expand Down Expand Up @@ -371,6 +372,7 @@ def test_cli_handle_web_url_download_output_with_optional_input(fresh_deadline_c
task_id=MOCK_TASK_ID,
session_action_id=MOCK_SESSION_ACTION_ID,
session=ANY,
path_filters=None,
)
mock_download.assert_called_once_with(
file_conflict_resolution=FileConflictResolution.CREATE_COPY,
Expand Down
8 changes: 8 additions & 0 deletions test/unit/deadline_client/cli/test_cli_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ def test_cli_job_download_output_stdout_with_only_required_input(
task_id=None,
session_action_id=None,
session=ANY,
path_filters=None,
)

path_separator = "/" if sys.platform != "win32" else "\\"
Expand Down Expand Up @@ -490,6 +491,7 @@ def test_cli_job_download_output_stdout_with_mismatching_path_format(
task_id=None,
session_action_id=None,
session=ANY,
path_filters=None,
)

path_separator = "/" if sys.platform != "win32" else "\\"
Expand Down Expand Up @@ -590,6 +592,7 @@ def test_cli_job_download_output_handles_unc_path_on_windows(fresh_deadline_conf
task_id=None,
session_action_id=None,
session=ANY,
path_filters=None,
)

path_separator = "/" if sys.platform != "win32" else "\\"
Expand Down Expand Up @@ -673,6 +676,7 @@ def test_cli_job_download_no_output_stdout(fresh_deadline_config, tmp_path: Path
task_id=None,
session_action_id=None,
session=ANY,
path_filters=None,
)

assert (
Expand Down Expand Up @@ -774,6 +778,7 @@ def test_cli_job_download_output_stdout_with_json_format(
task_id=None,
session_action_id=None,
session=ANY,
path_filters=None,
)

expected_json_title = {"messageType": "title", "value": "Mock Job"}
Expand Down Expand Up @@ -1390,6 +1395,7 @@ def test_cli_job_download_output_handle_web_url_with_optional_input(
task_id="task-2",
session_action_id=MOCK_SESSION_ACTION_ID,
session=ANY,
path_filters=None,
)
mock_download.assert_called_once_with(
file_conflict_resolution=FileConflictResolution.CREATE_COPY,
Expand Down Expand Up @@ -1476,6 +1482,7 @@ def test_cli_job_download_output_with_different_asset_root_path_format_than_job(
task_id=None,
session_action_id=None,
session=ANY,
path_filters=None,
)

path_separator = "/" if sys.platform != "win32" else "\\"
Expand Down Expand Up @@ -1707,6 +1714,7 @@ def test_cli_job_download_output_with_session_action_id(fresh_deadline_config):
task_id=MOCK_TASK_ID,
session_action_id=MOCK_SESSION_ACTION_ID,
session=ANY,
path_filters=None,
)


Expand Down
68 changes: 68 additions & 0 deletions test/unit/deadline_client/cli/test_cli_path_filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

"""Tests for _validate_and_normalize_include_paths."""

import pytest
import click

from deadline.client.cli._groups.job_group import _validate_and_normalize_include_paths


class TestValidateAndNormalizePathFilters:
    """Unit tests for _validate_and_normalize_include_paths.

    Pins the normalization contract: backslashes become forward slashes,
    leading './' is stripped, '//' is collapsed, empty entries are dropped,
    and any path containing '..' is rejected with click.BadParameter.
    """

    def test_rejects_double_dot(self):
        # Path traversal at the start of the filter must be rejected.
        with pytest.raises(click.BadParameter, match="must not contain '..'"):
            _validate_and_normalize_include_paths(["../../etc/passwd"])

    def test_rejects_double_dot_in_middle(self):
        # '..' anywhere in the path is a traversal risk, not just at the start.
        with pytest.raises(click.BadParameter, match="must not contain '..'"):
            _validate_and_normalize_include_paths(["renders/../secret"])

    def test_converts_backslashes(self):
        # Windows-style separators are normalized to forward slashes.
        result = _validate_and_normalize_include_paths(["renders\\frame_001.exr"])
        assert result == ["renders/frame_001.exr"]

    def test_strips_leading_dot_slash(self):
        result = _validate_and_normalize_include_paths(["./renders/frame.exr"])
        assert result == ["renders/frame.exr"]

    def test_collapses_double_slashes(self):
        result = _validate_and_normalize_include_paths(["renders//frame.exr"])
        assert result == ["renders/frame.exr"]

    def test_empty_filter_removed(self):
        # Empty strings are dropped rather than treated as match-nothing filters.
        result = _validate_and_normalize_include_paths(["", "a.txt"])
        assert result == ["a.txt"]

    def test_passthrough_normal_paths(self):
        # Already-normalized paths (including directory prefixes) are unchanged.
        result = _validate_and_normalize_include_paths(
            ["renders/frame_001.exr", "textures/", "scripts/setup.mel"]
        )
        assert result == ["renders/frame_001.exr", "textures/", "scripts/setup.mel"]

    def test_backslash_with_double_dot_rejected(self):
        """Backslash path traversal like 'renders\\..\\..' should be rejected."""
        with pytest.raises(click.BadParameter, match="must not contain '..'"):
            _validate_and_normalize_include_paths(["renders\\..\\secret"])

    def test_multiple_normalizations(self):
        # All normalization steps compose on a single input.
        result = _validate_and_normalize_include_paths([".\\renders\\\\frame.exr"])
        # .\ -> ./ after backslash conversion, then ./ stripped, // collapsed
        assert result == ["renders/frame.exr"]


class TestParseFiltersStdin:
    """Unit tests for the stdin-reading path of _parse_filters_and_config."""

    def test_stdin_reads_until_empty_line(self):
        # Imported locally so patch targets resolve against the module under test.
        from unittest.mock import patch
        from io import StringIO
        from deadline.client.cli._groups.job_group import _parse_filters_and_config

        # Two filters, then a blank line terminator; 'extra_ignored' after the
        # blank line must NOT be read as a filter.
        stdin_data = "renders/frame_001.exr\nrenders/frame_002.exr\n\nextra_ignored\n"
        # Patch stdin plus the config helpers so no real CLI config is touched.
        with patch("sys.stdin", StringIO(stdin_data)), patch(
            "deadline.client.cli._groups.job_group._apply_cli_options_to_config"
        ), patch("deadline.client.cli._groups.job_group.config_file") as mock_cf:
            mock_cf.get_setting.return_value = "test-id"
            filters, _, _, _, _ = _parse_filters_and_config(
                (), True, {"yes": False, "profile": None}
            )

        assert filters == ["renders/frame_001.exr", "renders/frame_002.exr"]
2 changes: 1 addition & 1 deletion test/unit/deadline_job_attachments/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def fixture_default_attachments(farm_id, queue_id):
ManifestProperties(
rootPath="/tmp",
rootPathFormat=PathFormat.POSIX,
inputManifestPath=f"assetRoot/Manifests/{farm_id}/{queue_id}/Inputs/0000/manifest_input",
inputManifestPath=f"{farm_id}/{queue_id}/Inputs/0000/manifest_input",
inputManifestHash="manifesthash",
outputRelativeDirectories=["test/outputs"],
)
Expand Down
Loading
Loading