diff --git a/src/deadline/client/api/_list_jobs_by_filter_expression.py b/src/deadline/client/api/_list_jobs_by_filter_expression.py index c750bfe66..c439be41a 100644 --- a/src/deadline/client/api/_list_jobs_by_filter_expression.py +++ b/src/deadline/client/api/_list_jobs_by_filter_expression.py @@ -60,7 +60,8 @@ def _list_jobs_by_filter_expression( boto3_session (boto3.Session): The boto3 Session for AWS API access. farm_id (str): The Farm ID. queue_id (str): The Queue ID. - filter_expressions (dict[str, Any]): The filter expression to apply to jobs. + filter_expressions (dict[str, Any]): The filter expression to apply to jobs. This is nested one level in a + filter expression provided to deadline:SearchJobs, so cannot include a groupFilter. Returns: The list of all jobs in the queue that satisfy the provided filter expression. Each job is as returned by the deadline:SearchJobs API. diff --git a/src/deadline/client/cli/_common.py b/src/deadline/client/cli/_common.py index 1b8873ae9..bc3152224 100644 --- a/src/deadline/client/cli/_common.py +++ b/src/deadline/client/cli/_common.py @@ -181,7 +181,7 @@ def _fix_multiline_strings(obj: Any) -> Any: return obj -def _cli_object_repr(obj: Any): +def _cli_object_repr(obj: Any) -> str: """ Transforms an API response object into a string, for printing as CLI output. This formats the output as YAML, using the "|"-style diff --git a/src/deadline/client/cli/_groups/queue_group.py b/src/deadline/client/cli/_groups/queue_group.py index 0d96bf7db..ec5105fe5 100644 --- a/src/deadline/client/cli/_groups/queue_group.py +++ b/src/deadline/client/cli/_groups/queue_group.py @@ -29,7 +29,7 @@ ) PID_FILE_NAME = "incremental_output_download.pid" -DOWNLOAD_PROGRESS_FILE_NAME = "download_progress.json" +DOWNLOAD_CHECKPOINT_FILE_NAME = "download_checkpoint.json" @click.group(name="queue") @@ -209,154 +209,151 @@ def queue_get(**args): click.echo(_cli_object_repr(response)) -if os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is not None: - - @cli_queue.command( - name="incremental-output-download", - help="BETA - Download Job Output data incrementally for all jobs running on a queue as session actions finish.\n" - "The command bootstraps once using a bootstrap lookback specified in minutes and\n" - "continues downloading from the last saved progress thereafter until bootstrap is forced.\n" - "[NOTE] This command is still WIP and partially implemented right now", - ) - @click.option("--farm-id", help="The AWS Deadline Cloud Farm to use.") - @click.option("--queue-id", help="The AWS Deadline Cloud Queue to use.") - @click.option("--path-mapping-rules", help="Path to a file with the path mapping rules to use.") - @click.option( - "--json", default=None, is_flag=True, help="Output is printed as JSON for scripting." - ) - @click.option( - "--bootstrap-lookback-in-minutes", - default=0, - type=float, - help="Downloads outputs for job-session-actions that have been completed since these many\n" - "minutes at bootstrap. Default value is 0 minutes.", - ) - @click.option( - "--saved-progress-checkpoint-location", - help="Proceed downloading from previous progress file at this location, if it exists.\n" - "If parameter not provided or file does not exist,\n" - "the download will start from the provided bootstrap lookback in minutes or its default value. \n", - required=True, - ) - @click.option( - "--force-bootstrap", - is_flag=True, - help="Ignores the previous download progress and forces command to start from the bootstrap \n" - "lookback period specified in minutes.\n" - "Default value is False.", - default=False, - ) - @click.option( - "--conflict-resolution", - type=click.Choice( - [ - FileConflictResolution.SKIP.name, - FileConflictResolution.OVERWRITE.name, - FileConflictResolution.CREATE_COPY.name, - ], - case_sensitive=False, - ), - default=FileConflictResolution.OVERWRITE.name, - help="How to handle downloads if an output file already exists:\n" - "CREATE_COPY (default): Download the file with a new name, appending '(1)' to the end\n" - "SKIP: Do not download the file\n" - "OVERWRITE: Download and replace the existing file.\n" - "Default behaviour is to OVERWRITE.", - ) - @_handle_error - def incremental_output_download( - path_mapping_rules: str, - json: bool, - bootstrap_lookback_in_minutes: float, - saved_progress_checkpoint_location: str, - force_bootstrap: bool, - **args, - ): - """ - Download Job Output data incrementally for all jobs running on a queue as session actions finish. - The command bootstraps once using a bootstrap lookback specified in minutes and - continues downloading from the last saved progress thereafter until bootstrap is forced - - :param path_mapping_rules: path mapping rules for cross OS path mapping - :param json: whether output is printed as JSON for scripting - :param bootstrap_lookback_in_minutes: Downloads outputs for job-session-actions that have been completed - since these many minutes at bootstrap. Default value is 0 minutes. - :param saved_progress_checkpoint_location: location of the download progress file - :param force_bootstrap: force bootstrap and ignore current download progress. Default value is False. - :param args: - :return: - """ - logger: ClickLogger = ClickLogger(is_json=json) - logger.echo("processing " + args.__str__()) - - logger.echo("Started incremental download....") - - # Check if download progress location is a valid directory on the os - if not os.path.isdir(saved_progress_checkpoint_location): - raise DeadlineOperationError( - f"Download progress location {saved_progress_checkpoint_location} is not a valid directory" - ) +@cli_queue.command(name="incremental-output-download") +@click.option("--farm-id", help="The AWS Deadline Cloud Farm to use.") +@click.option("--queue-id", help="The AWS Deadline Cloud Queue to use.") +@click.option("--json", default=None, is_flag=True, help="Output is printed as JSON for scripting.") +@click.option( + "--bootstrap-lookback-minutes", + default=0, + type=float, + help="Downloads outputs for job-session-actions that have been completed since these many\n" + "minutes at bootstrap. Default value is 0 minutes.", +) +@click.option( + "--checkpoint-dir", + default=config_file.DEFAULT_QUEUE_INCREMENTAL_DOWNLOAD_DIR, + help="Proceed downloading from the previous progress file stored in this directory, if it exists.\n" + "If the file does not exist, the download will initialize using the bootstrap lookback in minutes. \n", +) +@click.option( + "--force-bootstrap", + is_flag=True, + help="Forces command to start from the bootstrap lookback period and overwrite any previous checkpoint.\n" + "Default value is False.", + default=False, +) +@click.option( + "--conflict-resolution", + type=click.Choice( + [ + FileConflictResolution.SKIP.name, + FileConflictResolution.OVERWRITE.name, + FileConflictResolution.CREATE_COPY.name, + ], + case_sensitive=False, + ), + default=FileConflictResolution.OVERWRITE.name, + help="How to handle downloads if an output file already exists:\n" + "CREATE_COPY: Download the file with a new name, appending '(1)' to the end\n" + "SKIP: Do not download the file\n" + "OVERWRITE (default): Download and replace the existing file.\n" + "Default behaviour is to OVERWRITE.", +) +@_handle_error +def incremental_output_download( + json: bool, + bootstrap_lookback_minutes: float, + checkpoint_dir: str, + force_bootstrap: bool, + **args, +): + """ + BETA - Downloads job attachments output incrementally for all jobs in a queue. When run for the + first time or with the --force-bootstrap option, it starts downloading from --bootstrap-lookback-minutes + in the past. When run each subsequent time, it loads the previous checkpoint and continues + where it left off. - # Check that download progress location is writable - if not os.access(saved_progress_checkpoint_location, os.W_OK): - raise DeadlineOperationError( - f"Download progress location {saved_progress_checkpoint_location} exists but is not writable, please provide write permissions" - ) + To try this command, set the ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD environment variable to 1 to acknowledge its + incomplete beta status. - # Get a temporary config object with the standard options handled - config: Optional[ConfigParser] = _apply_cli_options_to_config( - required_options={"farm_id", "queue_id"}, **args + [NOTE] This command is still WIP and partially implemented right now. + """ + if os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") != "1": + raise DeadlineOperationError( + "The incremental-output-download command is not fully implemented. You must set the environment variable ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD to 1 to acknowledge this." ) - # Get the default configs - farm_id = config_file.get_setting("defaults.farm_id", config=config) - queue_id = config_file.get_setting("defaults.queue_id", config=config) - boto3_session: boto3.Session = api.get_boto3_session(config=config) + logger: ClickLogger = ClickLogger(is_json=json) - # Get download progress file name appended by the queue id - a unique progress file exists per queue - download_progress_file_name: str = f"{queue_id}_{DOWNLOAD_PROGRESS_FILE_NAME}" + # Expand '~' to home directory and create the checkpoint directory if necessary + checkpoint_dir = os.path.abspath(os.path.expanduser(checkpoint_dir)) + os.makedirs(checkpoint_dir, exist_ok=True) - # Get saved progress file full path now that we've validated all file inputs are valid - checkpoint_file_path: str = os.path.join( - saved_progress_checkpoint_location, download_progress_file_name + # Check that download progress location is writable + if not os.access(checkpoint_dir, os.W_OK): + raise DeadlineOperationError( + f"Download progress checkpoint directory {checkpoint_dir} exists but is not writable, please provide write permissions" ) - # Perform incremental download while holding a process id lock + # Get a temporary config object with the standard options handled + config: Optional[ConfigParser] = _apply_cli_options_to_config( + required_options={"farm_id", "queue_id"}, **args + ) - pid_lock_file_path: str = os.path.join( - saved_progress_checkpoint_location, f"{queue_id}_{PID_FILE_NAME}" - ) + # Get the default configs + farm_id = config_file.get_setting("defaults.farm_id", config=config) + queue_id = config_file.get_setting("defaults.queue_id", config=config) + boto3_session: boto3.Session = api.get_boto3_session(config=config) - with PidFileLock( - pid_lock_file_path, - operation_name="incremental output download", - print_function_callback=logger.echo, - ): - current_download_state: IncrementalDownloadState - - if force_bootstrap or not os.path.exists(checkpoint_file_path): - # Bootstrap with the specified lookback duration - current_download_state = IncrementalDownloadState( - downloads_started_timestamp=datetime.now(timezone.utc) - - timedelta(minutes=bootstrap_lookback_in_minutes) - ) + # Get download progress file name appended by the queue id - a unique progress file exists per queue + download_checkpoint_file_name: str = f"{queue_id}_{DOWNLOAD_CHECKPOINT_FILE_NAME}" + + # Get saved progress file full path now that we've validated all file inputs are valid + checkpoint_file_path: str = os.path.join(checkpoint_dir, download_checkpoint_file_name) + + deadline = boto3_session.client("deadline") + response = deadline.get_queue(farmId=farm_id, queueId=queue_id) + logger.echo(f"Started incremental download for queue: {response['displayName']}") + logger.echo(f"Checkpoint: {checkpoint_file_path}") + logger.echo() + + # Perform incremental download while holding a process id lock + + pid_lock_file_path: str = os.path.join(checkpoint_dir, f"{queue_id}_{PID_FILE_NAME}") + + with PidFileLock( + pid_lock_file_path, + operation_name="incremental output download", + ): + current_download_state: IncrementalDownloadState + + if force_bootstrap or not os.path.exists(checkpoint_file_path): + bootstrap_timestamp = datetime.now(timezone.utc) - timedelta( + minutes=bootstrap_lookback_minutes + ) + # Bootstrap with the specified lookback duration + current_download_state = IncrementalDownloadState( + downloads_started_timestamp=bootstrap_timestamp + ) + + # Print the bootstrap time in local time + if force_bootstrap: + logger.echo(f"Bootstrap forced, lookback is {bootstrap_lookback_minutes} minutes") else: - # Load the incremental download checkpoint file - current_download_state = IncrementalDownloadState.from_file( - checkpoint_file_path, logger.echo + logger.echo( + f"Checkpoint not found, lookback is {bootstrap_lookback_minutes} minutes" ) + logger.echo(f"Initializing from: {bootstrap_timestamp.astimezone().isoformat()}") + else: + # Load the incremental download checkpoint file + current_download_state = IncrementalDownloadState.from_file(checkpoint_file_path) - updated_download_state: IncrementalDownloadState = _incremental_output_download( - boto3_session=boto3_session, - farm_id=farm_id, - queue_id=queue_id, - download_state=current_download_state, - path_mapping_rules=path_mapping_rules, - print_function_callback=logger.echo, + # Print the previous download completed time in local time + logger.echo("Checkpoint found") + logger.echo( + f"Continuing from: {current_download_state.downloads_completed_timestamp.astimezone().isoformat()}" ) - # Save the checkpoint file - updated_download_state.save_file( - checkpoint_file_path, - logger.echo, - ) + logger.echo() + + updated_download_state: IncrementalDownloadState = _incremental_output_download( + boto3_session=boto3_session, + farm_id=farm_id, + queue_id=queue_id, + download_state=current_download_state, + print_function_callback=logger.echo, + ) + + # Save the checkpoint file + updated_download_state.save_file(checkpoint_file_path) diff --git a/src/deadline/client/cli/_incremental_download.py b/src/deadline/client/cli/_incremental_download.py index 53a1a831e..e45b7dbab 100644 --- a/src/deadline/client/cli/_incremental_download.py +++ b/src/deadline/client/cli/_incremental_download.py @@ -3,13 +3,165 @@ __all__ = ["_incremental_output_download"] +from datetime import datetime, timedelta, timezone +import difflib +import textwrap + from .. import api -from typing import Optional, Callable +from typing import Any, Callable import boto3 +from ..api._list_jobs_by_filter_expression import _list_jobs_by_filter_expression from ...job_attachments.incremental_downloads.incremental_download_state import ( IncrementalDownloadState, + IncrementalDownloadJob, + _datetimes_to_str, ) -import datetime +from ._common import _cli_object_repr + + +def _get_download_candidate_jobs( + boto3_session: boto3.Session, farm_id: str, queue_id: str, starting_timestamp: datetime +) -> dict[str, dict[str, Any]]: + """ + Uses deadline:SearchJobs queries to get a dict {job_id: job} of download candidates for the queue. + This is a superset of all the jobs that have produced any output for download since + the provided starting_timestamp. + + Args: + boto3_session: The boto3 session for calling AWS APIs. + farm_id: The farm ID. + queue_id: The queue ID in the farm. + starting_timestamp: The point in time from which to look for new download outputs. + + Returns: + A dictionary mapping job id to the job as returned by the deadline.search_jobs API. + """ + # Construct the full set of jobs that may have new available downloads. + # - Any active job (job with taskRunStatus in READY, ASSIGNED, + # STARTING, SCHEDULED, or RUNNING), that has at least one SUCCEEDED task. + download_candidate_jobs_dict = { + job["jobId"]: job + for job in _list_jobs_by_filter_expression( + boto3_session, + farm_id, + queue_id, + filter_expression={ + "filters": [ + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": status_value, + }, + } + # Maximum of 3 filters are permitted, so the 5 statuses are split + for status_value in ["READY", "ASSIGNED", "STARTING"] + ], + "operator": "OR", + }, + ) + } + download_candidate_jobs_dict.update( + { + job["jobId"]: job + for job in _list_jobs_by_filter_expression( + boto3_session, + farm_id, + queue_id, + filter_expression={ + "filters": [ + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": status_value, + }, + } + for status_value in ["SCHEDULED", "RUNNING"] + ], + "operator": "OR", + }, + ) + } + ) + print(f"DEBUG: Got {len(download_candidate_jobs_dict)} active jobs") + download_candidate_jobs_dict = { + job_id: _datetimes_to_str(job) + for job_id, job in download_candidate_jobs_dict.items() + if job["taskRunStatusCounts"]["SUCCEEDED"] > 0 + } + print( + f"DEBUG: Filtered down to {len(download_candidate_jobs_dict)} active jobs based on SUCCEEDED task filter" + ) + + # - Any recently ended job (job went from active to terminal with a taskRunStatus + # in SUSPENDED, CANCELED, FAILED, SUCCEEDED, NOT_COMPATIBLE), that has at least + # one SUCCEEDED task. The endedAt timestamp field gets updated when that occurs. + # TODO: Enable this when filtering by ENDED_AT works. + # download_candidate_jobs_dict.update( + # { + # job["jobId"]: job + # for job in _list_jobs_by_filter_expression( + # boto3_session, + # farm_id, + # queue_id, + # filter_expression={ + # "filters": [ + # { + # "dateTimeFilter": { + # "name": "ENDED_AT", + # "dateTime": starting_timestamp, + # "operator": "GREATER_THAN_EQUAL_TO", + # } + # } + # ], + # "operator": "AND", + # }, + # ) + # } + # ) + # WORKAROUND: Get all jobs with a SUCCEEDED or SUSPENDED task run status, and filter by endedAt client-side. + # We want to download everything that is succeeded or suspended, but not + # FAILED, CANCELED, or NOT_COMPATIBLE. + recently_ended_jobs = _list_jobs_by_filter_expression( + boto3_session, + farm_id, + queue_id, + filter_expression={ + "filters": [ + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": status_value, + }, + } + for status_value in ["SUCCEEDED", "SUSPENDED"] + ], + "operator": "OR", + }, + ) + print(f"DEBUG: Got {len(recently_ended_jobs)} succeeded/suspended jobs") + # Jobs that are submitted with a SUSPENDED status will have no "endedAt" field + # Filter to jobs that: + # 1. Have an endedAt field. (jobs submitted as SUSPENDED will not have one) + # 2. Timestamp endedAt is after the timestamp threshold. + # 3. The count of SUCCEEDED tasks is positive. + recently_ended_jobs = [ + job + for job in recently_ended_jobs + if "endedAt" in job + and job["endedAt"] >= starting_timestamp + and job["taskRunStatusCounts"]["SUCCEEDED"] > 0 + ] + print( + f"DEBUG: Filtered down to {len(recently_ended_jobs)} succeeded/suspended jobs based on endedAt timestamp threshold and SUCCEEDED task filter" + ) + download_candidate_jobs_dict.update( + {job["jobId"]: _datetimes_to_str(job) for job in recently_ended_jobs} + ) + + return download_candidate_jobs_dict @api.record_function_latency_telemetry_event() @@ -18,7 +170,6 @@ def _incremental_output_download( queue_id: str, boto3_session: boto3.Session, download_state: IncrementalDownloadState, - path_mapping_rules: Optional[str] = None, print_function_callback: Callable[[str], None] = lambda msg: None, ) -> IncrementalDownloadState: """ @@ -29,7 +180,6 @@ def _incremental_output_download( :param farm_id: farm id for the output download :param queue_id: queue for scoping output download :param download_state: Download state for starting the incremental download - :param path_mapping_rules: path mapping rules for cross OS path mapping :param boto3_session: boto3 session :param print_function_callback: Callback to print messages produced in this function. Used in the CLI to print to stdout using click.echo. By default, ignores messages. @@ -38,14 +188,159 @@ def _incremental_output_download( # When this function is done, we will be confident that downloads are complete up to # this timestamp. We subtract a duration from now() that gives a generous amount of # time for the deadline:SearchJobs API's eventual consistency to converge. - new_completed_timestamp = ( - datetime.datetime.now(datetime.timezone.utc) - - download_state.eventual_consistency_max_duration + new_completed_timestamp = datetime.now(timezone.utc) - timedelta( + seconds=download_state.eventual_consistency_max_seconds + ) + + print_function_callback( + f"Checkpoint state tracks {len(download_state.jobs)} jobs. Retrieving updated data from Deadline Cloud..." + ) + + deadline = boto3_session.client("deadline") + + download_candidate_jobs = _get_download_candidate_jobs( + boto3_session, farm_id, queue_id, download_state.downloads_completed_timestamp + ) + download_candidate_job_ids = set(download_candidate_jobs.keys()) + + in_progress_jobs = {job.job_id: job.job for job in download_state.jobs} + in_progress_job_ids = set(in_progress_jobs.keys()) + + print_function_callback( + f"Comparing with {len(download_candidate_jobs)} download candidate jobs..." ) + updated_in_progress_jobs = [] + + dropped_job_ids = in_progress_job_ids.difference(download_candidate_job_ids) + updated_job_ids = in_progress_job_ids.intersection(download_candidate_job_ids) + new_job_ids = download_candidate_job_ids.difference(in_progress_job_ids) + # The following sets get populated while analyzing the jobs + unchanged_job_ids = set() + attachments_free_job_ids = set() + + # Copy the job attachments manifest data from the checkpoint to the new job objects. This data is not returned + # by deadline:SearchJobs, so we need to call deadline:GetJob on every job to retrieve it. The manifests on a job + # don't change, so after the call to deadline:GetJob we can cache it indefinitely. + for job_id in updated_job_ids: + ip_job = in_progress_jobs[job_id] + dc_job = download_candidate_jobs[job_id] + + if ip_job["attachments"] is None: + # Carry over the minimal placeholder identifying the job as not using job attachments + download_candidate_jobs[job_id] = ip_job + attachments_free_job_ids.add(job_id) + updated_in_progress_jobs.append(ip_job) + else: + # Carry over the attachments manifest metadata + dc_job["attachments"] = ip_job["attachments"] + updated_job_ids.difference_update(attachments_free_job_ids) + + # Prune jobs that we are certain have no changes by looking at its task status counts. A job is unchanged if both of these are true: + # 1. job["taskRunStatusCounts"]["SUCCEEDED"] stayed the same. Except for when a task is requeued, this count will always increase + # when new output is available to download. If a task is requeued, this value could drop and then return to the same value + # when new output is generated. + # 2. job["updatedAt"] stayed the same. If a task is requeued, this timestamp will be updated, so this catches anything missed + # by the first check. This timestamp can also change for other reasons, and the later checks that look at session actions + # directly will find those cases. + for job_id in updated_job_ids: + ip_job = in_progress_jobs[job_id] + dc_job = download_candidate_jobs[job_id] + + if ip_job["taskRunStatusCounts"]["SUCCEEDED"] == dc_job["taskRunStatusCounts"][ + "SUCCEEDED" + ] and ip_job.get("updatedAt") == dc_job.get("updatedAt"): + print_function_callback(f"UNCHANGED Job: {dc_job['name']} ({job_id})") + unchanged_job_ids.add(job_id) + updated_in_progress_jobs.append(dc_job) + updated_job_ids.difference_update(unchanged_job_ids) + + # First make note of any jobs that were dropped, for example if they were canceled or they failed + for job_id in dropped_job_ids: + ip_job = in_progress_jobs[job_id] + + print_function_callback(f"DROPPED Job: {ip_job['name']} ({job_id})") + if ip_job["attachments"] is None: + print_function_callback(" Job without job attachments no longer needs tracking") + else: + print_function_callback( + " Job is not a download candidate anymore (likely canceled or failed)" + ) + + # Process all the jobs that have updates + for job_id in updated_job_ids: + ip_job = in_progress_jobs[job_id] + dc_job = download_candidate_jobs[job_id] + + print_function_callback(f"EXISTING Job: {ip_job['name']} ({job_id})") + print_function_callback( + f" Succeeded tasks (before): {ip_job['taskRunStatusCounts']['SUCCEEDED']} / {sum(value for _, value in ip_job['taskRunStatusCounts'].items())}" + ) + print_function_callback( + f" Succeeded tasks (now) : {dc_job['taskRunStatusCounts']['SUCCEEDED']} / {sum(value for _, value in dc_job['taskRunStatusCounts'].items())}" + ) + + # Use the CLI output format to produce a diff of the changes + ip_job_repr: list[str] = _cli_object_repr(ip_job).splitlines() + dc_job_repr: list[str] = _cli_object_repr(dc_job).splitlines() + + for line in difflib.unified_diff( + ip_job_repr, + dc_job_repr, + fromfile="Previous update", + tofile="Current update", + lineterm="", + ): + print_function_callback(f" {line}") + + updated_in_progress_jobs.append(dc_job) + + # Process all the jobs that are new + for job_id in new_job_ids: + dc_job = download_candidate_jobs[job_id] + + # Call deadline:GetJob to retrieve attachments manifest information + job = deadline.get_job(jobId=job_id, queueId=queue_id, farmId=farm_id) + dc_job["attachments"] = job.get("attachments") + + print_function_callback(f"NEW Job: {dc_job['name']} ({job_id})") + print_function_callback( + f" Succeeded tasks: {dc_job['taskRunStatusCounts']['SUCCEEDED']} / {sum(value for _, value in dc_job['taskRunStatusCounts'].items())}" + ) + if dc_job["attachments"] is None: + # If the job does not use job attachments, save a minimal placeholder to avoid + # repeatedly calling deadline:GetJob. + download_candidate_jobs[job_id] = dc_job = { + "jobId": job_id, + "name": dc_job["name"], + "attachments": None, + } + attachments_free_job_ids.add(job_id) + print_function_callback(" Job does not use job attachments.") + else: + print_function_callback(textwrap.indent(_cli_object_repr(dc_job["attachments"]), " ")) + + updated_in_progress_jobs.append(dc_job) + new_job_ids.difference_update(attachments_free_job_ids) + + download_state.jobs = [IncrementalDownloadJob(job) for job in updated_in_progress_jobs] + + print_function_callback("") + print_function_callback(f"Identified {len(download_state.jobs)}") + # TODO the rest of the incremental output download # Update the timestamp in the state object to reflect the downloads that were completed - download_state.downloads_completed_timestamp = new_completed_timestamp + download_state.downloads_completed_timestamp = max( + download_state.downloads_started_timestamp, new_completed_timestamp + ) + + print_function_callback("") + print_function_callback("Summary of incremental output download:") + print_function_callback(f" Jobs without job attachments: {len(attachments_free_job_ids)}") + print_function_callback(f" Jobs unchanged: {len(unchanged_job_ids)}") + print_function_callback(f" Jobs added: {len(new_job_ids)}") + print_function_callback(f" Jobs updated: {len(updated_job_ids)}") + print_function_callback(f" Jobs dropped: {len(dropped_job_ids)}") return download_state diff --git a/src/deadline/client/cli/_pid_file_lock.py b/src/deadline/client/cli/_pid_file_lock.py index d2ca4d039..fb7095a53 100644 --- a/src/deadline/client/cli/_pid_file_lock.py +++ b/src/deadline/client/cli/_pid_file_lock.py @@ -3,7 +3,6 @@ import os import sys import psutil -from typing import Callable from contextlib import contextmanager from ..exceptions import DeadlineOperationError import logging @@ -46,7 +45,6 @@ def _claim_pid_lock_with_rename(tmp_file_name: str, pid_file_path: str) -> bool: def _try_acquire_pid_lock( pid_file_path: str, operation_name: str = "the operation", - print_function_callback: Callable[[str], None] = print, ): """ Checks if the specified pid lock file exists and executes as per the following: @@ -60,12 +58,8 @@ def _try_acquire_pid_lock( to handle concurrent processes making the same check. This will not work if primitive file locks are disabled. :param pid_file_full_path: full path of the pid lock file - :param print_function_callback: Callback to print messages produced in this function. - Used in the CLI to print to stdout using click.echo. By default, ignores messages. :return: boolean, True if pid lock was obtained successfully, throws an exception otherwise """ - print_function_callback(f"Checking if another download is in progress at {pid_file_path}") - current_process_id: int = os.getpid() # Generate a tmp file for writing the pid file as a whole and prevent corrupt data @@ -108,7 +102,7 @@ def _try_acquire_pid_lock( ) except FileNotFoundError: # In this case, the pid lock is free to acquire - print_function_callback(f"Pid lock file does not exist at {pid_file_path}") + pass # After possibly cleaning up a stale lock, try claiming it again if _claim_pid_lock_with_rename(tmp_file_path, pid_file_path): @@ -127,17 +121,13 @@ def _try_acquire_pid_lock( logger.warning(f"Failed to clean up pid lock temporary file: {e}") -def _release_pid_lock(pid_file_path: str, print_function_callback: Callable[[str], None] = print): +def _release_pid_lock(pid_file_path: str): """ Releases the pid lock by deleting the pid file. :param pid_file_full_path: full path of the pid lock file - :param print_function_callback: print_function_callback (Callable str -> None, optional): Callback to print messages produced in this function. - Used in the CLI to print to stdout using click.echo. By default, ignores messages. :return: boolean, True if pid lock released successfully """ - print_function_callback(f"Releasing pid lock at {pid_file_path}") - # Get the current process's id to obtain lock current_process_id: int = os.getpid() @@ -163,9 +153,6 @@ def _release_pid_lock(pid_file_path: str, print_function_callback: Callable[[str if lock_holder_pid == str(current_process_id): # Process pid from file is same as current process pid - release pid lock os.remove(pid_file_path) - print_function_callback( - f"Process with pid {lock_holder_pid} is the current process. Deleted pid lock file." - ) else: # Process pid from file is different from current process pid. logger.warning( @@ -177,7 +164,6 @@ def _release_pid_lock(pid_file_path: str, print_function_callback: Callable[[str def PidFileLock( lock_file_path: str, operation_name: str = "the operation", - print_function_callback: Callable[[str], None] = print, ): """ A context manager for holding a pid (process id) lock file during the scope of a 'with' statement. @@ -192,11 +178,9 @@ def PidFileLock( Args: lock_file_path (str): The file system path of the PID lock file. operation_name (Optional[str]): The name of the operation being performed in the lock, used for error messages. - print_function_callback (Optional[Callable]): A function that accepts a string to print debugging info. - """ - _try_acquire_pid_lock(lock_file_path, operation_name, print_function_callback) + _try_acquire_pid_lock(lock_file_path, operation_name) try: yield None finally: - _release_pid_lock(lock_file_path, print_function_callback) + _release_pid_lock(lock_file_path) diff --git a/src/deadline/client/config/config_file.py b/src/deadline/client/config/config_file.py index 57a281172..c9dc92681 100644 --- a/src/deadline/client/config/config_file.py +++ b/src/deadline/client/config/config_file.py @@ -11,6 +11,7 @@ "clear_setting", "get_best_profile_for_farm", "str2bool", + "DEFAULT_QUEUE_INCREMENTAL_DOWNLOAD_DIR", ] import getpass @@ -37,9 +38,10 @@ f"https://deadline.{boto3.Session().region_name}.amazonaws.com", ) -# The default directory within which to save the history of created jobs. +# Default directories used by various Deadline CLI commands DEFAULT_JOB_HISTORY_DIR = os.path.join("~", ".deadline", "job_history", "{aws_profile_name}") DEFAULT_CACHE_DIR = os.path.join("~", ".deadline", "cache") +DEFAULT_QUEUE_INCREMENTAL_DOWNLOAD_DIR = os.path.join("~", ".deadline", "incremental_download") _TRUE_VALUES = {"yes", "on", "true", "1"} _FALSE_VALUES = {"no", "off", "false", "0"} diff --git a/src/deadline/client/job_bundle/_yaml.py b/src/deadline/client/job_bundle/_yaml.py index da2839bf4..a788e1360 100644 --- a/src/deadline/client/job_bundle/_yaml.py +++ b/src/deadline/client/job_bundle/_yaml.py @@ -72,7 +72,7 @@ def __init__( Resolver.__init__(self) -def deadline_yaml_dump(data, stream=None, **kwds): +def deadline_yaml_dump(data, stream=None, **kwds) -> str: """ Works like pyyaml's safe_dump, but saves multi-line strings with the "|" style and defaults to sort_keys=False. diff --git a/src/deadline/job_attachments/incremental_downloads/incremental_download_state.py b/src/deadline/job_attachments/incremental_downloads/incremental_download_state.py index 6d0198e3f..3b38231cc 100644 --- a/src/deadline/job_attachments/incremental_downloads/incremental_download_state.py +++ b/src/deadline/job_attachments/incremental_downloads/incremental_download_state.py @@ -4,29 +4,48 @@ import json import os -from datetime import datetime, timedelta -from typing import Any, Optional, Callable +from datetime import datetime +from typing import Any, Optional import tempfile +# This as an upper bound to allow for eventual consistency into the materialized view that +# the deadline:SearchJobs API is based on. It's taken from numbers seen in heavy load testing, +# increased by a generous amount. +EVENTUAL_CONSISTENCY_MAX_SECONDS = 120 + + +def _datetimes_to_str(obj: Any) -> Any: + """Recursively applies the isoformat() function to all datetimes in the object""" + if isinstance(obj, datetime): + return obj.isoformat() + elif isinstance(obj, list): + return [_datetimes_to_str(item) for item in obj] + elif isinstance(obj, dict): + return {key: _datetimes_to_str(value) for key, value in obj.items()} + else: + return obj + class IncrementalDownloadJob: """ Model representing a job in the download progress state. """ - _required_dict_fields = ["jobId", "sessions"] + _required_dict_fields = ["jobId", "job", "sessions"] job_id: str + job: dict[str, Any] sessions: list - def __init__(self, job_id: str, sessions: Optional[list] = None): + def __init__(self, job: dict[str, Any], sessions: Optional[list] = None): """ Initialize a Job instance. Args: - job_id (str): The ID of the job + job (dict[str, Any]): The job as returned by boto3 from deadline:SearchJobs. sessions (list): List of JobSession objects """ - self.job_id = job_id + self.job_id = job["jobId"] + self.job = _datetimes_to_str(job) self.sessions = sessions or [] @classmethod @@ -45,7 +64,7 @@ def from_dict(cls, data: dict[str, Any]): raise ValueError(f"Input is missing required fields: {missing_fields}") sessions = data["sessions"] - return cls(job_id=data["jobId"], sessions=sessions) + return cls(job=data["job"], sessions=sessions) def to_dict(self) -> dict[str, Any]: """ @@ -53,23 +72,24 @@ def to_dict(self) -> dict[str, Any]: Returns: dict: Dictionary representation of the job """ - return {"jobId": self.job_id, "sessions": self.sessions} + return {"jobId": self.job_id, "job": self.job, "sessions": self.sessions} class IncrementalDownloadState: """ Model for tracking all the job attachments downloads to perform for a queue over time. - A new download becomes available whenever a TASK_RUN session action completes. The state - includes some informational fields that are not strictly necessary, to help make the data + A new download becomes available whenever a TASK_RUN session action completes. + + This class includes some informational fields that are not strictly necessary, to help make the data on disk easier to understand on inspection. * https://docs.aws.amazon.com/deadline-cloud/latest/APIReference/API_GetSessionAction.html#API_GetSessionAction_ResponseSyntax * https://docs.aws.amazon.com/deadline-cloud/latest/APIReference/API_SessionActionDefinition.html - We track state at three levels, and use the resource state at one level to prune queries at lower levels when we can: + The Deadline Cloud APIs do not provide direct access to a stream of completed session actions, so we reconstruct such + a stream by tracking state at three levels. Where possible, we use the resource state at one level to prune queries at lower levels: - 1. Job - The jobs list contains every job that entered an active status within the time interval [downloads_started_timestamp, downloads_completed_timestamp], - where it can generate new task runs, and has not exited as complete or failed. + 1. Job - The jobs list contains every job that is active and that we have downloaded output from in a previous incremental download command. 2. Session - Each session of a job represents a single worker running a sequence of tasks from the job. The sessions list in a job contains all the sessions that are active and from which we have downloaded some output. 3. SessionAction - Session actions have sequential IDs, so for each session we track the highest index of session action @@ -121,9 +141,9 @@ class IncrementalDownloadState: downloads_started_timestamp: datetime """The timestamp of when the download state was bootstrapped.""" - downloads_completed_timestamp: Optional[datetime] + downloads_completed_timestamp: datetime """The timestamp up to which we are confident downloads are complete.""" - eventual_consistency_max_duration: timedelta = timedelta(seconds=120) + eventual_consistency_max_seconds: int = EVENTUAL_CONSISTENCY_MAX_SECONDS """The duration for deadline:SearchJobs query overlap, to account for eventual consistency.""" jobs: list[IncrementalDownloadJob] @@ -134,7 +154,7 @@ def __init__( downloads_started_timestamp: datetime, downloads_completed_timestamp: Optional[datetime] = None, jobs: Optional[list] = None, - eventual_consistency_max_duration: Optional[timedelta] = None, + eventual_consistency_max_seconds: Optional[int] = None, ): """ Initialize a IncrementalDownloadState instance. To bootstrap the state, construct with only the downloads_started_timestamp. @@ -144,12 +164,15 @@ def __init__( downloads_completed_timestamp (datetime): The timestamp up to which we are confident downloads are complete. jobs (list[IncrementalDownloadJob]): The list of jobs that entered 'active' status between downloads_started_timestamp and downloads_completed_timestamp, and are not completed. - eventual_consistency_max_duration (Optional[timedelta]): The duration for deadline:SearchJobs query overlap, to account for eventual consistency. + eventual_consistency_max_seconds (Optional[int]): The duration, in seconds, for deadline:SearchJobs query overlap, to account for eventual consistency. """ self.downloads_started_timestamp = downloads_started_timestamp - self.downloads_completed_timestamp = downloads_completed_timestamp - if eventual_consistency_max_duration: - self.eventual_consistency_max_duration = eventual_consistency_max_duration + if downloads_completed_timestamp is not None: + self.downloads_completed_timestamp = downloads_completed_timestamp + else: + self.downloads_completed_timestamp = downloads_started_timestamp + if eventual_consistency_max_seconds: + self.eventual_consistency_max_seconds = eventual_consistency_max_seconds self.jobs = jobs or [] @classmethod @@ -172,10 +195,8 @@ def from_dict(cls, data): downloads_completed_timestamp=datetime.fromisoformat( data["downloadsCompletedTimestamp"] ), - eventual_consistency_max_duration=timedelta( - seconds=int(data["eventualConsistencyMaxSeconds"]) - ), - jobs=data["jobs"], + eventual_consistency_max_seconds=int(data["eventualConsistencyMaxSeconds"]), + jobs=[IncrementalDownloadJob.from_dict(job) for job in data["jobs"]], ) def to_dict(self): @@ -186,7 +207,7 @@ def to_dict(self): """ result = { "downloadsStartedTimestamp": self.downloads_started_timestamp.isoformat(), - "eventualConsistencyMaxSeconds": self.eventual_consistency_max_duration.total_seconds(), + "eventualConsistencyMaxSeconds": self.eventual_consistency_max_seconds, "jobs": [job.to_dict() for job in self.jobs], } if self.downloads_completed_timestamp is not None: @@ -198,7 +219,6 @@ def to_dict(self): def from_file( cls, file_path: str, - print_function_callback: Callable[[str], None] = print, ) -> "IncrementalDownloadState": """ Loads progress from state file saved at saved_progress_checkpoint_full_path @@ -213,15 +233,11 @@ def from_file( state_data = json.load(file) download_state = IncrementalDownloadState.from_dict(state_data) - print_function_callback( - f"Loaded existing state file from download progress checkpoint location {file_path}" - ) return download_state def save_file( self, file_path: str, - print_function_callback: Callable[[str], None] = print, ) -> None: """ Save the current download progress to a state file atomically. @@ -249,5 +265,3 @@ def save_file( # Atomically replace the target file with the temporary file os.replace(tmpfile.name, file_path) - - print_function_callback(f"Successfully saved state file to {file_path}") diff --git a/test/unit/deadline_client/api/test_list_jobs_by_filter_expression.py b/test/unit/deadline_client/api/test_list_jobs_by_filter_expression.py index 9a7c71583..0fb31036a 100644 --- a/test/unit/deadline_client/api/test_list_jobs_by_filter_expression.py +++ b/test/unit/deadline_client/api/test_list_jobs_by_filter_expression.py @@ -14,7 +14,7 @@ ) # Constants for testing -from .mock_search_jobs import create_fake_job_list, mock_search_jobs_for_set +from ..mock_deadline_job_apis import create_fake_job_list, mock_search_jobs_for_set from ..shared_constants import MOCK_FARM_ID, MOCK_QUEUE_ID MOCK_TIMESTAMP = datetime(2023, 1, 2, 3, 4, 5, tzinfo=timezone.utc) diff --git a/test/unit/deadline_client/cli/test_cli_queue.py b/test/unit/deadline_client/cli/test_cli_queue.py index 61222949f..1d2411034 100644 --- a/test/unit/deadline_client/cli/test_cli_queue.py +++ b/test/unit/deadline_client/cli/test_cli_queue.py @@ -14,7 +14,6 @@ from deadline.client.cli import main from ..shared_constants import MOCK_FARM_ID, MOCK_QUEUES_LIST -import os def test_cli_queue_list(fresh_deadline_config): @@ -146,16 +145,3 @@ def test_cli_queue_get(fresh_deadline_config): farmId=MOCK_FARM_ID, queueId=MOCK_QUEUES_LIST[0]["queueId"] ) assert result.exit_code == 0 - - -def test_incremental_download_existence(fresh_deadline_config): - """ - Confirm that the CLI is not available for environment with no ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD variable - """ - - assert not os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") - - runner = CliRunner() - response = runner.invoke(main, ["queue", "incremental-output-download"]) - - assert "Error: No such command 'incremental-output-download'." in response.output diff --git a/test/unit/deadline_client/cli/test_cli_queue_incremental_download.py b/test/unit/deadline_client/cli/test_cli_queue_incremental_download.py index 5c3110922..c9a91a515 100644 --- a/test/unit/deadline_client/cli/test_cli_queue_incremental_download.py +++ b/test/unit/deadline_client/cli/test_cli_queue_incremental_download.py @@ -4,19 +4,26 @@ Tests for the CLI queue incremental output download command. """ -import json import os import pytest from unittest.mock import patch, MagicMock +from datetime import datetime import boto3 from freezegun import freeze_time from click.testing import CliRunner from deadline.client.cli import main +import deadline.client +import psutil + +from ..shared_constants import MOCK_FARM_ID, MOCK_QUEUE_ID, MOCK_JOB_ID +from ..mock_deadline_job_apis import ( + mock_search_jobs_for_set, + create_fake_job_list, + mock_get_job_for_set, +) - -MOCK_FARM_ID = "farm-0123456789abcdef" -MOCK_QUEUE_ID = "queue-0123456789abcdef" +ISO_FREEZE_TIME = "2025-05-26 12:00:00+00:00" # Fixtures for shared resources @@ -31,21 +38,12 @@ def checkpoint_dir(tmp_path_factory): @pytest.fixture(scope="module") def boto3_session(): """Create a mock boto3 session for all tests to use.""" - return MagicMock(spec=boto3.Session) - - -@pytest.fixture -def progress_file(checkpoint_dir): - """Create a progress file path for tests that need it. - - This has function scope so each test gets a fresh file. - """ - progress_file_path = os.path.join(checkpoint_dir, f"{MOCK_QUEUE_ID}_download_progress.json") - # File will be created by the test that needs it - yield progress_file_path - # Clean up after each test - if os.path.exists(progress_file_path): - os.remove(progress_file_path) + mock_session = MagicMock(spec=boto3.Session) + mock_session.client().get_queue.return_value = {"displayName": "Mock Queue"} + with patch.object(boto3, "Session", return_value=mock_session), patch.object( + deadline.client.api, "get_deadline_cloud_library_telemetry_client" + ): + yield mock_session @pytest.fixture @@ -59,361 +57,178 @@ def pid_lock_file(checkpoint_dir): @pytest.fixture -def path_mapping_rules_file(tmp_path_factory): - """Create a path mapping rules file for tests that need it.""" - # Create in a separate directory to avoid conflicts - rules_dir = tmp_path_factory.mktemp("rules") - rules_file_path = os.path.join(str(rules_dir), "rules.json") - yield rules_file_path - # Clean up - if os.path.exists(rules_file_path): - os.remove(rules_file_path) +def with_incremental_download_enabled(): + """Set the ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD environment variable to 1 for testing the incremental download command.""" + os.environ["ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD"] = "1" + yield None + del os.environ["ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD"] -@pytest.fixture -def sample_progress_data(): - """Sample progress data for testing.""" - return { - "lastLookbackTime": "2025-04-04T05:30:00", - "jobs": [ - { - "jobId": "job-1234353453443", - "sessions": [ - { - "sessionId": "session-1324324354354", - "sessionLifecycleStatus": "SUCCESSFUL", - "lastDownloadedSessActionId": 3, - }, - { - "sessionId": "session-3423435435454", - "sessionLifecycleStatus": "RUNNING", - "lastDownloadedSessActionId": 6, - }, - ], - }, - { - "jobId": "Job-3234324354345", - "sessions": [ - { - "sessionId": "session-4235435434345", - "sessionLifecycleStatus": "FAILED", - "lastDownloadedSessActionId": 3, - } - ], - }, - ], - } - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@freeze_time("2025-05-26 12:00:00+00:00") -@patch("deadline.client.api.get_boto3_session") -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -def test_incremental_output_download_success_load_from_progress( - mock_get_boto3_session, checkpoint_dir, progress_file, sample_progress_data -): - """Test successful execution of incremental_output_download with loading progress from state file""" - # Create a real progress file with test data - with open(progress_file, "w") as f: - json.dump(sample_progress_data, f, indent=2) - - # Mock boto3 session - mock_session = MagicMock(spec=boto3.Session) - mock_get_boto3_session.return_value = mock_session - - # Run the CLI command +def test_incremental_output_download_requires_beta_acknowledgement(boto3_session, checkpoint_dir): + # Run the CLI command once to bootstrap the operation runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - ], - ) + with freeze_time(ISO_FREEZE_TIME): + result = runner.invoke( + main, + [ + "queue", + "incremental-output-download", + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + "--checkpoint-dir", + checkpoint_dir, + ], + ) # Assert the command executed successfully - assert result.exit_code == 0 + assert result.exit_code == 1, result.output - # Check that the progress file was updated - with open(progress_file, "r") as f: - updated_progress = json.load(f) + assert ( + "The incremental-output-download command is not fully implemented. You must set the environment variable ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD to 1 to acknowledge this." + in result.output + ), result.output - # Verify the lastLookbackTime was updated to the frozen time - assert updated_progress["lastLookbackTime"] == "2025-05-26T12:00:00+00:00" - # Verify the job data was preserved - assert len(updated_progress["jobs"]) == 2 - assert updated_progress["jobs"][0]["jobId"] == "job-1234353453443" - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@freeze_time("2025-05-26 12:00:00+00:00") -@patch("deadline.client.api.get_boto3_session") -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -@pytest.mark.parametrize("bootstrap_lookback_in_minutes", [60, None]) -def test_incremental_output_download_success_with_force_bootstrap( - mock_get_boto3_session, checkpoint_dir, progress_file, bootstrap_lookback_in_minutes +def test_incremental_output_download_simple_success( + with_incremental_download_enabled, boto3_session, checkpoint_dir ): - """Test successful execution of incremental_output_download with bootstrapping""" - # Create a file that should be ignored due to force_bootstrap=True - with open(progress_file, "w") as f: - json.dump({"lastLookbackTime": "2025-01-01T00:00:00", "jobs": []}, f) - - # Mock boto3 session - mock_session = MagicMock(spec=boto3.Session) - mock_get_boto3_session.return_value = mock_session + """Test successful execution of incremental_output_download""" + mock_jobs = create_fake_job_list(1) + mock_jobs[0]["name"] = "Mock Job" + mock_jobs[0]["jobId"] = MOCK_JOB_ID + mock_jobs[0]["taskRunStatus"] = "READY" + mock_jobs[0]["taskRunStatusCounts"] = { + "SUCCEEDED": 1, + "READY": 1, + } + mock_jobs[0]["attachments"] = { + "manifests": [ + {"rootPath": "/", "rootPathFormat": "posix", "outputRelativeDirectories": ["."]} + ], + "fileSystem": "VIRTUAL", + } + del mock_jobs[0]["endedAt"] + boto3_session.client().search_jobs = mock_search_jobs_for_set( + MOCK_FARM_ID, MOCK_QUEUE_ID, mock_jobs + ) + boto3_session.client().get_job = mock_get_job_for_set(MOCK_FARM_ID, MOCK_QUEUE_ID, mock_jobs) - # Run the CLI command + # Run the CLI command once to bootstrap the operation runner = CliRunner() - cmd = [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - "--force-bootstrap", - ] - - if bootstrap_lookback_in_minutes is not None: - cmd.extend(["--bootstrap-lookback-in-minutes", str(bootstrap_lookback_in_minutes)]) - - result = runner.invoke(main, cmd) + with freeze_time(ISO_FREEZE_TIME): + result = runner.invoke( + main, + [ + "queue", + "incremental-output-download", + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + "--checkpoint-dir", + checkpoint_dir, + ], + ) # Assert the command executed successfully - assert result.exit_code == 0 - - # Check that the progress file was updated - with open(progress_file, "r") as f: - updated_progress = json.load(f) - - # Verify the lastLookbackTime was updated to the frozen time - assert updated_progress["lastLookbackTime"] == "2025-05-26T12:00:00+00:00" + assert result.exit_code == 0, result.output + + # Assert that the output contained information about the bootstrapping and the mocked resources + assert "Started incremental download for queue: Mock Queue" in result.output, result.output + assert ( + f"Checkpoint: {os.path.join(checkpoint_dir, MOCK_QUEUE_ID + '_download_checkpoint.json')}" + in result.output + ), result.output + assert "Checkpoint not found, lookback is 0.0 minutes" in result.output, result.output + # Need to convert the freeze time to the local time zone for this print assertion + assert ( + f"Initializing from: {datetime.fromisoformat(ISO_FREEZE_TIME).astimezone().isoformat()}" + in result.output + ), result.output + assert f"NEW Job: Mock Job ({MOCK_JOB_ID})" in result.output, result.output + assert "Succeeded tasks: 1 / 2" in result.output, result.output + assert "Jobs added: 1" in result.output, result.output + + # Edit the mock job to complete the task + mock_jobs[0]["taskRunStatus"] = "SUCCEEDED" + mock_jobs[0]["taskRunStatusCounts"] = { + "SUCCEEDED": 2, + "READY": 0, + } + mock_jobs[0]["endedAt"] = datetime.fromisoformat(ISO_FREEZE_TIME) + + # Run the CLI command again to "complete" the download that was started + with freeze_time(ISO_FREEZE_TIME): + result = runner.invoke( + main, + [ + "queue", + "incremental-output-download", + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + "--checkpoint-dir", + checkpoint_dir, + ], + ) - # Verify the jobs list is empty (as it would be with a fresh bootstrap) - assert updated_progress["jobs"] == [] + # Assert the command executed successfully + assert result.exit_code == 0, result.output + + # Assert that the output contained information about loading the checkpoint and the mocked resources + assert "Started incremental download for queue: Mock Queue" in result.output, result.output + assert ( + f"Checkpoint: {os.path.join(checkpoint_dir, MOCK_QUEUE_ID + '_download_checkpoint.json')}" + in result.output + ), result.output + assert "Checkpoint found" in result.output, result.output + # Need to convert the freeze time to the local time zone for this print assertion + assert ( + f"Continuing from: {datetime.fromisoformat(ISO_FREEZE_TIME).astimezone().isoformat()}" + in result.output + ), result.output + assert f"EXISTING Job: Mock Job ({MOCK_JOB_ID})" in result.output, result.output + assert "Succeeded tasks (before): 1 / 2" in result.output, result.output + assert "Succeeded tasks (now) : 2 / 2" in result.output, result.output + assert "Jobs updated: 1" in result.output, result.output -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@patch("psutil.pid_exists") -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) def test_incremental_output_download_pid_lock_already_held_error( - mock_pid_exists, checkpoint_dir, pid_lock_file + with_incremental_download_enabled, boto3_session, checkpoint_dir, pid_lock_file ): """Test incremental_output_download when PidLockAlreadyHeld is raised""" # Write a fake PID to the file with open(pid_lock_file, "w") as f: - f.write("12345") # Use a fake PID - - # Make psutil.pid_exists return True to simulate the process is running - mock_pid_exists.return_value = True + f.write("12345678") # Use a fake PID # Run the CLI command runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - ], - ) - - # Assert the command executed successfully but with a message about another download in progress - assert result.exit_code == 0 - assert f"Another download is in progress at {checkpoint_dir}" in result.output + with patch.object(psutil, "pid_exists") as mock_pid_exists: + # Make psutil.pid_exists return True to simulate the process is running + mock_pid_exists.return_value = True + result = runner.invoke( + main, + [ + "queue", + "incremental-output-download", + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + "--checkpoint-dir", + checkpoint_dir, + ], + ) + + # Assert the command did not execute successfully and wrote a message about another download in progress + assert result.exit_code == 1, result.output + assert ( + f"Unable to perform incremental output download as process with pid 12345678 already holds the lock {os.path.join(checkpoint_dir, MOCK_QUEUE_ID + '_incremental_output_download.pid')}" + in result.output + ), result.output # Verify the PID file still exists since we're simulating another process holding the lock assert os.path.exists(pid_lock_file) - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -def test_validate_file_inputs_success(checkpoint_dir): - """Test successful validation of file inputs""" - # Run the CLI command with a valid directory - runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - ], - ) - - # The command should execute - assert result.exit_code == 0 - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "1"}) -def test_validate_file_inputs_invalid_directory(checkpoint_dir): - """Test validation when directory is invalid""" - # Create a path to a non-existent directory - nonexistent_dir = os.path.join(checkpoint_dir, "nonexistent_directory") - - # Run the CLI command with an invalid directory - runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - nonexistent_dir, - ], - ) - - # The command should execute but report the validation error - assert result.exit_code == 0 - assert "Download failed" in result.output - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -def test_validate_file_inputs_with_mapping_rules_success(checkpoint_dir, path_mapping_rules_file): - """Test successful validation with path mapping rules""" - # Create the rules file with valid content - with open(path_mapping_rules_file, "w") as f: - f.write('{"rules": []}') - - # Run the CLI command with valid rules file - runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - "--path-mapping-rules", - path_mapping_rules_file, - ], - ) - - # The command should execute without validation errors - assert result.exit_code == 0 - assert "Download failed" not in result.output - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -def test_validate_file_inputs_mapping_rules_not_exist(checkpoint_dir): - """Test validation when mapping rules file doesn't exist""" - # Create a path to a non-existent rules file - nonexistent_rules = os.path.join(checkpoint_dir, "nonexistent_rules.json") - - # Run the CLI command with non-existent rules file - runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - "--path-mapping-rules", - nonexistent_rules, - ], - ) - - # The command should execute but report the validation error - assert result.exit_code == 0 - assert "Download failed" in result.output - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@patch("os.access") -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -def test_validate_file_inputs_mapping_rules_not_readable( - mock_access, checkpoint_dir, path_mapping_rules_file -): - """Test validation when mapping rules file is not readable""" - # Create the rules file - with open(path_mapping_rules_file, "w") as f: - f.write('{"rules": []}') - - # Mock os.access to simulate a non-readable rules file - def access_side_effect(path, mode): - if path == path_mapping_rules_file and mode == os.R_OK: - return False - return True - - mock_access.side_effect = access_side_effect - - # Run the CLI command with non-readable rules file - runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - "--path-mapping-rules", - path_mapping_rules_file, - ], - ) - - # The command should execute but report the validation error - assert result.exit_code == 0 - assert "Download failed" in result.output diff --git a/test/unit/deadline_client/test_pid_file_lock.py b/test/unit/deadline_client/cli/test_pid_file_lock.py similarity index 100% rename from test/unit/deadline_client/test_pid_file_lock.py rename to test/unit/deadline_client/cli/test_pid_file_lock.py diff --git a/test/unit/deadline_client/api/mock_search_jobs.py b/test/unit/deadline_client/mock_deadline_job_apis.py similarity index 85% rename from test/unit/deadline_client/api/mock_search_jobs.py rename to test/unit/deadline_client/mock_deadline_job_apis.py index c27dde1b5..f1b959763 100644 --- a/test/unit/deadline_client/api/mock_search_jobs.py +++ b/test/unit/deadline_client/mock_deadline_job_apis.py @@ -1,8 +1,8 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. """ -This file implements a partial re-implementation of the deadline:SearchJobs API, so that -we can unit tests queries that depend on its behavior in more complex ways. +This file implements a partial re-implementation of the deadline:SearchJobs and deadline:GetJob APIs, +so that we can unit tests queries that depend on its behavior in more complex ways. The function create_fake_job_list uses a random number generator to produce a list of job dictionaries. The mock_search_jobs_for_set takes such a list of jobs, and returns @@ -17,7 +17,9 @@ from typing import Any, Callable, Optional import uuid -from ..testing_utilities import snake_to_camel +import botocore.exceptions + +from .testing_utilities import snake_to_camel __all__ = ["create_fake_job_list", "mock_search_jobs_for_set"] @@ -123,9 +125,6 @@ def mock_search_jobs_for_set(farmIdForJobs, queueIdForJobs, jobs): """Returns a fake "search_jobs" API that emulates a subset of Deadline Cloud's SearchJobs on the provided set of jobs. - These fake jobs only have the timestamp fields. If we decide to generalize this - function for more cases, we could move this to a conftest.py and extend it. - See https://docs.aws.amazon.com/deadline-cloud/latest/APIReference/API_SearchJobs.html """ @@ -150,8 +149,15 @@ def _fake_search_jobs( result_jobs = [j for j in result_jobs if filter(j)] # Construct the API response nextItemOffset = min(len(result_jobs), itemOffset + pageSize) + response_jobs = deepcopy(result_jobs[itemOffset:nextItemOffset]) + + # Remove all "attachments" properties from the response, they are not returned by deadline:SearchJobs + for job in response_jobs: + if "attachments" in job: + del job["attachments"] + response = { - "jobs": deepcopy(result_jobs[itemOffset:nextItemOffset]), + "jobs": response_jobs, "totalResults": len(result_jobs), } if nextItemOffset < len(result_jobs): @@ -162,6 +168,28 @@ def _fake_search_jobs( return _fake_search_jobs +def mock_get_job_for_set(farmIdForJobs, queueIdForJobs, jobs): + """Returns a fake "get_job" API that emulates a subset of Deadline Cloud's + GetJob on the provided set of jobs. + + See https://docs.aws.amazon.com/deadline-cloud/latest/APIReference/API_GetJob.html + """ + + def _fake_get_job(farmId, queueId, jobId): + assert farmId == farmIdForJobs + assert queueId == queueIdForJobs + + matching_jobs = [job for job in jobs if job["jobId"] == jobId] + + if matching_jobs: + return matching_jobs[0] + else: + error_class = botocore.exceptions.from_code(404) + raise error_class(f"Resource of type job with id {jobId} does not exist.", "GetJob") + + return _fake_get_job + + def create_fake_job_list( job_count: int, timestamp_min: Optional[datetime] = None, diff --git a/test/unit/deadline_job_attachments/incremental_downloads/test_incremental_download_state.py b/test/unit/deadline_job_attachments/incremental_downloads/test_incremental_download_state.py index 543e7c1c9..e2772159a 100644 --- a/test/unit/deadline_job_attachments/incremental_downloads/test_incremental_download_state.py +++ b/test/unit/deadline_job_attachments/incremental_downloads/test_incremental_download_state.py @@ -9,8 +9,9 @@ from deadline.job_attachments.incremental_downloads.incremental_download_state import ( IncrementalDownloadJob, IncrementalDownloadState, + EVENTUAL_CONSISTENCY_MAX_SECONDS, ) -from datetime import datetime, timedelta +from datetime import datetime class TestIncrementalDownloadState: @@ -47,10 +48,10 @@ def sample_state_data(self): Fixture to provide sample state data. """ return { - "downloadsStartedTimestamp": "2023-01-01T00:00:00", - "downloadsCompletedTimestamp": "2023-01-02T00:00:00", - "eventualConsistencyMaxSeconds": 120, - "jobs": [], + "downloadsStartedTimestamp": "2023-01-01T00:00:00+00:00", + "downloadsCompletedTimestamp": "2023-01-02T00:00:00+00:00", + "eventualConsistencyMaxSeconds": EVENTUAL_CONSISTENCY_MAX_SECONDS, + "jobs": [{"jobId": "job-123", "name": "Job 1"}, {"jobId": "job-124", "name": "Job 2"}], } @pytest.fixture @@ -66,15 +67,15 @@ def state_file(self, test_paths, sample_state_data): @pytest.fixture def mock_download_state(self): """ - Fixture to create a sample IncrementalDownloadState. + Fixture to create a sample IncrementalDownloadState. This state matches the sample_state_data fixture. """ - return IncrementalDownloadState.from_dict( - { - "downloadsStartedTimestamp": "2023-01-01T00:00:00", - "downloadsCompletedTimestamp": "2023-01-02T00:00:00", - "eventualConsistencyMaxSeconds": 120, - "jobs": [], - } + return IncrementalDownloadState( + downloads_started_timestamp=datetime.fromisoformat("2023-01-01T00:00:00+00:00"), + downloads_completed_timestamp=datetime.fromisoformat("2023-01-02T00:00:00+00:00"), + jobs=[ + IncrementalDownloadJob({"jobId": "job-123", "name": "Job 1"}), + IncrementalDownloadJob({"jobId": "job-124", "name": "Job 2"}), + ], ) def test_incremental_download_state_init(self): @@ -87,12 +88,12 @@ def test_incremental_download_state_init(self): # Test with minimal bootstrapped construction state = IncrementalDownloadState(bootstrap_time) assert state.downloads_started_timestamp == bootstrap_time - assert state.downloads_completed_timestamp is None - assert state.eventual_consistency_max_duration == timedelta(minutes=2) + assert state.downloads_completed_timestamp == bootstrap_time + assert state.eventual_consistency_max_seconds == 120 assert state.jobs == [] # Test with provided values - jobs = [IncrementalDownloadJob("job-123", [])] + jobs = [IncrementalDownloadJob({"jobId": "job-123"}, [])] state = IncrementalDownloadState( downloads_started_timestamp=bootstrap_time, downloads_completed_timestamp=completed_time,