diff --git a/docs/job_attachments_guide.md b/docs/job_attachments_guide.md index 8f8af8951..07dc936ce 100644 --- a/docs/job_attachments_guide.md +++ b/docs/job_attachments_guide.md @@ -4,7 +4,7 @@ Job attachments uses your configured S3 bucket as a [content-addressable storage](https://en.wikipedia.org/wiki/Content-addressable_storage), which creates a snapshot of the files used in your job submission in [asset manifests](#asset-manifests), only uploading files that aren't already in S3. This saves you time and bandwidth when iterating on jobs. When an [AWS Deadline Cloud worker agent][worker-agent] starts working on a job with job attachments, it recreates the file system snapshot in the worker agent session directory, and uploads any outputs back to your S3 bucket. -You can then easily download your outputs with the [deadline job download-output] command, or using the [protocol handler](#protocol-handler) to download from a click of a button in the [AWS Deadline Cloud monitor][monitor]. +You can then easily download your outputs with the [deadline job download-output] command, or download the original input files with the [deadline job download-input] command. You can also use the [protocol handler](#protocol-handler) to download with the click of a button in the [AWS Deadline Cloud monitor][monitor]. Job attachments also works as an auxiliary storage when used with [AWS Deadline Cloud storage profiles][shared-storage], allowing you to flexibly upload files to your Amazon S3 bucket that aren't on your configured shared storage. 
@@ -18,6 +18,7 @@ See the [`examples`](https://github.com/aws-deadline/deadline-cloud/tree/mainlin [worker-agent]: https://github.com/aws-deadline/deadline-cloud-worker-agent/blob/release/docs/ [developer-guide]: https://docs.aws.amazon.com/deadline-cloud/latest/developerguide/what-job-attachments-uploads-to-amazon-s3.html [deadline job download-output]: cli_reference/deadline_job.md#download-output +[deadline job download-input]: cli_reference/deadline_job.md#download-input ## Job Attachments Bucket Structure diff --git a/src/deadline/client/cli/_groups/job_group.py b/src/deadline/client/cli/_groups/job_group.py index e8fdb2f25..97c1637fb 100644 --- a/src/deadline/client/cli/_groups/job_group.py +++ b/src/deadline/client/cli/_groups/job_group.py @@ -21,10 +21,12 @@ from botocore.exceptions import ClientError from ...api._session import _modified_logging_level -from ....job_attachments.download import OutputDownloader +from ....job_attachments.download import OutputDownloader, _InputDownloader from ....job_attachments.models import ( + Attachments, FileConflictResolution, JobAttachmentS3Settings, + ManifestProperties, PathFormat, ) from ....job_attachments._utils import ( @@ -987,6 +989,249 @@ def job_download_output(step_id, task_id, output, **args): raise DeadlineOperationError(f"Failed to download output:\n{e}") from e +@cli_job.command(name="download-input") +@click.option("--profile", help="The AWS profile to use.") +@click.option("--farm-id", help="The farm to use.") +@click.option("--queue-id", help="The queue to use.") +@click.option("--job-id", help="The job to use.") +@click.option( + "--conflict-resolution", + type=click.Choice( + [ + FileConflictResolution.SKIP.name, + FileConflictResolution.OVERWRITE.name, + FileConflictResolution.CREATE_COPY.name, + ], + case_sensitive=False, + ), + help="How to handle downloads if a file already exists:\n" + "CREATE_COPY (default): Download the file with a new name, appending '(1)' to the end\n" + "SKIP: Do not 
download the file\n" + "OVERWRITE: Download and replace the existing file", +) +@click.option( + "--yes", + is_flag=True, + help="Automatically accept any confirmation prompts", +) +@click.option( + "--output", + type=click.Choice( + ["verbose", "json"], + case_sensitive=False, + ), + help="Specifies the output format of the messages printed to stdout.\n" + "VERBOSE: Displays messages in a human-readable text format.\n" + "JSON: Displays messages in JSON line format.", +) +@_handle_error +def job_download_input(output, **args): + """ + Download the input files of a [Deadline Cloud job] that were saved as [job attachments]. + + [Deadline Cloud job]: https://docs.aws.amazon.com/deadline-cloud/latest/userguide/deadline-cloud-jobs.html + [job attachments]: https://docs.aws.amazon.com/deadline-cloud/latest/userguide/storage-job-attachments.html + """ + config = _apply_cli_options_to_config( + required_options={"farm_id", "queue_id", "job_id"}, **args + ) + + farm_id = config_file.get_setting("defaults.farm_id", config=config) + queue_id = config_file.get_setting("defaults.queue_id", config=config) + job_id = config_file.get_setting("defaults.job_id", config=config) + is_json_format = output == "json" + + try: + _download_job_input(config, farm_id, queue_id, job_id, is_json_format) + except Exception as e: + if is_json_format: + error_one_liner = str(e).replace("\n", ". 
") + click.echo(_get_json_line(JSON_MSG_TYPE_ERROR, error_one_liner)) + sys.exit(1) + else: + if logging.DEBUG >= logger.getEffectiveLevel(): + logger.exception("Exception details:") + raise DeadlineOperationError(f"Failed to download input:\n{e}") from e + + +def _download_job_input( + config: Optional[ConfigParser], + farm_id: str, + queue_id: str, + job_id: str, + is_json_format: bool = False, +): + """Starts the download of job input and handles the progress reporting callback.""" + deadline = api.get_boto3_client("deadline", config=config) + + auto_accept = config_file.str2bool( + config_file.get_setting("settings.auto_accept", config=config) + ) + conflict_resolution = config_file.get_setting("settings.conflict_resolution", config=config) + + job = deadline.get_job(farmId=farm_id, queueId=queue_id, jobId=job_id) + + click.echo( + _get_json_line(JSON_MSG_TYPE_TITLE, f"Downloading inputs for job: {job['name']}") + if is_json_format + else f"Downloading inputs for job: {job['name']}" + ) + + job_attachments = job.get("attachments", None) + if not job_attachments: + click.echo( + _get_json_line(JSON_MSG_TYPE_TITLE, "No input attachments found for this job.") + if is_json_format + else "No input attachments found for this job." 
+ ) + return + + queue = deadline.get_queue(farmId=farm_id, queueId=queue_id) + + queue_role_session = api.get_queue_user_boto3_session( + deadline=deadline, + config=config, + farm_id=farm_id, + queue_id=queue_id, + queue_display_name=queue["displayName"], + ) + + attachments = Attachments( + manifests=[ManifestProperties(**m) for m in job_attachments["manifests"]], + fileSystem=job_attachments.get("fileSystem", "COPIED"), + ) + + # Build mapping of root paths to their OS format for cross-platform detection + root_path_format_mapping: dict[str, str] = {} + for manifest in job_attachments["manifests"]: + root_path_format_mapping[manifest["rootPath"]] = manifest["rootPathFormat"] + + job_input_downloader = _InputDownloader( + s3_settings=JobAttachmentS3Settings(**queue["jobAttachmentSettings"]), + attachments=attachments, + session=queue_role_session, + ) + + input_paths_by_root = job_input_downloader.get_input_paths_by_root() + if not input_paths_by_root: + click.echo( + _get_json_line(JSON_MSG_TYPE_TITLE, "No input files found for this job.") + if is_json_format + else "No input files found for this job." + ) + return + + # Check if asset roots came from different OS. If so, prompt for alternative paths. 
+ asset_roots = list(input_paths_by_root.keys()) + for asset_root in asset_roots: + root_path_format = root_path_format_mapping.get(asset_root, "") + if root_path_format == "": + raise DeadlineOperationError(f"No root path format found for {asset_root}.") + if PathFormat.get_host_path_format_string() != root_path_format: + click.echo(_get_mismatch_os_root_warning(asset_root, root_path_format, is_json_format)) + + if not is_json_format: + new_root = click.prompt( + "> Please enter a new root path", + type=click.Path(exists=False), + ) + else: + json_string = click.prompt("", prompt_suffix="", type=str) + new_root = _get_value_from_json_line( + json_string, JSON_MSG_TYPE_PATHCONFIRM, expected_size=1 + )[0] + _assert_valid_path(new_root) + + job_input_downloader.set_root_path(asset_root, os.path.expanduser(new_root)) + + input_paths_by_root = job_input_downloader.get_input_paths_by_root() + + # Prompt for root path changes if not auto-accept + if not auto_accept and not is_json_format: + user_choice = "" + while user_choice not in ("y", "n"): + click.echo( + _get_summary_of_files_to_download_message(input_paths_by_root, is_json_format) + ) + asset_roots = list(input_paths_by_root.keys()) + click.echo(_get_roots_list_message(asset_roots, is_json_format)) + user_choice = click.prompt( + "> Please enter the index of root directory to edit, y to proceed, or n to cancel", + type=click.Choice([*[str(num) for num in range(len(asset_roots))], "y", "n"]), + default="y", + ) + if user_choice == "n": + click.echo("Input download canceled.") + return + elif user_choice != "y": + index_to_change = int(user_choice) + new_root = click.prompt( + "> Please enter the new root directory path", + type=click.Path(exists=False), + default=asset_roots[index_to_change], + ) + job_input_downloader.set_root_path( + asset_roots[index_to_change], str(Path(new_root)) + ) + input_paths_by_root = job_input_downloader.get_input_paths_by_root() + + if not is_json_format: + all_input_paths: set[str] = 
set() + for asset_root, input_paths in input_paths_by_root.items(): + all_input_paths.update( + os.path.normpath(os.path.join(asset_root, path)) for path in input_paths + ) + click.echo("\nSummary of file paths to download:") + click.echo(textwrap.indent(summarize_path_list(all_input_paths), " ")) + + # Handle conflict resolution + if conflict_resolution and conflict_resolution != FileConflictResolution.NOT_SELECTED.name: + file_conflict_resolution = FileConflictResolution[conflict_resolution] + else: + file_conflict_resolution = FileConflictResolution.CREATE_COPY + + with _modified_logging_level(logging.getLogger("urllib3"), logging.ERROR): + + @api.record_success_fail_telemetry_event(metric_name="download_job_input") + def _do_download( + file_conflict_resolution: FileConflictResolution, + on_downloading_files: Optional[Callable[[ProgressReportMetadata], bool]] = None, + ) -> DownloadSummaryStatistics: + return job_input_downloader.download_job_input( + file_conflict_resolution=file_conflict_resolution, + on_downloading_files=on_downloading_files, + ) + + if not is_json_format: + with click.progressbar(length=100, label="Downloading Inputs") as download_progress: # type: ignore[var-annotated] + + def _update_progress(download_metadata: ProgressReportMetadata) -> bool: + new_progress = int(download_metadata.progress) - download_progress.pos + if new_progress > 0: + download_progress.update(new_progress) + return sigint_handler.continue_operation + + download_summary = _do_download( + file_conflict_resolution=file_conflict_resolution, + on_downloading_files=_update_progress, + ) + else: + + def _update_progress(download_metadata: ProgressReportMetadata) -> bool: + click.echo( + _get_json_line(JSON_MSG_TYPE_PROGRESS, str(int(download_metadata.progress))) + ) + return True + + download_summary = _do_download( + file_conflict_resolution=file_conflict_resolution, + on_downloading_files=_update_progress, + ) + + click.echo(_get_download_summary_message(download_summary, 
is_json_format)) + click.echo() + + @cli_job.command(name="wait") @click.option("--profile", help="The AWS profile to use.") @click.option("--farm-id", help="The farm to use.") diff --git a/src/deadline/job_attachments/download.py b/src/deadline/job_attachments/download.py index 2a0161114..fb2d42abc 100644 --- a/src/deadline/job_attachments/download.py +++ b/src/deadline/job_attachments/download.py @@ -73,7 +73,6 @@ from ._utils import ( _get_long_path_compatible_path, _is_relative_to, - _join_s3_paths, ) from threading import Lock @@ -325,7 +324,9 @@ def get_job_input_paths_by_asset_root( for manifest_properties in attachments.manifests: if manifest_properties.inputManifestPath: - key = _join_s3_paths(manifest_properties.inputManifestPath) + key = s3_settings.add_root_and_manifest_folder_prefix( + manifest_properties.inputManifestPath + ) _, asset_manifest = get_asset_root_and_manifest_from_s3( manifest_key=key, s3_bucket=s3_settings.s3BucketName, @@ -1236,6 +1237,122 @@ def mount_vfs_from_manifests( vfs_manager.start(session_dir=session_dir) +class _InputDownloader: + """ + Handler for downloading all input files from the given job. + If no session is provided the default credentials path will be used. 
+ """ + + def __init__( + self, + s3_settings: JobAttachmentS3Settings, + attachments: Attachments, + session: Optional[boto3.Session] = None, + ) -> None: + self.s3_settings = s3_settings + self.session = session + self.inputs_by_root = get_job_input_paths_by_asset_root( + s3_settings=s3_settings, + attachments=attachments, + session=session, + ) + + def get_input_paths_by_root(self) -> dict[str, list[str]]: + """Returns a dict of asset root paths to lists of input paths.""" + input_paths_by_root: dict[str, list[str]] = {} + for root, path_group in self.inputs_by_root.items(): + input_paths_by_root[root] = path_group.get_all_paths() + return input_paths_by_root + + def set_root_path(self, original_root: str, new_root: str) -> None: + """Changes the root path for downloading input files.""" + new_root = str(os.path.normpath(Path(new_root).absolute())) + + if original_root not in self.inputs_by_root: + raise ValueError(f"The root path {original_root} was not found in input manifests.") + + if new_root == original_root: + return + + # If the new root already exists (user is merging two roots), handle filename + # collisions by prefixing conflicting files with the original root path + if new_root in self.inputs_by_root: + paths_in_new_root = self.inputs_by_root[new_root].get_all_paths() + for manifest_paths in self.inputs_by_root[original_root].files_by_hash_alg.values(): + for manifest_path in manifest_paths: + if manifest_path.path in paths_in_new_root: + # Rename conflicting file: prefix with sanitized original root + new_name_prefix = ( + original_root.replace("/", "_").replace("\\", "_").replace(":", "_") + ) + manifest_path.path = str( + Path(manifest_path.path).with_name( + f"{new_name_prefix}_{manifest_path.path}" + ) + ) + # Merge the original root's files into the new root and remove original + self.inputs_by_root[new_root].combine_with_group(self.inputs_by_root[original_root]) + del self.inputs_by_root[original_root] + else: + # Simple rename: just change 
the key in the dictionary + self.inputs_by_root = { + key if key != original_root else new_root: value + for key, value in self.inputs_by_root.items() + } + + def download_job_input( + self, + file_conflict_resolution: Optional[ + FileConflictResolution + ] = FileConflictResolution.CREATE_COPY, + on_downloading_files: Optional[Callable[[ProgressReportMetadata], bool]] = None, + ) -> DownloadSummaryStatistics: + """ + Downloads input files from S3 bucket to the asset root(s). + """ + total_bytes: int = 0 + total_files: int = 0 + for path_group in self.inputs_by_root.values(): + total_bytes += path_group.total_bytes + total_files += len(path_group.get_all_paths()) + + progress_tracker = ProgressTracker( + status=ProgressStatus.DOWNLOAD_IN_PROGRESS, + total_files=total_files, + total_bytes=total_bytes, + on_progress_callback=on_downloading_files, + ) + + start_time = time.perf_counter() + downloaded_files_paths_by_root: DefaultDict[str, list[str]] = DefaultDict(list) + + try: + for root, path_group in self.inputs_by_root.items(): + for hash_alg, path_list in path_group.files_by_hash_alg.items(): + _ensure_paths_within_directory(root, [file.path for file in path_list]) + + downloaded_files_paths = download_files( + files=path_list, + hash_algorithm=hash_alg, + local_download_dir=root, + s3_settings=self.s3_settings, + session=self.session, + progress_tracker=progress_tracker, + file_conflict_resolution=file_conflict_resolution, + ) + downloaded_files_paths_by_root[root].extend(downloaded_files_paths) + except AssetSyncCancelledError: + downloaded_files = progress_tracker.processed_files + raise AssetSyncCancelledError( + "Download cancelled. 
" + f"(Downloaded {downloaded_files} file{'' if downloaded_files == 1 else 's'} before cancellation.)" + ) + + progress_tracker.total_time = time.perf_counter() - start_time + + return progress_tracker.get_download_summary_statistics(downloaded_files_paths_by_root) + + def _ensure_paths_within_directory(root_path: str, paths_relative_to_root: list[str]) -> None: """ Validates the given paths to ensure that they are within the given root path. diff --git a/test/unit/deadline_client/cli/test_cli_job.py b/test/unit/deadline_client/cli/test_cli_job.py index a1d08da29..f57a43c34 100644 --- a/test/unit/deadline_client/cli/test_cli_job.py +++ b/test/unit/deadline_client/cli/test_cli_job.py @@ -909,6 +909,236 @@ def test_get_summary_of_files_to_download_message_windows( ) +def test_cli_job_download_input_success(fresh_deadline_config, tmp_path: Path): + """ + Tests that the download-input command successfully downloads input files. + Uses moto for S3 mocking via the deadline_mock fixture pattern. 
+ """ + config.set_setting("defaults.farm_id", MOCK_FARM_ID) + config.set_setting("defaults.queue_id", MOCK_QUEUE_ID) + + with patch.object(api, "get_boto3_client") as boto3_client_mock, patch.object( + job_group, "_InputDownloader" + ) as MockInputDownloader, patch.object( + job_group, "_get_conflicting_filenames", return_value=[] + ), patch.object(job_group, "round", return_value=0), patch.object( + api, "get_queue_user_boto3_session" + ): + mock_download = MagicMock() + mock_download.return_value = DownloadSummaryStatistics( + total_time=5, + processed_files=2, + processed_bytes=512, + ) + MockInputDownloader.return_value.download_job_input = mock_download + mock_root_path = "/root/path" if sys.platform != "win32" else "C:\\Users\\username" + mock_files_list = ["inputs/file1.txt", "inputs/file2.txt"] + MockInputDownloader.return_value.get_input_paths_by_root.side_effect = [ + {mock_root_path: mock_files_list}, + {mock_root_path: mock_files_list}, + {mock_root_path: mock_files_list}, + ] + + mock_host_path_format = PathFormat.get_host_path_format() + + boto3_client_mock().get_queue.side_effect = [MOCK_GET_QUEUE_RESPONSE] + boto3_client_mock().get_job.return_value = { + "name": "Mock Job", + "attachments": { + "manifests": [ + { + "rootPath": mock_root_path, + "rootPathFormat": mock_host_path_format, + "inputManifestPath": f"{MOCK_FARM_ID}/{MOCK_QUEUE_ID}/Inputs/0000/manifest_input", + "inputManifestHash": "abc123", + }, + ], + }, + } + + runner = CliRunner() + result = runner.invoke( + main, + ["job", "download-input", "--job-id", MOCK_JOB_ID, "--output", "verbose"], + input="y\n", + ) + + MockInputDownloader.assert_called_once_with( + s3_settings=JobAttachmentS3Settings(**MOCK_GET_QUEUE_RESPONSE["jobAttachmentSettings"]), # type: ignore + attachments=ANY, + session=ANY, + ) + + assert "Downloading inputs for job: Mock Job" in result.output + assert "Download Summary:" in result.output + assert result.exit_code == 0 + + +def 
test_cli_job_download_input_no_inputs(fresh_deadline_config, tmp_path: Path): + """ + Tests that download-input handles jobs with no input attachments gracefully. + """ + config.set_setting("defaults.farm_id", MOCK_FARM_ID) + config.set_setting("defaults.queue_id", MOCK_QUEUE_ID) + + with patch.object(api, "get_boto3_client") as boto3_client_mock, patch.object( + job_group, "_InputDownloader" + ) as MockInputDownloader, patch.object( + job_group, "_get_conflicting_filenames", return_value=[] + ), patch.object(api, "get_queue_user_boto3_session"): + MockInputDownloader.return_value.get_input_paths_by_root.return_value = {} + + mock_host_path_format = PathFormat.get_host_path_format() + boto3_client_mock().get_queue.side_effect = [MOCK_GET_QUEUE_RESPONSE] + boto3_client_mock().get_job.return_value = { + "name": "Mock Job", + "attachments": { + "manifests": [ + { + "rootPath": "/root/path", + "rootPathFormat": mock_host_path_format, + }, + ], + }, + } + + runner = CliRunner() + result = runner.invoke( + main, + ["job", "download-input", "--job-id", MOCK_JOB_ID, "--output", "verbose"], + ) + + assert "no input files found" in result.output.lower() + assert result.exit_code == 0 + + +def test_cli_job_download_input_with_conflict_resolution(fresh_deadline_config, tmp_path: Path): + """ + Tests that download-input respects the conflict resolution option. 
+ """ + config.set_setting("defaults.farm_id", MOCK_FARM_ID) + config.set_setting("defaults.queue_id", MOCK_QUEUE_ID) + + with patch.object(api, "get_boto3_client") as boto3_client_mock, patch.object( + job_group, "_InputDownloader" + ) as MockInputDownloader, patch.object( + job_group, "_get_conflicting_filenames", return_value=[] + ), patch.object(job_group, "round", return_value=0), patch.object( + api, "get_queue_user_boto3_session" + ): + mock_download = MagicMock() + mock_download.return_value = DownloadSummaryStatistics( + total_time=5, + processed_files=1, + processed_bytes=256, + ) + MockInputDownloader.return_value.download_job_input = mock_download + mock_root_path = "/root/path" if sys.platform != "win32" else "C:\\Users\\username" + MockInputDownloader.return_value.get_input_paths_by_root.return_value = { + mock_root_path: ["inputs/file1.txt"] + } + + mock_host_path_format = PathFormat.get_host_path_format() + boto3_client_mock().get_queue.side_effect = [MOCK_GET_QUEUE_RESPONSE] + boto3_client_mock().get_job.return_value = { + "name": "Mock Job", + "attachments": { + "manifests": [ + { + "rootPath": mock_root_path, + "rootPathFormat": mock_host_path_format, + "inputManifestPath": f"{MOCK_FARM_ID}/{MOCK_QUEUE_ID}/Inputs/0000/manifest", + "inputManifestHash": "abc123", + }, + ], + }, + } + + runner = CliRunner() + result = runner.invoke( + main, + [ + "job", + "download-input", + "--job-id", + MOCK_JOB_ID, + "--conflict-resolution", + "OVERWRITE", + "--yes", + ], + ) + + mock_download.assert_called_once() + call_kwargs = mock_download.call_args[1] + assert call_kwargs["file_conflict_resolution"] == FileConflictResolution.OVERWRITE + assert result.exit_code == 0 + + +def test_cli_job_download_input_cross_os_path_prompt(fresh_deadline_config, tmp_path: Path): + """ + Tests that download-input prompts for a new path when job was submitted from different OS. 
+ """ + config.set_setting("defaults.farm_id", MOCK_FARM_ID) + config.set_setting("defaults.queue_id", MOCK_QUEUE_ID) + + with patch.object(api, "get_boto3_client") as boto3_client_mock, patch.object( + job_group, "_InputDownloader" + ) as MockInputDownloader, patch.object( + job_group, "_get_conflicting_filenames", return_value=[] + ), patch.object(job_group, "round", return_value=0), patch.object( + api, "get_queue_user_boto3_session" + ): + mock_download = MagicMock() + mock_download.return_value = DownloadSummaryStatistics( + total_time=5, + processed_files=1, + processed_bytes=256, + ) + MockInputDownloader.return_value.download_job_input = mock_download + + # Use opposite OS path format to trigger cross-OS prompt + if sys.platform == "win32": + foreign_root = "/linux/path" + foreign_format = "posix" + local_root = str(tmp_path) + else: + foreign_root = "C:\\Windows\\path" + foreign_format = "windows" + local_root = str(tmp_path) + + MockInputDownloader.return_value.get_input_paths_by_root.side_effect = [ + {foreign_root: ["inputs/file1.txt"]}, + {local_root: ["inputs/file1.txt"]}, + {local_root: ["inputs/file1.txt"]}, + ] + + boto3_client_mock().get_queue.side_effect = [MOCK_GET_QUEUE_RESPONSE] + boto3_client_mock().get_job.return_value = { + "name": "Mock Job", + "attachments": { + "manifests": [ + { + "rootPath": foreign_root, + "rootPathFormat": foreign_format, + "inputManifestPath": f"{MOCK_FARM_ID}/{MOCK_QUEUE_ID}/Inputs/0000/manifest", + "inputManifestHash": "abc123", + }, + ], + }, + } + + runner = CliRunner() + result = runner.invoke( + main, + ["job", "download-input", "--job-id", MOCK_JOB_ID, "--yes"], + input=f"{local_root}\n", + ) + + # Should have prompted for new path and called set_root_path + MockInputDownloader.return_value.set_root_path.assert_called_once() + assert result.exit_code == 0 + + def test_cli_job_wait_succeeded(fresh_deadline_config): """ Test that job wait command returns exit code 0 when job succeeds. 
diff --git a/test/unit/deadline_job_attachments/conftest.py b/test/unit/deadline_job_attachments/conftest.py index 272e377c0..ce5e41270 100644 --- a/test/unit/deadline_job_attachments/conftest.py +++ b/test/unit/deadline_job_attachments/conftest.py @@ -112,7 +112,7 @@ def fixture_default_attachments(farm_id, queue_id): ManifestProperties( rootPath="/tmp", rootPathFormat=PathFormat.POSIX, - inputManifestPath=f"assetRoot/Manifests/{farm_id}/{queue_id}/Inputs/0000/manifest_input", + inputManifestPath=f"{farm_id}/{queue_id}/Inputs/0000/manifest_input", inputManifestHash="manifesthash", outputRelativeDirectories=["test/outputs"], ) diff --git a/test/unit/deadline_job_attachments/test_download.py b/test/unit/deadline_job_attachments/test_download.py index 241e9dfb2..232e7c696 100644 --- a/test/unit/deadline_job_attachments/test_download.py +++ b/test/unit/deadline_job_attachments/test_download.py @@ -37,6 +37,7 @@ from deadline.job_attachments.asset_manifests.versions import ManifestVersion from deadline.job_attachments.download import ( OutputDownloader, + _InputDownloader, download_file, download_files_from_manifests, download_files_in_directory, @@ -739,6 +740,51 @@ def test_get_job_input_output_paths_by_asset_root( manifest_version, ) + def test_InputDownloader_download_job_input( + self, tmp_path: Path, manifest_version: ManifestVersion + ): + """Test that _InputDownloader can download input files.""" + assert self.job.attachments is not None + + input_downloader = _InputDownloader( + s3_settings=self.job_attachment_settings, + attachments=self.job.attachments, + ) + + # Get the original root path from the job's attachments + original_root = self.job.attachments.manifests[0].rootPath + + # Verify get_input_paths_by_root returns expected structure + input_paths = input_downloader.get_input_paths_by_root() + assert original_root in input_paths + assert len(input_paths[original_root]) == 5 # 5 input files + + # Test set_root_path to change download location + 
input_downloader.set_root_path(original_root, str(tmp_path)) + input_paths = input_downloader.get_input_paths_by_root() + assert str(tmp_path) in input_paths + + # Download the files + stats = input_downloader.download_job_input() + assert stats.processed_files == 5 + + # Verify files were downloaded + downloaded_files = list(tmp_path.glob("**/*")) + downloaded_files = [f for f in downloaded_files if f.is_file()] + assert len(downloaded_files) == 5 + + def test_InputDownloader_set_root_path_not_found(self): + """Test that set_root_path raises ValueError for unknown root.""" + assert self.job.attachments is not None + + input_downloader = _InputDownloader( + s3_settings=self.job_attachment_settings, + attachments=self.job.attachments, + ) + + with pytest.raises(ValueError, match="was not found"): + input_downloader.set_root_path("/nonexistent/path", "/new/path") + EXPECTED_DOWNLOAD_FILE_PATHS_RELATIVE = [ "inputs/input1.txt", "inputs/subdir/input2.txt",