Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/job_attachments_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

Job attachments uses your configured S3 bucket as a [content-addressable storage](https://en.wikipedia.org/wiki/Content-addressable_storage), which creates a snapshot of the files used in your job submission in [asset manifests](#asset-manifests), only uploading files that aren't already in S3. This saves you time and bandwidth when iterating on jobs. When an [AWS Deadline Cloud worker agent][worker-agent] starts working on a job with job attachments, it recreates the file system snapshot in the worker agent session directory, and uploads any outputs back to your S3 bucket.

You can then easily download your outputs with the [deadline job download-output] command, or using the [protocol handler](#protocol-handler) to download from a click of a button in the [AWS Deadline Cloud monitor][monitor].
You can then easily download your outputs with the [deadline job download-output] command, or download the original input files with the [deadline job download-input] command. You can also use the [protocol handler](#protocol-handler) to download with the click of a button in the [AWS Deadline Cloud monitor][monitor].

Job attachments also works as an auxiliary storage when used with [AWS Deadline Cloud storage profiles][shared-storage], allowing you to flexibly upload files to your Amazon S3 bucket that aren't on your configured shared storage.

Expand All @@ -18,6 +18,7 @@ See the [`examples`](https://github.com/aws-deadline/deadline-cloud/tree/mainlin
[worker-agent]: https://github.com/aws-deadline/deadline-cloud-worker-agent/blob/release/docs/
[developer-guide]: https://docs.aws.amazon.com/deadline-cloud/latest/developerguide/what-job-attachments-uploads-to-amazon-s3.html
[deadline job download-output]: cli_reference/deadline_job.md#download-output
[deadline job download-input]: cli_reference/deadline_job.md#download-input

## Job Attachments Bucket Structure

Expand Down
247 changes: 246 additions & 1 deletion src/deadline/client/cli/_groups/job_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
from botocore.exceptions import ClientError

from ...api._session import _modified_logging_level
from ....job_attachments.download import OutputDownloader
from ....job_attachments.download import OutputDownloader, _InputDownloader
from ....job_attachments.models import (
Attachments,
FileConflictResolution,
JobAttachmentS3Settings,
ManifestProperties,
PathFormat,
)
from ....job_attachments._utils import (
Expand Down Expand Up @@ -987,6 +989,249 @@
raise DeadlineOperationError(f"Failed to download output:\n{e}") from e


@cli_job.command(name="download-input")
@click.option("--profile", help="The AWS profile to use.")
@click.option("--farm-id", help="The farm to use.")
@click.option("--queue-id", help="The queue to use.")
@click.option("--job-id", help="The job to use.")
@click.option(
    "--conflict-resolution",
    type=click.Choice(
        [
            FileConflictResolution.SKIP.name,
            FileConflictResolution.OVERWRITE.name,
            FileConflictResolution.CREATE_COPY.name,
        ],
        case_sensitive=False,
    ),
    help="How to handle downloads if a file already exists:\n"
    "CREATE_COPY (default): Download the file with a new name, appending '(1)' to the end\n"
    "SKIP: Do not download the file\n"
    "OVERWRITE: Download and replace the existing file",
)
@click.option(
    "--yes",
    is_flag=True,
    help="Automatically accept any confirmation prompts",
)
@click.option(
    "--output",
    type=click.Choice(
        ["verbose", "json"],
        case_sensitive=False,
    ),
    help="Specifies the output format of the messages printed to stdout.\n"
    "VERBOSE: Displays messages in a human-readable text format.\n"
    "JSON: Displays messages in JSON line format.",
)
@_handle_error
def job_download_input(output, **args):
    """
    Download the input files of a [Deadline Cloud job] that were saved as [job attachments].

    [Deadline Cloud job]: https://docs.aws.amazon.com/deadline-cloud/latest/userguide/deadline-cloud-jobs.html
    [job attachments]: https://docs.aws.amazon.com/deadline-cloud/latest/userguide/storage-job-attachments.html
    """
    # Merge CLI options into the config; farm/queue/job must resolve from
    # either the command line or the stored defaults.
    config = _apply_cli_options_to_config(
        required_options={"farm_id", "queue_id", "job_id"}, **args
    )

    farm_id = config_file.get_setting("defaults.farm_id", config=config)
    queue_id = config_file.get_setting("defaults.queue_id", config=config)
    job_id = config_file.get_setting("defaults.job_id", config=config)
    is_json_format = output == "json"

    try:
        _download_job_input(config, farm_id, queue_id, job_id, is_json_format)
    except Exception as e:
        if is_json_format:
            # JSON mode: emit a single machine-readable error line and exit
            # non-zero instead of raising a (human-formatted) CLI error.
            error_one_liner = str(e).replace("\n", ". ")
            click.echo(_get_json_line(JSON_MSG_TYPE_ERROR, error_one_liner))
            sys.exit(1)
        else:
            if logging.DEBUG >= logger.getEffectiveLevel():
                logger.exception("Exception details:")
            raise DeadlineOperationError(f"Failed to download input:\n{e}") from e


def _download_job_input(
    config: Optional[ConfigParser],
    farm_id: str,
    queue_id: str,
    job_id: str,
    is_json_format: bool = False,
):
    """Starts the download of job input and handles the progress reporting callback.

    Args:
        config: Deadline config to read settings (auto-accept, conflict resolution) from.
        farm_id: The farm containing the job.
        queue_id: The queue containing the job.
        job_id: The job whose input attachments should be downloaded.
        is_json_format: When True, all messages are emitted as JSON lines instead of text.

    Raises:
        DeadlineOperationError: If a manifest root path has no recorded path format.
    """
    deadline = api.get_boto3_client("deadline", config=config)

    auto_accept = config_file.str2bool(
        config_file.get_setting("settings.auto_accept", config=config)
    )
    conflict_resolution = config_file.get_setting("settings.conflict_resolution", config=config)

    job = deadline.get_job(farmId=farm_id, queueId=queue_id, jobId=job_id)

    _echo_title(f"Downloading inputs for job: {job['name']}", is_json_format)

    job_attachments = job.get("attachments", None)
    if not job_attachments:
        _echo_title("No input attachments found for this job.", is_json_format)
        return

    queue = deadline.get_queue(farmId=farm_id, queueId=queue_id)

    queue_role_session = api.get_queue_user_boto3_session(
        deadline=deadline,
        config=config,
        farm_id=farm_id,
        queue_id=queue_id,
        queue_display_name=queue["displayName"],
    )

    attachments = Attachments(
        manifests=[ManifestProperties(**m) for m in job_attachments["manifests"]],
        fileSystem=job_attachments.get("fileSystem", "COPIED"),
    )

    # Map each manifest root path to the OS path format it was submitted from,
    # so cross-platform submissions can be detected below.
    root_path_format_mapping: dict[str, str] = {
        manifest["rootPath"]: manifest["rootPathFormat"]
        for manifest in job_attachments["manifests"]
    }

    job_input_downloader = _InputDownloader(
        s3_settings=JobAttachmentS3Settings(**queue["jobAttachmentSettings"]),
        attachments=attachments,
        session=queue_role_session,
    )

    input_paths_by_root = job_input_downloader.get_input_paths_by_root()
    if not input_paths_by_root:
        _echo_title("No input files found for this job.", is_json_format)
        return

    input_paths_by_root = _remap_cross_os_input_roots(
        job_input_downloader, input_paths_by_root, root_path_format_mapping, is_json_format
    )

    # Interactively confirm (and optionally edit) the root directories unless
    # the user opted into auto-accept or is driving the CLI via JSON lines.
    if not auto_accept and not is_json_format:
        input_paths_by_root = _confirm_or_edit_input_roots(
            job_input_downloader, input_paths_by_root, is_json_format
        )
        if input_paths_by_root is None:
            # User canceled; the cancellation message was already printed.
            return

    if not is_json_format:
        _echo_input_path_summary(input_paths_by_root)

    # Fall back to CREATE_COPY when no explicit resolution was configured.
    if conflict_resolution and conflict_resolution != FileConflictResolution.NOT_SELECTED.name:
        file_conflict_resolution = FileConflictResolution[conflict_resolution]
    else:
        file_conflict_resolution = FileConflictResolution.CREATE_COPY

    download_summary = _run_input_download(
        job_input_downloader, file_conflict_resolution, is_json_format
    )

    click.echo(_get_download_summary_message(download_summary, is_json_format))
    click.echo()


def _echo_title(message: str, is_json_format: bool) -> None:
    """Echo a title message, as a JSON line in JSON mode or plain text otherwise."""
    click.echo(_get_json_line(JSON_MSG_TYPE_TITLE, message) if is_json_format else message)


def _remap_cross_os_input_roots(
    job_input_downloader,
    input_paths_by_root: dict,
    root_path_format_mapping: dict[str, str],
    is_json_format: bool,
) -> dict:
    """Prompt for replacement root paths for manifests submitted from a different OS.

    Returns the (possibly refreshed) mapping of root path to input file paths.

    Raises:
        DeadlineOperationError: If a root path has no recorded path format.
    """
    remapped = False
    host_path_format = PathFormat.get_host_path_format_string()
    for asset_root in list(input_paths_by_root.keys()):
        root_path_format = root_path_format_mapping.get(asset_root, "")
        if root_path_format == "":
            raise DeadlineOperationError(f"No root path format found for {asset_root}.")
        if host_path_format == root_path_format:
            continue

        click.echo(_get_mismatch_os_root_warning(asset_root, root_path_format, is_json_format))

        if not is_json_format:
            new_root = click.prompt(
                "> Please enter a new root path",
                type=click.Path(exists=False),
            )
        else:
            # JSON mode: the caller supplies the replacement path as a
            # single-element JSON "pathconfirm" message on stdin.
            json_string = click.prompt("", prompt_suffix="", type=str)
            new_root = _get_value_from_json_line(
                json_string, JSON_MSG_TYPE_PATHCONFIRM, expected_size=1
            )[0]
            _assert_valid_path(new_root)

        job_input_downloader.set_root_path(asset_root, os.path.expanduser(new_root))
        remapped = True

    if remapped:
        input_paths_by_root = job_input_downloader.get_input_paths_by_root()
    return input_paths_by_root


def _confirm_or_edit_input_roots(
    job_input_downloader, input_paths_by_root: dict, is_json_format: bool
) -> Optional[dict]:
    """Interactively confirm the download roots, letting the user edit them by index.

    Returns the confirmed mapping, or None when the user cancels the download.
    """
    user_choice = ""
    while user_choice not in ("y", "n"):
        click.echo(_get_summary_of_files_to_download_message(input_paths_by_root, is_json_format))
        asset_roots = list(input_paths_by_root.keys())
        click.echo(_get_roots_list_message(asset_roots, is_json_format))
        user_choice = click.prompt(
            "> Please enter the index of root directory to edit, y to proceed, or n to cancel",
            type=click.Choice([*[str(num) for num in range(len(asset_roots))], "y", "n"]),
            default="y",
        )
        if user_choice == "n":
            click.echo("Input download canceled.")
            return None
        elif user_choice != "y":
            # The user picked a root by index; prompt for its replacement path
            # and refresh the mapping before re-displaying the summary.
            index_to_change = int(user_choice)
            new_root = click.prompt(
                "> Please enter the new root directory path",
                type=click.Path(exists=False),
                default=asset_roots[index_to_change],
            )
            job_input_downloader.set_root_path(
                asset_roots[index_to_change], str(Path(new_root))
            )
            input_paths_by_root = job_input_downloader.get_input_paths_by_root()
    return input_paths_by_root


def _echo_input_path_summary(input_paths_by_root: dict) -> None:
    """Print a human-readable summary of every absolute file path to be downloaded."""
    all_input_paths: set[str] = set()
    for asset_root, input_paths in input_paths_by_root.items():
        all_input_paths.update(
            os.path.normpath(os.path.join(asset_root, path)) for path in input_paths
        )
    click.echo("\nSummary of file paths to download:")
    click.echo(textwrap.indent(summarize_path_list(all_input_paths), "    "))


def _run_input_download(
    job_input_downloader,
    file_conflict_resolution: FileConflictResolution,
    is_json_format: bool,
) -> DownloadSummaryStatistics:
    """Run the input download with progress reporting and return the summary statistics."""
    # Silence urllib3 chatter during the (potentially long) S3 transfer.
    with _modified_logging_level(logging.getLogger("urllib3"), logging.ERROR):

        @api.record_success_fail_telemetry_event(metric_name="download_job_input")
        def _do_download(
            file_conflict_resolution: FileConflictResolution,
            on_downloading_files: Optional[Callable[[ProgressReportMetadata], bool]] = None,
        ) -> DownloadSummaryStatistics:
            return job_input_downloader.download_job_input(
                file_conflict_resolution=file_conflict_resolution,
                on_downloading_files=on_downloading_files,
            )

        if not is_json_format:
            with click.progressbar(length=100, label="Downloading Inputs") as download_progress:  # type: ignore[var-annotated]

                def _update_progress(download_metadata: ProgressReportMetadata) -> bool:
                    new_progress = int(download_metadata.progress) - download_progress.pos
                    if new_progress > 0:
                        download_progress.update(new_progress)
                    # Returning False aborts the download on Ctrl-C.
                    return sigint_handler.continue_operation

                return _do_download(
                    file_conflict_resolution=file_conflict_resolution,
                    on_downloading_files=_update_progress,
                )

        def _update_progress_json(download_metadata: ProgressReportMetadata) -> bool:
            click.echo(
                _get_json_line(JSON_MSG_TYPE_PROGRESS, str(int(download_metadata.progress)))
            )
            return True

        return _do_download(
            file_conflict_resolution=file_conflict_resolution,
            on_downloading_files=_update_progress_json,
        )


@cli_job.command(name="wait")
@click.option("--profile", help="The AWS profile to use.")
@click.option("--farm-id", help="The farm to use.")
Expand Down
Loading
Loading