From d933830fc2cc0e360df877b67f9b0d9d867ab10f Mon Sep 17 00:00:00 2001 From: larrygao Date: Fri, 17 Apr 2026 20:04:23 +0000 Subject: [PATCH 1/3] feat(cli): Add --include-path selective filtering to download-output Add selective download support for job output attachments: - Add --include-path and --include-path-stdin options to download-output for downloading specific files or directory prefixes - Add _filter_paths() and _matches_any_filter() in download.py - Add path_filters parameter to OutputDownloader - Add _validate_and_normalize_include_paths() with path traversal rejection, backslash-to-forward-slash conversion, and normalization - Fix bug in get_job_input_paths_by_asset_root() where S3 root prefix was not prepended to input manifest keys - Extract _run_download_ux(), _get_job_download_context(), _parse_filters_and_config(), _handle_download_error() shared helpers - Update job_attachments_guide.md to document --include-path - Add unit tests for path filtering and validation Signed-off-by: larrygao --- docs/job_attachments_guide.md | 2 +- src/deadline/client/cli/_groups/job_group.py | 324 +++++++++++------- src/deadline/job_attachments/download.py | 73 +++- .../cli/test_cli_handle_web_url.py | 2 + test/unit/deadline_client/cli/test_cli_job.py | 8 + .../cli/test_cli_path_filters.py | 68 ++++ .../unit/deadline_job_attachments/conftest.py | 2 +- .../test_path_filtering.py | 109 ++++++ 8 files changed, 454 insertions(+), 134 deletions(-) create mode 100644 test/unit/deadline_client/cli/test_cli_path_filters.py create mode 100644 test/unit/deadline_job_attachments/test_path_filtering.py diff --git a/docs/job_attachments_guide.md b/docs/job_attachments_guide.md index 8f8af8951..ff35dc61c 100644 --- a/docs/job_attachments_guide.md +++ b/docs/job_attachments_guide.md @@ -4,7 +4,7 @@ Job attachments uses your configured S3 bucket as a [content-addressable storage](https://en.wikipedia.org/wiki/Content-addressable_storage), which creates a snapshot of the files used in your job submission in [asset manifests](#asset-manifests), only uploading files that aren't already in S3. This saves you time and bandwidth when iterating on jobs. When an [AWS Deadline Cloud worker agent][worker-agent] starts working on a job with job attachments, it recreates the file system snapshot in the worker agent session directory, and uploads any outputs back to your S3 bucket. -You can then easily download your outputs with the [deadline job download-output] command, or using the [protocol handler](#protocol-handler) to download from a click of a button in the [AWS Deadline Cloud monitor][monitor]. +You can then easily download your outputs with the [deadline job download-output] command, or using the [protocol handler](#protocol-handler) to download from a click of a button in the [AWS Deadline Cloud monitor][monitor]. The command supports `--include-path` for downloading specific files or directories. Job attachments also works as an auxiliary storage when used with [AWS Deadline Cloud storage profiles][shared-storage], allowing you to flexibly upload files to your Amazon S3 bucket that aren't on your configured shared storage. diff --git a/src/deadline/client/cli/_groups/job_group.py b/src/deadline/client/cli/_groups/job_group.py index e8fdb2f25..70782aaf1 100644 --- a/src/deadline/client/cli/_groups/job_group.py +++ b/src/deadline/client/cli/_groups/job_group.py @@ -480,74 +480,24 @@ def job_requeue_tasks(run_status: Optional[list[str]], **args): click.echo(f"\nRequeued a total of {total_count_requeued} tasks.") -def _download_job_output( +def _run_download_ux( + downloader: OutputDownloader, + output_paths_by_root: dict[str, list[str]], + root_path_format_mapping: dict[str, str], + is_json_format: bool, config: Optional[ConfigParser], - farm_id: str, - queue_id: str, - job_id: str, - step_id: Optional[str], - task_id: Optional[str], - is_json_format: bool = False, + label: str = "Outputs", + telemetry_metric: str = "download_job_output", ): """ - Starts the download of job output and handles the progress reporting callback. + Shared UX flow for downloading files: root path confirmation, cross-OS mapping, + conflict resolution, progress bar, and summary. Used by both download-output and download-input. """ - deadline = api.get_boto3_client("deadline", config=config) - auto_accept = config_file.str2bool( config_file.get_setting("settings.auto_accept", config=config) ) conflict_resolution = config_file.get_setting("settings.conflict_resolution", config=config) - job = deadline.get_job(farmId=farm_id, queueId=queue_id, jobId=job_id) - step = {} - task = {} - session_action_id = None - if step_id: - step = deadline.get_step(farmId=farm_id, queueId=queue_id, jobId=job_id, stepId=step_id) - if task_id: - task = deadline.get_task( - farmId=farm_id, - queueId=queue_id, - jobId=job_id, - stepId=step_id, - taskId=task_id, - ) - session_action_id = task.get("latestSessionActionId") - - click.echo( - _get_start_message(job["name"], step.get("name"), task.get("parameters"), is_json_format) - ) - - queue = deadline.get_queue(farmId=farm_id, queueId=queue_id) - - queue_role_session = api.get_queue_user_boto3_session( - deadline=deadline, - config=config, - farm_id=farm_id, - queue_id=queue_id, - queue_display_name=queue["displayName"], - ) - - # Get a dictionary mapping rootPath to rootPathFormat (OS) from job's manifests - root_path_format_mapping: dict[str, str] = {} - job_attachments = job.get("attachments", None) - if job_attachments: - job_attachments_manifests = job_attachments["manifests"] - for manifest in job_attachments_manifests: - root_path_format_mapping[manifest["rootPath"]] = manifest["rootPathFormat"] - - job_output_downloader = OutputDownloader( - s3_settings=JobAttachmentS3Settings(**queue["jobAttachmentSettings"]), - farm_id=farm_id, - queue_id=queue_id, - job_id=job_id, - step_id=step_id, - task_id=task_id, - session_action_id=session_action_id, - session=queue_role_session, - ) - def _check_and_warn_long_output_paths( output_paths_by_root: dict[str, list[str]], ) -> None: @@ -560,25 +510,16 @@ def _check_and_warn_long_output_paths( fg="yellow", ) - output_paths_by_root = job_output_downloader.get_output_paths_by_root() - # If no output paths were found, log a message and exit. - if output_paths_by_root == {}: - click.echo(_get_no_output_message(is_json_format)) - return - _check_and_warn_long_output_paths(output_paths_by_root) - # Check if the asset roots came from different OS. If so, prompt users to - # select alternative root paths to download to, (regardless of the auto-accept.) + # Check if the asset roots came from different OS asset_roots = list(output_paths_by_root.keys()) for asset_root in asset_roots: root_path_format = root_path_format_mapping.get(asset_root, "") if root_path_format == "": - # There must be a corresponding root path format for each root path, by design. raise DeadlineOperationError(f"No root path format found for {asset_root}.") if PathFormat.get_host_path_format_string() != root_path_format: click.echo(_get_mismatch_os_root_warning(asset_root, root_path_format, is_json_format)) - if not is_json_format: new_root = click.prompt( "> Please enter a new root path", @@ -590,16 +531,12 @@ def _check_and_warn_long_output_paths( json_string, JSON_MSG_TYPE_PATHCONFIRM, expected_size=1 )[0] _assert_valid_path(new_root) + downloader.set_root_path(asset_root, os.path.expanduser(new_root)) - job_output_downloader.set_root_path(asset_root, os.path.expanduser(new_root)) - - output_paths_by_root = job_output_downloader.get_output_paths_by_root() - + output_paths_by_root = downloader.get_output_paths_by_root() _check_and_warn_long_output_paths(output_paths_by_root) - # Prompt users to confirm local root paths where they will download outputs to, - # and allow users to select different location to download files to if they want. - # (If auto-accept is enabled, automatically download to the default root paths.) + # Prompt users to confirm local root paths if not auto_accept: if not is_json_format: user_choice = "" @@ -621,20 +558,17 @@ def _check_and_warn_long_output_paths( default="y", ) if user_choice == "n": - click.echo("Output download canceled.") + click.echo("Download canceled.") return elif user_choice != "y": - # User selected an index to modify the root directory. index_to_change = int(user_choice) new_root = click.prompt( "> Please enter the new root directory path, or press Enter to keep it unchanged", type=click.Path(exists=False), default=asset_roots[index_to_change], ) - job_output_downloader.set_root_path( - asset_roots[index_to_change], str(Path(new_root)) - ) - output_paths_by_root = job_output_downloader.get_output_paths_by_root() + downloader.set_root_path(asset_roots[index_to_change], str(Path(new_root))) + output_paths_by_root = downloader.get_output_paths_by_root() _check_and_warn_long_output_paths(output_paths_by_root) else: click.echo( @@ -648,12 +582,11 @@ def _check_and_warn_long_output_paths( ) for index, confirmed_root in enumerate(confirmed_asset_roots): _assert_valid_path(confirmed_root) - job_output_downloader.set_root_path(asset_roots[index], str(Path(confirmed_root))) - output_paths_by_root = job_output_downloader.get_output_paths_by_root() + downloader.set_root_path(asset_roots[index], str(Path(confirmed_root))) + output_paths_by_root = downloader.get_output_paths_by_root() _check_and_warn_long_output_paths(output_paths_by_root) if not is_json_format: - # Create and print a summary of all the paths to download all_output_paths: set[str] = set() for asset_root, output_paths in output_paths_by_root.items(): all_output_paths.update( @@ -662,9 +595,6 @@ def _check_and_warn_long_output_paths( click.echo("\nSummary of file paths to download:") click.echo(textwrap.indent(summarize_path_list(all_output_paths), " ")) - # If the conflict resolution option was not specified, auto-accept is false, and - # if there are any conflicting files in local, prompt users to select a resolution method. - # (skip, overwrite, or make a copy.) if conflict_resolution != FileConflictResolution.NOT_SELECTED.name: file_conflict_resolution = FileConflictResolution[conflict_resolution] elif auto_accept: @@ -680,33 +610,27 @@ def _check_and_warn_long_output_paths( default="3", ) if user_choice == "n": - click.echo("Output download canceled.") + click.echo("Download canceled.") return else: - resolution_choice_int = int(user_choice) - file_conflict_resolution = FileConflictResolution(resolution_choice_int) + file_conflict_resolution = FileConflictResolution(int(user_choice)) - # TODO: remove logging level setting when the max number connections for boto3 client - # in Job Attachments library can be increased (currently using default number, 10, which - # makes it keep logging urllib3 warning messages when downloading large files) with _modified_logging_level(logging.getLogger("urllib3"), logging.ERROR): - @api.record_success_fail_telemetry_event(metric_name="download_job_output") - def _download_job_output( + @api.record_success_fail_telemetry_event(metric_name=telemetry_metric) + def _do_download( file_conflict_resolution: Optional[ FileConflictResolution ] = FileConflictResolution.CREATE_COPY, on_downloading_files: Optional[Callable[[ProgressReportMetadata], bool]] = None, ) -> DownloadSummaryStatistics: - return job_output_downloader.download_job_output( + return downloader.download_job_output( file_conflict_resolution=file_conflict_resolution, on_downloading_files=on_downloading_files, ) if not is_json_format: - # Note: click doesn't export the return type of progressbar(), so we suppress mypy warnings for - # not annotating the type of download_progress. - with click.progressbar(length=100, label="Downloading Outputs") as download_progress: # type: ignore[var-annotated] + with click.progressbar(length=100, label=f"Downloading {label}") as download_progress: # type: ignore[var-annotated] def _update_download_progress( download_metadata: ProgressReportMetadata, @@ -716,7 +640,7 @@ def _update_download_progress( download_progress.update(new_progress) return sigint_handler.continue_operation - download_summary: DownloadSummaryStatistics = _download_job_output( # type: ignore + download_summary: DownloadSummaryStatistics = _do_download( # type: ignore file_conflict_resolution=file_conflict_resolution, on_downloading_files=_update_download_progress, ) @@ -728,10 +652,9 @@ def _update_download_progress( click.echo( _get_json_line(JSON_MSG_TYPE_PROGRESS, str(int(download_metadata.progress))) ) - # TODO: enable download cancellation for JSON format return True - download_summary = _download_job_output( # type: ignore + download_summary = _do_download( # type: ignore file_conflict_resolution=file_conflict_resolution, on_downloading_files=_update_download_progress, ) @@ -740,6 +663,113 @@ def _update_download_progress( click.echo() +def _validate_and_normalize_include_paths(filters: list[str]) -> list[str]: + """ + Validates and normalizes path filters. + - Rejects filters containing '..' (path traversal prevention) + - Converts backslashes to forward slashes (Windows compatibility) + - Strips leading './' + - Normalizes '//' to '/' + """ + normalized = [] + for f in filters: + if ".." in f: + raise click.BadParameter(f"Path filter must not contain '..': {f}") + f = f.replace("\\", "/") + if f.startswith("./"): + f = f[2:] + while "//" in f: + f = f.replace("//", "/") + if f: + normalized.append(f) + return normalized + + +def _get_job_download_context(config, farm_id, queue_id, job_id): + """Shared setup for download-output and download-input: fetches job, queue, and credentials.""" + deadline = api.get_boto3_client("deadline", config=config) + job = deadline.get_job(farmId=farm_id, queueId=queue_id, jobId=job_id) + queue = deadline.get_queue(farmId=farm_id, queueId=queue_id) + queue_role_session = api.get_queue_user_boto3_session( + deadline=deadline, + config=config, + farm_id=farm_id, + queue_id=queue_id, + queue_display_name=queue["displayName"], + ) + root_path_format_mapping: dict[str, str] = {} + job_attachments = job.get("attachments", None) + if job_attachments: + for manifest in job_attachments["manifests"]: + root_path_format_mapping[manifest["rootPath"]] = manifest["rootPathFormat"] + return deadline, job, queue, queue_role_session, job_attachments, root_path_format_mapping + + +def _download_job_output( + config: Optional[ConfigParser], + farm_id: str, + queue_id: str, + job_id: str, + step_id: Optional[str], + task_id: Optional[str], + is_json_format: bool = False, + include_paths: Optional[list[str]] = None, +): + """ + Starts the download of job output and handles the progress reporting callback. + """ + deadline, job, queue, queue_role_session, job_attachments, root_path_format_mapping = ( + _get_job_download_context(config, farm_id, queue_id, job_id) + ) + + step = {} + task = {} + session_action_id = None + if step_id: + step = deadline.get_step(farmId=farm_id, queueId=queue_id, jobId=job_id, stepId=step_id) + if task_id: + task = deadline.get_task( + farmId=farm_id, + queueId=queue_id, + jobId=job_id, + stepId=step_id, + taskId=task_id, + ) + session_action_id = task.get("latestSessionActionId") + + click.echo( + _get_start_message(job["name"], step.get("name"), task.get("parameters"), is_json_format) + ) + + job_output_downloader = OutputDownloader( + s3_settings=JobAttachmentS3Settings(**queue["jobAttachmentSettings"]), + farm_id=farm_id, + queue_id=queue_id, + job_id=job_id, + step_id=step_id, + task_id=task_id, + session_action_id=session_action_id, + session=queue_role_session, + path_filters=include_paths, + ) + + output_paths_by_root = job_output_downloader.get_output_paths_by_root() + # If no output paths were found, log a message and exit. + if output_paths_by_root == {}: + click.echo(_get_no_output_message(is_json_format)) + return + + _run_download_ux( + downloader=job_output_downloader, + output_paths_by_root=output_paths_by_root, + root_path_format_mapping=root_path_format_mapping, + is_json_format=is_json_format, + config=config, + label="Outputs", + telemetry_metric="download_job_output", + ) + + def _get_start_message( job_name: str, step_name: Optional[str], @@ -915,6 +945,52 @@ def _assert_valid_path(path: str) -> None: raise ValueError(f"Path {path} is not an absolute path.") +def _parse_filters_and_config(include_path, include_path_stdin, args): + """Shared setup for download-output and download-input CLI commands.""" + + filters = list(include_path) + if include_path_stdin: + # Read lines until EOF or an empty line (sentinel). + # The empty-line sentinel allows callers that cannot close stdin + # (e.g. Tauri IPC) to signal end-of-input. + for line in sys.stdin: + stripped = line.strip() + if not stripped: + break + filters.append(stripped) + + # Reopen stdin from the terminal so interactive prompts (e.g. path + # confirmation) still work after the pipe is consumed. + # On Windows, /dev/tty doesn't exist — use CON instead. + try: + tty_path = "CON" if sys.platform == "win32" else "/dev/tty" + sys.stdin = open(tty_path) # noqa: SIM115 + except OSError: + pass # Non-interactive environment (CI, Tauri) — leave stdin as-is + if filters: + filters = _validate_and_normalize_include_paths(filters) + + config = _apply_cli_options_to_config( + required_options={"farm_id", "queue_id", "job_id"}, **args + ) + farm_id = config_file.get_setting("defaults.farm_id", config=config) + queue_id = config_file.get_setting("defaults.queue_id", config=config) + job_id = config_file.get_setting("defaults.job_id", config=config) + return filters or None, config, farm_id, queue_id, job_id + + +def _handle_download_error(e: Exception, is_json_format: bool, download_type: str): + """Shared error handling for download-output and download-input CLI commands.""" + if is_json_format: + error_one_liner = str(e).replace("\n", ". ") + click.echo(_get_json_line(JSON_MSG_TYPE_ERROR, error_one_liner)) + sys.exit(1) + else: + if logging.DEBUG >= logger.getEffectiveLevel(): + logger.exception("Exception details:") + raise DeadlineOperationError(f"Failed to download {download_type}:\n{e}") from e + + @cli_job.command(name="download-output") @click.option("--profile", help="The AWS profile to use.") @click.option("--farm-id", help="The farm to use.") @@ -922,6 +998,16 @@ def _assert_valid_path(path: str) -> None: @click.option("--job-id", help="The job to use.") @click.option("--step-id", help="The step to use.") @click.option("--task-id", help="The task to use.") +@click.option( + "--include-path", + multiple=True, + help="Download only files matching this relative path or directory prefix (trailing /). Repeatable.", +) +@click.option( + "--include-path-stdin", + is_flag=True, + help="Read path filters from stdin, one per line.", +) @click.option( "--conflict-resolution", type=click.Choice( @@ -954,7 +1040,7 @@ def _assert_valid_path(path: str) -> None: "parsed/consumed by custom scripts.", ) @_handle_error -def job_download_output(step_id, task_id, output, **args): +def job_download_output(step_id, task_id, output, include_path, include_path_stdin, **args): """ Download the output of a Deadline Cloud job that was saved as job attachments. @@ -964,27 +1050,25 @@ def job_download_output(step_id, task_id, output, **args): """ if task_id and not step_id: raise click.UsageError("Missing option '--step-id' required with '--task-id'") - # Get a temporary config object with the standard options handled - config = _apply_cli_options_to_config( - required_options={"farm_id", "queue_id", "job_id"}, **args - ) - farm_id = config_file.get_setting("defaults.farm_id", config=config) - queue_id = config_file.get_setting("defaults.queue_id", config=config) - job_id = config_file.get_setting("defaults.job_id", config=config) + filters, config, farm_id, queue_id, job_id = _parse_filters_and_config( + include_path, include_path_stdin, args + ) is_json_format = True if output == "json" else False try: - _download_job_output(config, farm_id, queue_id, job_id, step_id, task_id, is_json_format) + _download_job_output( + config, + farm_id, + queue_id, + job_id, + step_id, + task_id, + is_json_format, + include_paths=filters, + ) except Exception as e: - if is_json_format: - error_one_liner = str(e).replace("\n", ". ") - click.echo(_get_json_line(JSON_MSG_TYPE_ERROR, error_one_liner)) - sys.exit(1) - else: - if logging.DEBUG >= logger.getEffectiveLevel(): - logger.exception("Exception details:") - raise DeadlineOperationError(f"Failed to download output:\n{e}") from e + _handle_download_error(e, is_json_format, "output") @cli_job.command(name="wait") diff --git a/src/deadline/job_attachments/download.py b/src/deadline/job_attachments/download.py index 3eae5a808..2d4653289 100644 --- a/src/deadline/job_attachments/download.py +++ b/src/deadline/job_attachments/download.py @@ -73,7 +73,6 @@ from ._utils import ( _get_long_path_compatible_path, _is_relative_to, - _join_s3_paths, ) from threading import Lock @@ -325,7 +324,9 @@ def get_job_input_paths_by_asset_root( for manifest_properties in attachments.manifests: if manifest_properties.inputManifestPath: - key = _join_s3_paths(manifest_properties.inputManifestPath) + key = s3_settings.add_root_and_manifest_folder_prefix( + manifest_properties.inputManifestPath + ) _, asset_manifest = get_asset_root_and_manifest_from_s3( manifest_key=key, s3_bucket=s3_settings.s3BucketName, @@ -1243,6 +1244,47 @@ def mount_vfs_from_manifests( vfs_manager.start(session_dir=session_dir) +def _matches_any_filter(file_path: str, filters: list[str]) -> bool: + """ + Check if a file path matches any of the given filters. + - Exact match: filter "renders/frame_001.exr" matches only that path + - Directory prefix: filter "renders/" matches all paths starting with "renders/" + """ + for f in filters: + if f.endswith("/"): + if file_path.startswith(f): + return True + else: + if file_path == f: + return True + return False + + +def _filter_paths( + paths_by_root: dict[str, ManifestPathGroup], + path_filters: list[str], +) -> dict[str, ManifestPathGroup]: + """ + Filter ManifestPathGroups to only include files matching the given filters. + Each filter is an exact relative file path or a directory prefix (ending with '/'). + Filters are matched against file paths across all asset roots. + """ + filtered: dict[str, ManifestPathGroup] = {} + for root, group in paths_by_root.items(): + filtered_group = ManifestPathGroup() + for hash_alg, file_list in group.files_by_hash_alg.items(): + matching = [f for f in file_list if _matches_any_filter(f.path, path_filters)] + if matching: + if hash_alg not in filtered_group.files_by_hash_alg: + filtered_group.files_by_hash_alg[hash_alg] = matching + else: + filtered_group.files_by_hash_alg[hash_alg].extend(matching) + filtered_group.total_bytes += sum(f.size for f in matching) + if filtered_group.files_by_hash_alg: + filtered[root] = filtered_group + return filtered + + def _ensure_paths_within_directory(root_path: str, paths_relative_to_root: list[str]) -> None: """ Validates the given paths to ensure that they are within the given root path. @@ -1282,19 +1324,26 @@ def __init__( task_id: Optional[str] = None, session_action_id: Optional[str] = None, session: Optional[boto3.Session] = None, + path_filters: Optional[list[str]] = None, + _outputs_by_root: Optional[dict[str, ManifestPathGroup]] = None, ) -> None: self.s3_settings = s3_settings self.session = session - self.outputs_by_root = get_job_output_paths_by_asset_root( - s3_settings=s3_settings, - farm_id=farm_id, - queue_id=queue_id, - job_id=job_id, - step_id=step_id, - task_id=task_id, - session_action_id=session_action_id, - session=session, - ) + if _outputs_by_root is not None: + self.outputs_by_root = _outputs_by_root + else: + self.outputs_by_root = get_job_output_paths_by_asset_root( + s3_settings=s3_settings, + farm_id=farm_id, + queue_id=queue_id, + job_id=job_id, + step_id=step_id, + task_id=task_id, + session_action_id=session_action_id, + session=session, + ) + if path_filters: + self.outputs_by_root = _filter_paths(self.outputs_by_root, path_filters) def get_output_paths_by_root(self) -> dict[str, list[str]]: """ diff --git a/test/unit/deadline_client/cli/test_cli_handle_web_url.py b/test/unit/deadline_client/cli/test_cli_handle_web_url.py index b873f18e0..0862cc9ae 100644 --- a/test/unit/deadline_client/cli/test_cli_handle_web_url.py +++ b/test/unit/deadline_client/cli/test_cli_handle_web_url.py @@ -306,6 +306,7 @@ def test_cli_handle_web_url_download_output_only_required_input(fresh_deadline_c task_id=None, session_action_id=None, session=ANY, + path_filters=None, ) mock_download.assert_called_once_with( file_conflict_resolution=FileConflictResolution.CREATE_COPY, @@ -371,6 +372,7 @@ def test_cli_handle_web_url_download_output_with_optional_input(fresh_deadline_c task_id=MOCK_TASK_ID, session_action_id=MOCK_SESSION_ACTION_ID, session=ANY, + path_filters=None, ) mock_download.assert_called_once_with( file_conflict_resolution=FileConflictResolution.CREATE_COPY, diff --git a/test/unit/deadline_client/cli/test_cli_job.py b/test/unit/deadline_client/cli/test_cli_job.py index a1d08da29..7c1675fa9 100644 --- a/test/unit/deadline_client/cli/test_cli_job.py +++ b/test/unit/deadline_client/cli/test_cli_job.py @@ -377,6 +377,7 @@ def test_cli_job_download_output_stdout_with_only_required_input( task_id=None, session_action_id=None, session=ANY, + path_filters=None, ) path_separator = "/" if sys.platform != "win32" else "\\" @@ -490,6 +491,7 @@ def test_cli_job_download_output_stdout_with_mismatching_path_format( task_id=None, session_action_id=None, session=ANY, + path_filters=None, ) path_separator = "/" if sys.platform != "win32" else "\\" @@ -590,6 +592,7 @@ def test_cli_job_download_output_handles_unc_path_on_windows(fresh_deadline_conf task_id=None, session_action_id=None, session=ANY, + path_filters=None, ) path_separator = "/" if sys.platform != "win32" else "\\" @@ -673,6 +676,7 @@ def test_cli_job_download_no_output_stdout(fresh_deadline_config, tmp_path: Path task_id=None, session_action_id=None, session=ANY, + path_filters=None, ) assert ( @@ -774,6 +778,7 @@ def test_cli_job_download_output_stdout_with_json_format( task_id=None, session_action_id=None, session=ANY, + path_filters=None, ) expected_json_title = {"messageType": "title", "value": "Mock Job"} @@ -1390,6 +1395,7 @@ def test_cli_job_download_output_handle_web_url_with_optional_input( task_id="task-2", session_action_id=MOCK_SESSION_ACTION_ID, session=ANY, + path_filters=None, ) mock_download.assert_called_once_with( file_conflict_resolution=FileConflictResolution.CREATE_COPY, @@ -1476,6 +1482,7 @@ def test_cli_job_download_output_with_different_asset_root_path_format_than_job( task_id=None, session_action_id=None, session=ANY, + path_filters=None, ) path_separator = "/" if sys.platform != "win32" else "\\" @@ -1707,6 +1714,7 @@ def test_cli_job_download_output_with_session_action_id(fresh_deadline_config): task_id=MOCK_TASK_ID, session_action_id=MOCK_SESSION_ACTION_ID, session=ANY, + path_filters=None, ) diff --git a/test/unit/deadline_client/cli/test_cli_path_filters.py b/test/unit/deadline_client/cli/test_cli_path_filters.py new file mode 100644 index 000000000..b4cc20719 --- /dev/null +++ b/test/unit/deadline_client/cli/test_cli_path_filters.py @@ -0,0 +1,68 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +"""Tests for _validate_and_normalize_include_paths and download-input CLI command.""" + +import pytest +import click + +from deadline.client.cli._groups.job_group import _validate_and_normalize_include_paths + + +class TestValidateAndNormalizePathFilters: + def test_rejects_double_dot(self): + with pytest.raises(click.BadParameter, match="must not contain '..'"): + _validate_and_normalize_include_paths(["../../etc/passwd"]) + + def test_rejects_double_dot_in_middle(self): + with pytest.raises(click.BadParameter, match="must not contain '..'"): + _validate_and_normalize_include_paths(["renders/../secret"]) + + def test_converts_backslashes(self): + result = _validate_and_normalize_include_paths(["renders\\frame_001.exr"]) + assert result == ["renders/frame_001.exr"] + + def test_strips_leading_dot_slash(self): + result = _validate_and_normalize_include_paths(["./renders/frame.exr"]) + assert result == ["renders/frame.exr"] + + def test_collapses_double_slashes(self): + result = _validate_and_normalize_include_paths(["renders//frame.exr"]) + assert result == ["renders/frame.exr"] + + def test_empty_filter_removed(self): + result = _validate_and_normalize_include_paths(["", "a.txt"]) + assert result == ["a.txt"] + + def test_passthrough_normal_paths(self): + result = _validate_and_normalize_include_paths( + ["renders/frame_001.exr", "textures/", "scripts/setup.mel"] + ) + assert result == ["renders/frame_001.exr", "textures/", "scripts/setup.mel"] + + def test_backslash_with_double_dot_rejected(self): + """Backslash path traversal like 'renders\\..\\..' should be rejected.""" + with pytest.raises(click.BadParameter, match="must not contain '..'"): + _validate_and_normalize_include_paths(["renders\\..\\secret"]) + + def test_multiple_normalizations(self): + result = _validate_and_normalize_include_paths([".\\renders\\\\frame.exr"]) + # .\ -> ./ after backslash conversion, then ./ stripped, // collapsed + assert result == ["renders/frame.exr"] + + +class TestParseFiltersStdin: + def test_stdin_reads_until_empty_line(self): + from unittest.mock import patch + from io import StringIO + from deadline.client.cli._groups.job_group import _parse_filters_and_config + + stdin_data = "renders/frame_001.exr\nrenders/frame_002.exr\n\nextra_ignored\n" + with patch("sys.stdin", StringIO(stdin_data)), patch( + "deadline.client.cli._groups.job_group._apply_cli_options_to_config" + ), patch("deadline.client.cli._groups.job_group.config_file") as mock_cf: + mock_cf.get_setting.return_value = "test-id" + filters, _, _, _, _ = _parse_filters_and_config( + (), True, {"yes": False, "profile": None} + ) + + assert filters == ["renders/frame_001.exr", "renders/frame_002.exr"] diff --git a/test/unit/deadline_job_attachments/conftest.py b/test/unit/deadline_job_attachments/conftest.py index 272e377c0..ce5e41270 100644 --- a/test/unit/deadline_job_attachments/conftest.py +++ b/test/unit/deadline_job_attachments/conftest.py @@ -112,7 +112,7 @@ def fixture_default_attachments(farm_id, queue_id): ManifestProperties( rootPath="/tmp", rootPathFormat=PathFormat.POSIX, - inputManifestPath=f"assetRoot/Manifests/{farm_id}/{queue_id}/Inputs/0000/manifest_input", + inputManifestPath=f"{farm_id}/{queue_id}/Inputs/0000/manifest_input", inputManifestHash="manifesthash", outputRelativeDirectories=["test/outputs"], ) diff --git a/test/unit/deadline_job_attachments/test_path_filtering.py b/test/unit/deadline_job_attachments/test_path_filtering.py new file mode 100644 index 000000000..8f1e9d76d --- /dev/null +++ b/test/unit/deadline_job_attachments/test_path_filtering.py @@ -0,0 +1,109 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +"""Tests for path filtering in download.py""" + +from typing import List + +from deadline.job_attachments.download import ( + _matches_any_filter, + _filter_paths, +) +from deadline.job_attachments.models import ManifestPathGroup +from deadline.job_attachments.asset_manifests.hash_algorithms import HashAlgorithm +from deadline.job_attachments.asset_manifests.v2023_03_03 import ( + ManifestPath as ManifestPathv2023_03_03, +) + + +class TestMatchesAnyFilter: + def test_exact_match(self): + assert _matches_any_filter("renders/frame_001.exr", ["renders/frame_001.exr"]) is True + + def test_exact_no_match(self): + assert _matches_any_filter("renders/frame_002.exr", ["renders/frame_001.exr"]) is False + + def test_directory_prefix_match(self): + assert _matches_any_filter("renders/frame_001.exr", ["renders/"]) is True + + def test_directory_prefix_no_match(self): + assert _matches_any_filter("textures/wood.exr", ["renders/"]) is False + + def test_directory_prefix_does_not_match_similar_names(self): + """'renders/' should NOT match 'renders_v2/file.exr'""" + assert _matches_any_filter("renders_v2/file.exr", ["renders/"]) is False + + def test_multiple_filters_or(self): + assert ( + _matches_any_filter("textures/wood.exr", ["renders/frame_001.exr", "textures/"]) is True + ) + + def test_empty_filters(self): + assert _matches_any_filter("renders/frame_001.exr", []) is False + + def test_nested_directory_prefix(self): + assert _matches_any_filter("a/b/c/file.txt", ["a/b/"]) is True + + def test_exact_match_no_trailing_slash(self): + """Exact filter without trailing slash should not match subdirectory files.""" + assert _matches_any_filter("renders/frame_001.exr", ["renders"]) is False + + +class TestFilterPaths: + def _make_group(self, paths: List[str]) -> ManifestPathGroup: + group = ManifestPathGroup() + group.files_by_hash_alg[HashAlgorithm.XXH128] = [ + ManifestPathv2023_03_03(path=p, hash="abc123", size=100, mtime=1234000000) + for p in paths + ] + group.total_bytes = len(paths) * 100 + return group + + def test_exact_filter(self): + paths_by_root = {"/root": self._make_group(["a.txt", "b.txt", "c.txt"])} + result = _filter_paths(paths_by_root, ["b.txt"]) + assert list(result.keys()) == ["/root"] + assert [f.path for f in result["/root"].files_by_hash_alg[HashAlgorithm.XXH128]] == [ + "b.txt" + ] + assert result["/root"].total_bytes == 100 + + def test_directory_prefix_filter(self): + paths_by_root = { + "/root": self._make_group(["renders/a.exr", "renders/b.exr", "textures/c.png"]) + } + result = _filter_paths(paths_by_root, ["renders/"]) + files = [f.path for f in result["/root"].files_by_hash_alg[HashAlgorithm.XXH128]] + assert files == ["renders/a.exr", "renders/b.exr"] + + def test_no_matches_returns_empty(self): + paths_by_root = {"/root": self._make_group(["a.txt"])} + result = _filter_paths(paths_by_root, ["nonexistent.txt"]) + assert result == {} + + def test_multiple_asset_roots(self): + paths_by_root = { + "/root1": self._make_group(["shared/file.txt", "other.txt"]), + "/root2": self._make_group(["shared/file.txt", "different.txt"]), + } + result = _filter_paths(paths_by_root, ["shared/file.txt"]) + assert "/root1" in result + assert "/root2" in result + + def test_mixed_filters(self): + paths_by_root = { + "/root": self._make_group( + ["renders/a.exr", "renders/b.exr", "textures/c.png", "scripts/setup.mel"] + ) + } + result = _filter_paths(paths_by_root, ["renders/", "scripts/setup.mel"]) + files = [f.path for f in result["/root"].files_by_hash_alg[HashAlgorithm.XXH128]] + assert set(files) == {"renders/a.exr", "renders/b.exr", "scripts/setup.mel"} + + def test_empty_root_removed(self): + paths_by_root = { + "/has_match": self._make_group(["a.txt"]), + "/no_match": self._make_group(["b.txt"]), + } + result = _filter_paths(paths_by_root, ["a.txt"]) + assert "/has_match" in result + assert "/no_match" not in result From ca11fcdf18d95ea5f6298f1f5abdaf1024722dde Mon Sep 17 00:00:00 2001 From: larrygao Date: Fri, 17 Apr 2026 20:18:48 +0000 Subject: [PATCH 2/3] refactor: clean up download-input references for PR split Remove remaining download-input references from docstrings and comments. download-input command will be added in a follow-up PR. Signed-off-by: larrygao --- src/deadline/client/cli/_groups/job_group.py | 8 ++++---- test/unit/deadline_client/cli/test_cli_path_filters.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/deadline/client/cli/_groups/job_group.py b/src/deadline/client/cli/_groups/job_group.py index 70782aaf1..3376c7fd9 100644 --- a/src/deadline/client/cli/_groups/job_group.py +++ b/src/deadline/client/cli/_groups/job_group.py @@ -491,7 +491,7 @@ def _run_download_ux( ): """ Shared UX flow for downloading files: root path confirmation, cross-OS mapping, - conflict resolution, progress bar, and summary. Used by both download-output and download-input. + conflict resolution, progress bar, and summary. Used by download-output. """ auto_accept = config_file.str2bool( config_file.get_setting("settings.auto_accept", config=config) @@ -686,7 +686,7 @@ def _validate_and_normalize_include_paths(filters: list[str]) -> list[str]: def _get_job_download_context(config, farm_id, queue_id, job_id): - """Shared setup for download-output and download-input: fetches job, queue, and credentials.""" + """Shared setup for download commands: fetches job, queue, and credentials.""" deadline = api.get_boto3_client("deadline", config=config) job = deadline.get_job(farmId=farm_id, queueId=queue_id, jobId=job_id) queue = deadline.get_queue(farmId=farm_id, queueId=queue_id) @@ -946,7 +946,7 @@ def _assert_valid_path(path: str) -> None: def _parse_filters_and_config(include_path, include_path_stdin, args): - """Shared setup for download-output and download-input CLI commands.""" + """Shared setup for download CLI commands.""" filters = list(include_path) if include_path_stdin: @@ -980,7 +980,7 @@ def _parse_filters_and_config(include_path, include_path_stdin, args): def _handle_download_error(e: Exception, is_json_format: bool, download_type: str): - """Shared error handling for download-output and download-input CLI commands.""" + """Shared error handling for download CLI commands.""" if is_json_format: error_one_liner = str(e).replace("\n", ". ") click.echo(_get_json_line(JSON_MSG_TYPE_ERROR, error_one_liner)) diff --git a/test/unit/deadline_client/cli/test_cli_path_filters.py b/test/unit/deadline_client/cli/test_cli_path_filters.py index b4cc20719..3b78fe071 100644 --- a/test/unit/deadline_client/cli/test_cli_path_filters.py +++ b/test/unit/deadline_client/cli/test_cli_path_filters.py @@ -1,6 +1,6 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -"""Tests for _validate_and_normalize_include_paths and download-input CLI command.""" +"""Tests for _validate_and_normalize_include_paths.""" import pytest import click From 77c5ab1cbd9278b303d69870c571dc0ccbac0d25 Mon Sep 17 00:00:00 2001 From: larrygao Date: Fri, 17 Apr 2026 20:42:51 +0000 Subject: [PATCH 3/3] feat(cli): Add download-input command for input attachments Add new download-input command for downloading input attachment files, using the same --include-path filtering and shared UX flow established in the download-output PR. - Add InputDownloader class (inherits from OutputDownloader) - Add _download_job_input() and download-input CLI command - Update job_attachments_guide.md to document download-input Signed-off-by: larrygao --- docs/job_attachments_guide.md | 3 +- src/deadline/client/cli/_groups/job_group.py | 135 ++++++++++++++++++- src/deadline/job_attachments/download.py | 31 +++++ 3 files changed, 167 insertions(+), 2 deletions(-) diff --git a/docs/job_attachments_guide.md b/docs/job_attachments_guide.md index ff35dc61c..5e4b397dc 100644 --- a/docs/job_attachments_guide.md +++ b/docs/job_attachments_guide.md @@ -4,7 +4,7 @@ Job attachments uses your configured S3 bucket as a [content-addressable storage](https://en.wikipedia.org/wiki/Content-addressable_storage), which creates a snapshot of the files used in your job submission in [asset manifests](#asset-manifests), only uploading files that aren't already in S3. This saves you time and bandwidth when iterating on jobs. When an [AWS Deadline Cloud worker agent][worker-agent] starts working on a job with job attachments, it recreates the file system snapshot in the worker agent session directory, and uploads any outputs back to your S3 bucket. -You can then easily download your outputs with the [deadline job download-output] command, or using the [protocol handler](#protocol-handler) to download from a click of a button in the [AWS Deadline Cloud monitor][monitor]. The command supports `--include-path` for downloading specific files or directories. +You can then easily download your outputs with the [deadline job download-output] command, or your inputs with the [deadline job download-input] command. You can also use the [protocol handler](#protocol-handler) to download from a click of a button in the [AWS Deadline Cloud monitor][monitor]. Both commands support `--include-path` for downloading specific files or directories. Job attachments also works as an auxiliary storage when used with [AWS Deadline Cloud storage profiles][shared-storage], allowing you to flexibly upload files to your Amazon S3 bucket that aren't on your configured shared storage. @@ -18,6 +18,7 @@ See the [`examples`](https://github.com/aws-deadline/deadline-cloud/tree/mainlin [worker-agent]: https://github.com/aws-deadline/deadline-cloud-worker-agent/blob/release/docs/ [developer-guide]: https://docs.aws.amazon.com/deadline-cloud/latest/developerguide/what-job-attachments-uploads-to-amazon-s3.html [deadline job download-output]: cli_reference/deadline_job.md#download-output +[deadline job download-input]: cli_reference/deadline_job.md#download-input ## Job Attachments Bucket Structure diff --git a/src/deadline/client/cli/_groups/job_group.py b/src/deadline/client/cli/_groups/job_group.py index 3376c7fd9..ad111e095 100644 --- a/src/deadline/client/cli/_groups/job_group.py +++ b/src/deadline/client/cli/_groups/job_group.py @@ -21,7 +21,7 @@ from botocore.exceptions import ClientError from ...api._session import _modified_logging_level -from ....job_attachments.download import OutputDownloader +from ....job_attachments.download import OutputDownloader, InputDownloader from ....job_attachments.models import ( FileConflictResolution, JobAttachmentS3Settings, @@ -1071,6 +1071,139 @@ def job_download_output(step_id, task_id, output, include_path, include_path_std _handle_download_error(e, is_json_format, "output") +def _download_job_input( + config: Optional[ConfigParser], + farm_id: str, + queue_id: str, + job_id: str, + is_json_format: bool = False, + include_paths: Optional[list[str]] = None, +): + """ + Starts the download of job input files and handles the progress reporting callback. + """ + _, job, queue, queue_role_session, job_attachments, root_path_format_mapping = ( + _get_job_download_context(config, farm_id, queue_id, job_id) + ) + + click.echo( + _get_json_line(JSON_MSG_TYPE_TITLE, job["name"]) + if is_json_format + else f"Downloading input from Job {job['name']!r}" + ) + + if not job_attachments: + msg = "No input attachments found for this job." + click.echo(_get_json_line(JSON_MSG_TYPE_SUMMARY, msg) if is_json_format else msg) + return + + from ....job_attachments.models import Attachments, ManifestProperties + + attachments = Attachments( + manifests=[ + ManifestProperties( + fileSystemLocationName=m.get("fileSystemLocationName", None), + rootPath=m["rootPath"], + rootPathFormat=PathFormat(m["rootPathFormat"]), + outputRelativeDirectories=m.get("outputRelativeDirectories", None), + inputManifestPath=m.get("inputManifestPath", None), + ) + for m in job_attachments.get("manifests", []) + ], + ) + + downloader = InputDownloader( + s3_settings=JobAttachmentS3Settings(**queue["jobAttachmentSettings"]), + attachments=attachments, + session=queue_role_session, + path_filters=include_paths, + ) + + output_paths_by_root = downloader.get_output_paths_by_root() + if not output_paths_by_root: + msg = ( + "No files matched the provided path filters." + if include_paths + else "No input files found for this job." + ) + click.echo(_get_json_line(JSON_MSG_TYPE_SUMMARY, msg) if is_json_format else msg) + return + + _run_download_ux( + downloader=downloader, + output_paths_by_root=output_paths_by_root, + root_path_format_mapping=root_path_format_mapping, + is_json_format=is_json_format, + config=config, + label="Inputs", + telemetry_metric="download_job_input", + ) + + +@cli_job.command(name="download-input") +@click.option("--profile", help="The AWS profile to use.") +@click.option("--farm-id", help="The farm to use.") +@click.option("--queue-id", help="The queue to use.") +@click.option("--job-id", help="The job to use.") +@click.option( + "--include-path", + multiple=True, + help="Download only files matching this relative path or directory prefix (trailing /). Repeatable.", +) +@click.option( + "--include-path-stdin", + is_flag=True, + help="Read include paths from stdin, one per line.", +) +@click.option( + "--conflict-resolution", + type=click.Choice( + [ + FileConflictResolution.SKIP.name, + FileConflictResolution.OVERWRITE.name, + FileConflictResolution.CREATE_COPY.name, + ], + case_sensitive=False, + ), + help="How to handle downloads if a file already exists:\n" + "CREATE_COPY (default): Download the file with a new name, appending '(1)' to the end\n" + "SKIP: Do not download the file\n" + "OVERWRITE: Download and replace the existing file", +) +@click.option("--yes", is_flag=True, help="Automatically accept any confirmation prompts") +@click.option( + "--output", + type=click.Choice(["verbose", "json"], case_sensitive=False), + help="Specifies the output format of the messages printed to stdout.\n" + "VERBOSE: Displays messages in a human-readable text format.\n" + "JSON: Displays messages in JSON line format.", +) +@_handle_error +def job_download_input(output, include_path, include_path_stdin, **args): + """ + Download the input attachments of a Deadline Cloud job. + + \b + Learn more about [job attachments](https://docs.aws.amazon.com/deadline-cloud/latest/userguide/storage-job-attachments.html) + """ + filters, config, farm_id, queue_id, job_id = _parse_filters_and_config( + include_path, include_path_stdin, args + ) + is_json_format = True if output == "json" else False + + try: + _download_job_input( + config, + farm_id, + queue_id, + job_id, + is_json_format, + include_paths=filters, + ) + except Exception as e: + _handle_download_error(e, is_json_format, "input") + + @cli_job.command(name="wait") @click.option("--profile", help="The AWS profile to use.") @click.option("--farm-id", help="The farm to use.") diff --git a/src/deadline/job_attachments/download.py b/src/deadline/job_attachments/download.py index 2d4653289..1cd8f601b 100644 --- a/src/deadline/job_attachments/download.py +++ b/src/deadline/job_attachments/download.py @@ -1459,6 +1459,37 @@ def download_job_output( return progress_tracker.get_download_summary_statistics(downloaded_files_paths_by_root) +class InputDownloader(OutputDownloader): + """ + Handler for downloading input attachment files for a given job. + Inherits download mechanics from OutputDownloader, only differs in + how manifests are fetched (input manifests from job attachments + instead of output manifests from S3 listing). + """ + + def __init__( + self, + s3_settings: JobAttachmentS3Settings, + attachments: Attachments, + session: Optional[boto3.Session] = None, + path_filters: Optional[list[str]] = None, + ) -> None: + input_paths = get_job_input_paths_by_asset_root( + s3_settings=s3_settings, + attachments=attachments, + session=session, + ) + super().__init__( + s3_settings=s3_settings, + farm_id="", + queue_id="", + job_id="", + session=session, + path_filters=path_filters, + _outputs_by_root=input_paths, + ) + + def _get_manifests_by_session_action_id( s3_settings: JobAttachmentS3Settings, farm_id: str,