From 5a136d3e9ec7c1240d53f9b43156b9d4fc81a715 Mon Sep 17 00:00:00 2001 From: Mark <399551+mwiebe@users.noreply.github.com> Date: Thu, 26 Jun 2025 13:15:59 -0700 Subject: [PATCH] feat(experimental): Track jobs list for incremental download over time * Move the eventual consistency max time to a constant * Add the full job state from the SearchJobs responses to the IncrementalDownloadState object. This is to help explore what options we have for tracking/optimizing job state changes. * Use the _list_jobs_by_filter_expression function to collect all the candidate jobs for downloading tasks. * In the incremental download function, perform a diff between the in-progress and new candidate job lists, printing info about changes. * Print a summary of the job updates at the end. * Remove debug printing from the pid file lock implementation and the incremental download state load/save. * Adjust the CLI help output to use the docstring instead of from the decorator. Shorten some of the option names where the context makes it clear. Added a default checkpoint directory for the operation. * Modified how the opt-in environment variable works. Now, the command is always available and shown in the help output, but running it only succeeds if the environment variable is set. * Update the CLI tests to use the mocked search_jobs API, and implemented a mocked get_job API that works the same way. The golden path test now calls the CLI twice, modifying the state of the mocked job between calls, and validates that the output of the CLI command reflects that job's state appropriately. 
Signed-off-by: Mark <399551+mwiebe@users.noreply.github.com> --- .../api/_list_jobs_by_filter_expression.py | 3 +- src/deadline/client/cli/_common.py | 2 +- .../client/cli/_groups/queue_group.py | 271 +++++---- .../client/cli/_incremental_download.py | 311 ++++++++++- src/deadline/client/cli/_pid_file_lock.py | 24 +- src/deadline/client/config/config_file.py | 4 +- src/deadline/client/job_bundle/_yaml.py | 2 +- .../incremental_download_state.py | 78 +-- .../test_list_jobs_by_filter_expression.py | 2 +- .../deadline_client/cli/test_cli_queue.py | 14 - .../test_cli_queue_incremental_download.py | 515 ++++++------------ .../{ => cli}/test_pid_file_lock.py | 0 ...arch_jobs.py => mock_deadline_job_apis.py} | 42 +- .../test_incremental_download_state.py | 33 +- 14 files changed, 712 insertions(+), 589 deletions(-) rename test/unit/deadline_client/{ => cli}/test_pid_file_lock.py (100%) rename test/unit/deadline_client/{api/mock_search_jobs.py => mock_deadline_job_apis.py} (85%) diff --git a/src/deadline/client/api/_list_jobs_by_filter_expression.py b/src/deadline/client/api/_list_jobs_by_filter_expression.py index c750bfe66..c439be41a 100644 --- a/src/deadline/client/api/_list_jobs_by_filter_expression.py +++ b/src/deadline/client/api/_list_jobs_by_filter_expression.py @@ -60,7 +60,8 @@ def _list_jobs_by_filter_expression( boto3_session (boto3.Session): The boto3 Session for AWS API access. farm_id (str): The Farm ID. queue_id (str): The Queue ID. - filter_expressions (dict[str, Any]): The filter expression to apply to jobs. + filter_expressions (dict[str, Any]): The filter expression to apply to jobs. This is nested one level in a + filter expression provided to deadline:SearchJobs, so cannot include a groupFilter. Returns: The list of all jobs in the queue that satisfy the provided filter expression. Each job is as returned by the deadline:SearchJobs API. 
diff --git a/src/deadline/client/cli/_common.py b/src/deadline/client/cli/_common.py index 1b8873ae9..bc3152224 100644 --- a/src/deadline/client/cli/_common.py +++ b/src/deadline/client/cli/_common.py @@ -181,7 +181,7 @@ def _fix_multiline_strings(obj: Any) -> Any: return obj -def _cli_object_repr(obj: Any): +def _cli_object_repr(obj: Any) -> str: """ Transforms an API response object into a string, for printing as CLI output. This formats the output as YAML, using the "|"-style diff --git a/src/deadline/client/cli/_groups/queue_group.py b/src/deadline/client/cli/_groups/queue_group.py index 0d96bf7db..ec5105fe5 100644 --- a/src/deadline/client/cli/_groups/queue_group.py +++ b/src/deadline/client/cli/_groups/queue_group.py @@ -29,7 +29,7 @@ ) PID_FILE_NAME = "incremental_output_download.pid" -DOWNLOAD_PROGRESS_FILE_NAME = "download_progress.json" +DOWNLOAD_CHECKPOINT_FILE_NAME = "download_checkpoint.json" @click.group(name="queue") @@ -209,154 +209,151 @@ def queue_get(**args): click.echo(_cli_object_repr(response)) -if os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is not None: - - @cli_queue.command( - name="incremental-output-download", - help="BETA - Download Job Output data incrementally for all jobs running on a queue as session actions finish.\n" - "The command bootstraps once using a bootstrap lookback specified in minutes and\n" - "continues downloading from the last saved progress thereafter until bootstrap is forced.\n" - "[NOTE] This command is still WIP and partially implemented right now", - ) - @click.option("--farm-id", help="The AWS Deadline Cloud Farm to use.") - @click.option("--queue-id", help="The AWS Deadline Cloud Queue to use.") - @click.option("--path-mapping-rules", help="Path to a file with the path mapping rules to use.") - @click.option( - "--json", default=None, is_flag=True, help="Output is printed as JSON for scripting." 
- ) - @click.option( - "--bootstrap-lookback-in-minutes", - default=0, - type=float, - help="Downloads outputs for job-session-actions that have been completed since these many\n" - "minutes at bootstrap. Default value is 0 minutes.", - ) - @click.option( - "--saved-progress-checkpoint-location", - help="Proceed downloading from previous progress file at this location, if it exists.\n" - "If parameter not provided or file does not exist,\n" - "the download will start from the provided bootstrap lookback in minutes or its default value. \n", - required=True, - ) - @click.option( - "--force-bootstrap", - is_flag=True, - help="Ignores the previous download progress and forces command to start from the bootstrap \n" - "lookback period specified in minutes.\n" - "Default value is False.", - default=False, - ) - @click.option( - "--conflict-resolution", - type=click.Choice( - [ - FileConflictResolution.SKIP.name, - FileConflictResolution.OVERWRITE.name, - FileConflictResolution.CREATE_COPY.name, - ], - case_sensitive=False, - ), - default=FileConflictResolution.OVERWRITE.name, - help="How to handle downloads if an output file already exists:\n" - "CREATE_COPY (default): Download the file with a new name, appending '(1)' to the end\n" - "SKIP: Do not download the file\n" - "OVERWRITE: Download and replace the existing file.\n" - "Default behaviour is to OVERWRITE.", - ) - @_handle_error - def incremental_output_download( - path_mapping_rules: str, - json: bool, - bootstrap_lookback_in_minutes: float, - saved_progress_checkpoint_location: str, - force_bootstrap: bool, - **args, - ): - """ - Download Job Output data incrementally for all jobs running on a queue as session actions finish. 
- The command bootstraps once using a bootstrap lookback specified in minutes and - continues downloading from the last saved progress thereafter until bootstrap is forced - - :param path_mapping_rules: path mapping rules for cross OS path mapping - :param json: whether output is printed as JSON for scripting - :param bootstrap_lookback_in_minutes: Downloads outputs for job-session-actions that have been completed - since these many minutes at bootstrap. Default value is 0 minutes. - :param saved_progress_checkpoint_location: location of the download progress file - :param force_bootstrap: force bootstrap and ignore current download progress. Default value is False. - :param args: - :return: - """ - logger: ClickLogger = ClickLogger(is_json=json) - logger.echo("processing " + args.__str__()) - - logger.echo("Started incremental download....") - - # Check if download progress location is a valid directory on the os - if not os.path.isdir(saved_progress_checkpoint_location): - raise DeadlineOperationError( - f"Download progress location {saved_progress_checkpoint_location} is not a valid directory" - ) +@cli_queue.command(name="incremental-output-download") +@click.option("--farm-id", help="The AWS Deadline Cloud Farm to use.") +@click.option("--queue-id", help="The AWS Deadline Cloud Queue to use.") +@click.option("--json", default=None, is_flag=True, help="Output is printed as JSON for scripting.") +@click.option( + "--bootstrap-lookback-minutes", + default=0, + type=float, + help="Downloads outputs for job-session-actions that have been completed since these many\n" + "minutes at bootstrap. Default value is 0 minutes.", +) +@click.option( + "--checkpoint-dir", + default=config_file.DEFAULT_QUEUE_INCREMENTAL_DOWNLOAD_DIR, + help="Proceed downloading from the previous progress file stored in this directory, if it exists.\n" + "If the file does not exist, the download will initialize using the bootstrap lookback in minutes. 
\n", +) +@click.option( + "--force-bootstrap", + is_flag=True, + help="Forces command to start from the bootstrap lookback period and overwrite any previous checkpoint.\n" + "Default value is False.", + default=False, +) +@click.option( + "--conflict-resolution", + type=click.Choice( + [ + FileConflictResolution.SKIP.name, + FileConflictResolution.OVERWRITE.name, + FileConflictResolution.CREATE_COPY.name, + ], + case_sensitive=False, + ), + default=FileConflictResolution.OVERWRITE.name, + help="How to handle downloads if an output file already exists:\n" + "CREATE_COPY: Download the file with a new name, appending '(1)' to the end\n" + "SKIP: Do not download the file\n" + "OVERWRITE (default): Download and replace the existing file.\n" + "Default behaviour is to OVERWRITE.", +) +@_handle_error +def incremental_output_download( + json: bool, + bootstrap_lookback_minutes: float, + checkpoint_dir: str, + force_bootstrap: bool, + **args, +): + """ + BETA - Downloads job attachments output incrementally for all jobs in a queue. When run for the + first time or with the --force-bootstrap option, it starts downloading from --bootstrap-lookback-minutes + in the past. When run each subsequent time, it loads the previous checkpoint and continues + where it left off. - # Check that download progress location is writable - if not os.access(saved_progress_checkpoint_location, os.W_OK): - raise DeadlineOperationError( - f"Download progress location {saved_progress_checkpoint_location} exists but is not writable, please provide write permissions" - ) + To try this command, set the ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD environment variable to 1 to acknowledge its + incomplete beta status. - # Get a temporary config object with the standard options handled - config: Optional[ConfigParser] = _apply_cli_options_to_config( - required_options={"farm_id", "queue_id"}, **args + [NOTE] This command is still WIP and partially implemented right now. 
+ """ + if os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") != "1": + raise DeadlineOperationError( + "The incremental-output-download command is not fully implemented. You must set the environment variable ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD to 1 to acknowledge this." ) - # Get the default configs - farm_id = config_file.get_setting("defaults.farm_id", config=config) - queue_id = config_file.get_setting("defaults.queue_id", config=config) - boto3_session: boto3.Session = api.get_boto3_session(config=config) + logger: ClickLogger = ClickLogger(is_json=json) - # Get download progress file name appended by the queue id - a unique progress file exists per queue - download_progress_file_name: str = f"{queue_id}_{DOWNLOAD_PROGRESS_FILE_NAME}" + # Expand '~' to home directory and create the checkpoint directory if necessary + checkpoint_dir = os.path.abspath(os.path.expanduser(checkpoint_dir)) + os.makedirs(checkpoint_dir, exist_ok=True) - # Get saved progress file full path now that we've validated all file inputs are valid - checkpoint_file_path: str = os.path.join( - saved_progress_checkpoint_location, download_progress_file_name + # Check that download progress location is writable + if not os.access(checkpoint_dir, os.W_OK): + raise DeadlineOperationError( + f"Download progress checkpoint directory {checkpoint_dir} exists but is not writable, please provide write permissions" ) - # Perform incremental download while holding a process id lock + # Get a temporary config object with the standard options handled + config: Optional[ConfigParser] = _apply_cli_options_to_config( + required_options={"farm_id", "queue_id"}, **args + ) - pid_lock_file_path: str = os.path.join( - saved_progress_checkpoint_location, f"{queue_id}_{PID_FILE_NAME}" - ) + # Get the default configs + farm_id = config_file.get_setting("defaults.farm_id", config=config) + queue_id = config_file.get_setting("defaults.queue_id", config=config) + boto3_session: boto3.Session = 
api.get_boto3_session(config=config) - with PidFileLock( - pid_lock_file_path, - operation_name="incremental output download", - print_function_callback=logger.echo, - ): - current_download_state: IncrementalDownloadState - - if force_bootstrap or not os.path.exists(checkpoint_file_path): - # Bootstrap with the specified lookback duration - current_download_state = IncrementalDownloadState( - downloads_started_timestamp=datetime.now(timezone.utc) - - timedelta(minutes=bootstrap_lookback_in_minutes) - ) + # Get download progress file name appended by the queue id - a unique progress file exists per queue + download_checkpoint_file_name: str = f"{queue_id}_{DOWNLOAD_CHECKPOINT_FILE_NAME}" + + # Get saved progress file full path now that we've validated all file inputs are valid + checkpoint_file_path: str = os.path.join(checkpoint_dir, download_checkpoint_file_name) + + deadline = boto3_session.client("deadline") + response = deadline.get_queue(farmId=farm_id, queueId=queue_id) + logger.echo(f"Started incremental download for queue: {response['displayName']}") + logger.echo(f"Checkpoint: {checkpoint_file_path}") + logger.echo() + + # Perform incremental download while holding a process id lock + + pid_lock_file_path: str = os.path.join(checkpoint_dir, f"{queue_id}_{PID_FILE_NAME}") + + with PidFileLock( + pid_lock_file_path, + operation_name="incremental output download", + ): + current_download_state: IncrementalDownloadState + + if force_bootstrap or not os.path.exists(checkpoint_file_path): + bootstrap_timestamp = datetime.now(timezone.utc) - timedelta( + minutes=bootstrap_lookback_minutes + ) + # Bootstrap with the specified lookback duration + current_download_state = IncrementalDownloadState( + downloads_started_timestamp=bootstrap_timestamp + ) + + # Print the bootstrap time in local time + if force_bootstrap: + logger.echo(f"Bootstrap forced, lookback is {bootstrap_lookback_minutes} minutes") else: - # Load the incremental download checkpoint file - 
current_download_state = IncrementalDownloadState.from_file( - checkpoint_file_path, logger.echo + logger.echo( + f"Checkpoint not found, lookback is {bootstrap_lookback_minutes} minutes" ) + logger.echo(f"Initializing from: {bootstrap_timestamp.astimezone().isoformat()}") + else: + # Load the incremental download checkpoint file + current_download_state = IncrementalDownloadState.from_file(checkpoint_file_path) - updated_download_state: IncrementalDownloadState = _incremental_output_download( - boto3_session=boto3_session, - farm_id=farm_id, - queue_id=queue_id, - download_state=current_download_state, - path_mapping_rules=path_mapping_rules, - print_function_callback=logger.echo, + # Print the previous download completed time in local time + logger.echo("Checkpoint found") + logger.echo( + f"Continuing from: {current_download_state.downloads_completed_timestamp.astimezone().isoformat()}" ) - # Save the checkpoint file - updated_download_state.save_file( - checkpoint_file_path, - logger.echo, - ) + logger.echo() + + updated_download_state: IncrementalDownloadState = _incremental_output_download( + boto3_session=boto3_session, + farm_id=farm_id, + queue_id=queue_id, + download_state=current_download_state, + print_function_callback=logger.echo, + ) + + # Save the checkpoint file + updated_download_state.save_file(checkpoint_file_path) diff --git a/src/deadline/client/cli/_incremental_download.py b/src/deadline/client/cli/_incremental_download.py index 53a1a831e..e45b7dbab 100644 --- a/src/deadline/client/cli/_incremental_download.py +++ b/src/deadline/client/cli/_incremental_download.py @@ -3,13 +3,165 @@ __all__ = ["_incremental_output_download"] +from datetime import datetime, timedelta, timezone +import difflib +import textwrap + from .. 
import api -from typing import Optional, Callable +from typing import Any, Callable import boto3 +from ..api._list_jobs_by_filter_expression import _list_jobs_by_filter_expression from ...job_attachments.incremental_downloads.incremental_download_state import ( IncrementalDownloadState, + IncrementalDownloadJob, + _datetimes_to_str, ) -import datetime +from ._common import _cli_object_repr + + +def _get_download_candidate_jobs( + boto3_session: boto3.Session, farm_id: str, queue_id: str, starting_timestamp: datetime +) -> dict[str, dict[str, Any]]: + """ + Uses deadline:SearchJobs queries to get a dict {job_id: job} of download candidates for the queue. + This is a superset of all the jobs that have produced any output for download since + the provided starting_timestamp. + + Args: + boto3_session: The boto3 session for calling AWS APIs. + farm_id: The farm ID. + queue_id: The queue ID in the farm. + starting_timestamp: The point in time from which to look for new download outputs. + + Returns: + A dictionary mapping job id to the job as returned by the deadline.search_jobs API. + """ + # Construct the full set of jobs that may have new available downloads. + # - Any active job (job with taskRunStatus in READY, ASSIGNED, + # STARTING, SCHEDULED, or RUNNING), that has at least one SUCCEEDED task. 
+ download_candidate_jobs_dict = { + job["jobId"]: job + for job in _list_jobs_by_filter_expression( + boto3_session, + farm_id, + queue_id, + filter_expression={ + "filters": [ + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": status_value, + }, + } + # Maximum of 3 filters are permitted, so the 5 statuses are split + for status_value in ["READY", "ASSIGNED", "STARTING"] + ], + "operator": "OR", + }, + ) + } + download_candidate_jobs_dict.update( + { + job["jobId"]: job + for job in _list_jobs_by_filter_expression( + boto3_session, + farm_id, + queue_id, + filter_expression={ + "filters": [ + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": status_value, + }, + } + for status_value in ["SCHEDULED", "RUNNING"] + ], + "operator": "OR", + }, + ) + } + ) + print(f"DEBUG: Got {len(download_candidate_jobs_dict)} active jobs") + download_candidate_jobs_dict = { + job_id: _datetimes_to_str(job) + for job_id, job in download_candidate_jobs_dict.items() + if job["taskRunStatusCounts"]["SUCCEEDED"] > 0 + } + print( + f"DEBUG: Filtered down to {len(download_candidate_jobs_dict)} active jobs based on SUCCEEDED task filter" + ) + + # - Any recently ended job (job went from active to terminal with a taskRunStatus + # in SUSPENDED, CANCELED, FAILED, SUCCEEDED, NOT_COMPATIBLE), that has at least + # one SUCCEEDED task. The endedAt timestamp field gets updated when that occurs. + # TODO: Enable this when filtering by ENDED_AT works. 
+ # download_candidate_jobs_dict.update( + # { + # job["jobId"]: job + # for job in _list_jobs_by_filter_expression( + # boto3_session, + # farm_id, + # queue_id, + # filter_expression={ + # "filters": [ + # { + # "dateTimeFilter": { + # "name": "ENDED_AT", + # "dateTime": starting_timestamp, + # "operator": "GREATER_THAN_EQUAL_TO", + # } + # } + # ], + # "operator": "AND", + # }, + # ) + # } + # ) + # WORKAROUND: Get all jobs with a SUCCEEDED or SUSPENDED task run status, and filter by endedAt client-side. + # We want to download everything that is succeeded or suspended, but not + # FAILED, CANCELED, or NOT_COMPATIBLE. + recently_ended_jobs = _list_jobs_by_filter_expression( + boto3_session, + farm_id, + queue_id, + filter_expression={ + "filters": [ + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": status_value, + }, + } + for status_value in ["SUCCEEDED", "SUSPENDED"] + ], + "operator": "OR", + }, + ) + print(f"DEBUG: Got {len(recently_ended_jobs)} succeeded/suspended jobs") + # Jobs that are submitted with a SUSPENDED status will have no "endedAt" field + # Filter to jobs that: + # 1. Have an endedAt field. (jobs submitted as SUSPENDED will not have one) + # 2. Timestamp endedAt is after the timestamp threshold. + # 3. The count of SUCCEEDED tasks is positive. 
+ recently_ended_jobs = [ + job + for job in recently_ended_jobs + if "endedAt" in job + and job["endedAt"] >= starting_timestamp + and job["taskRunStatusCounts"]["SUCCEEDED"] > 0 + ] + print( + f"DEBUG: Filtered down to {len(recently_ended_jobs)} succeeded/suspended jobs based on endedAt timestamp threshold and SUCCEEDED task filter" + ) + download_candidate_jobs_dict.update( + {job["jobId"]: _datetimes_to_str(job) for job in recently_ended_jobs} + ) + + return download_candidate_jobs_dict @api.record_function_latency_telemetry_event() @@ -18,7 +170,6 @@ def _incremental_output_download( queue_id: str, boto3_session: boto3.Session, download_state: IncrementalDownloadState, - path_mapping_rules: Optional[str] = None, print_function_callback: Callable[[str], None] = lambda msg: None, ) -> IncrementalDownloadState: """ @@ -29,7 +180,6 @@ def _incremental_output_download( :param farm_id: farm id for the output download :param queue_id: queue for scoping output download :param download_state: Download state for starting the incremental download - :param path_mapping_rules: path mapping rules for cross OS path mapping :param boto3_session: boto3 session :param print_function_callback: Callback to print messages produced in this function. Used in the CLI to print to stdout using click.echo. By default, ignores messages. @@ -38,14 +188,159 @@ def _incremental_output_download( # When this function is done, we will be confident that downloads are complete up to # this timestamp. We subtract a duration from now() that gives a generous amount of # time for the deadline:SearchJobs API's eventual consistency to converge. - new_completed_timestamp = ( - datetime.datetime.now(datetime.timezone.utc) - - download_state.eventual_consistency_max_duration + new_completed_timestamp = datetime.now(timezone.utc) - timedelta( + seconds=download_state.eventual_consistency_max_seconds + ) + + print_function_callback( + f"Checkpoint state tracks {len(download_state.jobs)} jobs. 
Retrieving updated data from Deadline Cloud..." + ) + + deadline = boto3_session.client("deadline") + + download_candidate_jobs = _get_download_candidate_jobs( + boto3_session, farm_id, queue_id, download_state.downloads_completed_timestamp + ) + download_candidate_job_ids = set(download_candidate_jobs.keys()) + + in_progress_jobs = {job.job_id: job.job for job in download_state.jobs} + in_progress_job_ids = set(in_progress_jobs.keys()) + + print_function_callback( + f"Comparing with {len(download_candidate_jobs)} download candidate jobs..." ) + updated_in_progress_jobs = [] + + dropped_job_ids = in_progress_job_ids.difference(download_candidate_job_ids) + updated_job_ids = in_progress_job_ids.intersection(download_candidate_job_ids) + new_job_ids = download_candidate_job_ids.difference(in_progress_job_ids) + # The following sets get populated while analyzing the jobs + unchanged_job_ids = set() + attachments_free_job_ids = set() + + # Copy the job attachments manifest data from the checkpoint to the new job objects. This data is not returned + # by deadline:SearchJobs, so we need to call deadline:GetJob on every job to retrieve it. The manifests on a job + # don't change, so after the call to deadline:GetJob we can cache it indefinitely. + for job_id in updated_job_ids: + ip_job = in_progress_jobs[job_id] + dc_job = download_candidate_jobs[job_id] + + if ip_job["attachments"] is None: + # Carry over the minimal placeholder identifying the job as not using job attachments + download_candidate_jobs[job_id] = ip_job + attachments_free_job_ids.add(job_id) + updated_in_progress_jobs.append(ip_job) + else: + # Carry over the attachments manifest metadata + dc_job["attachments"] = ip_job["attachments"] + updated_job_ids.difference_update(attachments_free_job_ids) + + # Prune jobs that we are certain have no changes by looking at its task status counts. A job is unchanged if both of these are true: + # 1. job["taskRunStatusCounts"]["SUCCEEDED"] stayed the same. 
Except for when a task is requeued, this count will always increase + # when new output is available to download. If a task is requeued, this value could drop and then return to the same value + # when new output is generated. + # 2. job["updatedAt"] stayed the same. If a task is requeued, this timestamp will be updated, so this catches anything missed + # by the first check. This timestamp can also change for other reasons, and the later checks that look at session actions + # directly will find those cases. + for job_id in updated_job_ids: + ip_job = in_progress_jobs[job_id] + dc_job = download_candidate_jobs[job_id] + + if ip_job["taskRunStatusCounts"]["SUCCEEDED"] == dc_job["taskRunStatusCounts"][ + "SUCCEEDED" + ] and ip_job.get("updatedAt") == dc_job.get("updatedAt"): + print_function_callback(f"UNCHANGED Job: {dc_job['name']} ({job_id})") + unchanged_job_ids.add(job_id) + updated_in_progress_jobs.append(dc_job) + updated_job_ids.difference_update(unchanged_job_ids) + + # First make note of any jobs that were dropped, for example if they were canceled or they failed + for job_id in dropped_job_ids: + ip_job = in_progress_jobs[job_id] + + print_function_callback(f"DROPPED Job: {ip_job['name']} ({job_id})") + if ip_job["attachments"] is None: + print_function_callback(" Job without job attachments no longer needs tracking") + else: + print_function_callback( + " Job is not a download candidate anymore (likely canceled or failed)" + ) + + # Process all the jobs that have updates + for job_id in updated_job_ids: + ip_job = in_progress_jobs[job_id] + dc_job = download_candidate_jobs[job_id] + + print_function_callback(f"EXISTING Job: {ip_job['name']} ({job_id})") + print_function_callback( + f" Succeeded tasks (before): {ip_job['taskRunStatusCounts']['SUCCEEDED']} / {sum(value for _, value in ip_job['taskRunStatusCounts'].items())}" + ) + print_function_callback( + f" Succeeded tasks (now) : {dc_job['taskRunStatusCounts']['SUCCEEDED']} / {sum(value for _, value in 
dc_job['taskRunStatusCounts'].items())}" + ) + + # Use the CLI output format to produce a diff of the changes + ip_job_repr: list[str] = _cli_object_repr(ip_job).splitlines() + dc_job_repr: list[str] = _cli_object_repr(dc_job).splitlines() + + for line in difflib.unified_diff( + ip_job_repr, + dc_job_repr, + fromfile="Previous update", + tofile="Current update", + lineterm="", + ): + print_function_callback(f" {line}") + + updated_in_progress_jobs.append(dc_job) + + # Process all the jobs that are new + for job_id in new_job_ids: + dc_job = download_candidate_jobs[job_id] + + # Call deadline:GetJob to retrieve attachments manifest information + job = deadline.get_job(jobId=job_id, queueId=queue_id, farmId=farm_id) + dc_job["attachments"] = job.get("attachments") + + print_function_callback(f"NEW Job: {dc_job['name']} ({job_id})") + print_function_callback( + f" Succeeded tasks: {dc_job['taskRunStatusCounts']['SUCCEEDED']} / {sum(value for _, value in dc_job['taskRunStatusCounts'].items())}" + ) + if dc_job["attachments"] is None: + # If the job does not use job attachments, save a minimal placeholder to avoid + # repeatedly calling deadline:GetJob. 
+ download_candidate_jobs[job_id] = dc_job = { + "jobId": job_id, + "name": dc_job["name"], + "attachments": None, + } + attachments_free_job_ids.add(job_id) + print_function_callback(" Job does not use job attachments.") + else: + print_function_callback(textwrap.indent(_cli_object_repr(dc_job["attachments"]), " ")) + + updated_in_progress_jobs.append(dc_job) + new_job_ids.difference_update(attachments_free_job_ids) + + download_state.jobs = [IncrementalDownloadJob(job) for job in updated_in_progress_jobs] + + print_function_callback("") + print_function_callback(f"Identified {len(download_state.jobs)}") + # TODO the rest of the incremental output download # Update the timestamp in the state object to reflect the downloads that were completed - download_state.downloads_completed_timestamp = new_completed_timestamp + download_state.downloads_completed_timestamp = max( + download_state.downloads_started_timestamp, new_completed_timestamp + ) + + print_function_callback("") + print_function_callback("Summary of incremental output download:") + print_function_callback(f" Jobs without job attachments: {len(attachments_free_job_ids)}") + print_function_callback(f" Jobs unchanged: {len(unchanged_job_ids)}") + print_function_callback(f" Jobs added: {len(new_job_ids)}") + print_function_callback(f" Jobs updated: {len(updated_job_ids)}") + print_function_callback(f" Jobs dropped: {len(dropped_job_ids)}") return download_state diff --git a/src/deadline/client/cli/_pid_file_lock.py b/src/deadline/client/cli/_pid_file_lock.py index d2ca4d039..fb7095a53 100644 --- a/src/deadline/client/cli/_pid_file_lock.py +++ b/src/deadline/client/cli/_pid_file_lock.py @@ -3,7 +3,6 @@ import os import sys import psutil -from typing import Callable from contextlib import contextmanager from ..exceptions import DeadlineOperationError import logging @@ -46,7 +45,6 @@ def _claim_pid_lock_with_rename(tmp_file_name: str, pid_file_path: str) -> bool: def _try_acquire_pid_lock( pid_file_path: str, 
operation_name: str = "the operation", - print_function_callback: Callable[[str], None] = print, ): """ Checks if the specified pid lock file exists and executes as per the following: @@ -60,12 +58,8 @@ def _try_acquire_pid_lock( to handle concurrent processes making the same check. This will not work if primitive file locks are disabled. :param pid_file_full_path: full path of the pid lock file - :param print_function_callback: Callback to print messages produced in this function. - Used in the CLI to print to stdout using click.echo. By default, ignores messages. :return: boolean, True if pid lock was obtained successfully, throws an exception otherwise """ - print_function_callback(f"Checking if another download is in progress at {pid_file_path}") - current_process_id: int = os.getpid() # Generate a tmp file for writing the pid file as a whole and prevent corrupt data @@ -108,7 +102,7 @@ def _try_acquire_pid_lock( ) except FileNotFoundError: # In this case, the pid lock is free to acquire - print_function_callback(f"Pid lock file does not exist at {pid_file_path}") + pass # After possibly cleaning up a stale lock, try claiming it again if _claim_pid_lock_with_rename(tmp_file_path, pid_file_path): @@ -127,17 +121,13 @@ def _try_acquire_pid_lock( logger.warning(f"Failed to clean up pid lock temporary file: {e}") -def _release_pid_lock(pid_file_path: str, print_function_callback: Callable[[str], None] = print): +def _release_pid_lock(pid_file_path: str): """ Releases the pid lock by deleting the pid file. :param pid_file_full_path: full path of the pid lock file - :param print_function_callback: print_function_callback (Callable str -> None, optional): Callback to print messages produced in this function. - Used in the CLI to print to stdout using click.echo. By default, ignores messages. 
:return: boolean, True if pid lock released successfully """ - print_function_callback(f"Releasing pid lock at {pid_file_path}") - # Get the current process's id to obtain lock current_process_id: int = os.getpid() @@ -163,9 +153,6 @@ def _release_pid_lock(pid_file_path: str, print_function_callback: Callable[[str if lock_holder_pid == str(current_process_id): # Process pid from file is same as current process pid - release pid lock os.remove(pid_file_path) - print_function_callback( - f"Process with pid {lock_holder_pid} is the current process. Deleted pid lock file." - ) else: # Process pid from file is different from current process pid. logger.warning( @@ -177,7 +164,6 @@ def _release_pid_lock(pid_file_path: str, print_function_callback: Callable[[str def PidFileLock( lock_file_path: str, operation_name: str = "the operation", - print_function_callback: Callable[[str], None] = print, ): """ A context manager for holding a pid (process id) lock file during the scope of a 'with' statement. @@ -192,11 +178,9 @@ def PidFileLock( Args: lock_file_path (str): The file system path of the PID lock file. operation_name (Optional[str]): The name of the operation being performed in the lock, used for error messages. - print_function_callback (Optional[Callable]): A function that accepts a string to print debugging info. 
- """ - _try_acquire_pid_lock(lock_file_path, operation_name, print_function_callback) + _try_acquire_pid_lock(lock_file_path, operation_name) try: yield None finally: - _release_pid_lock(lock_file_path, print_function_callback) + _release_pid_lock(lock_file_path) diff --git a/src/deadline/client/config/config_file.py b/src/deadline/client/config/config_file.py index 57a281172..c9dc92681 100644 --- a/src/deadline/client/config/config_file.py +++ b/src/deadline/client/config/config_file.py @@ -11,6 +11,7 @@ "clear_setting", "get_best_profile_for_farm", "str2bool", + "DEFAULT_QUEUE_INCREMENTAL_DOWNLOAD_DIR", ] import getpass @@ -37,9 +38,10 @@ f"https://deadline.{boto3.Session().region_name}.amazonaws.com", ) -# The default directory within which to save the history of created jobs. +# Default directories used by various Deadline CLI commands DEFAULT_JOB_HISTORY_DIR = os.path.join("~", ".deadline", "job_history", "{aws_profile_name}") DEFAULT_CACHE_DIR = os.path.join("~", ".deadline", "cache") +DEFAULT_QUEUE_INCREMENTAL_DOWNLOAD_DIR = os.path.join("~", ".deadline", "incremental_download") _TRUE_VALUES = {"yes", "on", "true", "1"} _FALSE_VALUES = {"no", "off", "false", "0"} diff --git a/src/deadline/client/job_bundle/_yaml.py b/src/deadline/client/job_bundle/_yaml.py index da2839bf4..a788e1360 100644 --- a/src/deadline/client/job_bundle/_yaml.py +++ b/src/deadline/client/job_bundle/_yaml.py @@ -72,7 +72,7 @@ def __init__( Resolver.__init__(self) -def deadline_yaml_dump(data, stream=None, **kwds): +def deadline_yaml_dump(data, stream=None, **kwds) -> str: """ Works like pyyaml's safe_dump, but saves multi-line strings with the "|" style and defaults to sort_keys=False. 
diff --git a/src/deadline/job_attachments/incremental_downloads/incremental_download_state.py b/src/deadline/job_attachments/incremental_downloads/incremental_download_state.py index 6d0198e3f..3b38231cc 100644 --- a/src/deadline/job_attachments/incremental_downloads/incremental_download_state.py +++ b/src/deadline/job_attachments/incremental_downloads/incremental_download_state.py @@ -4,29 +4,48 @@ import json import os -from datetime import datetime, timedelta -from typing import Any, Optional, Callable +from datetime import datetime +from typing import Any, Optional import tempfile +# This is an upper bound to allow for eventual consistency into the materialized view that +# the deadline:SearchJobs API is based on. It's taken from numbers seen in heavy load testing, +# increased by a generous amount. +EVENTUAL_CONSISTENCY_MAX_SECONDS = 120 + + +def _datetimes_to_str(obj: Any) -> Any: + """Recursively applies the isoformat() function to all datetimes in the object""" + if isinstance(obj, datetime): + return obj.isoformat() + elif isinstance(obj, list): + return [_datetimes_to_str(item) for item in obj] + elif isinstance(obj, dict): + return {key: _datetimes_to_str(value) for key, value in obj.items()} + else: + return obj + + class IncrementalDownloadJob: """ Model representing a job in the download progress state. """ - _required_dict_fields = ["jobId", "sessions"] + _required_dict_fields = ["jobId", "job", "sessions"] job_id: str + job: dict[str, Any] sessions: list - def __init__(self, job_id: str, sessions: Optional[list] = None): + def __init__(self, job: dict[str, Any], sessions: Optional[list] = None): """ Initialize a Job instance. Args: - job_id (str): The ID of the job + job (dict[str, Any]): The job as returned by boto3 from deadline:SearchJobs.
sessions (list): List of JobSession objects """ - self.job_id = job_id + self.job_id = job["jobId"] + self.job = _datetimes_to_str(job) self.sessions = sessions or [] @classmethod @@ -45,7 +64,7 @@ def from_dict(cls, data: dict[str, Any]): raise ValueError(f"Input is missing required fields: {missing_fields}") sessions = data["sessions"] - return cls(job_id=data["jobId"], sessions=sessions) + return cls(job=data["job"], sessions=sessions) def to_dict(self) -> dict[str, Any]: """ @@ -53,23 +72,24 @@ def to_dict(self) -> dict[str, Any]: Returns: dict: Dictionary representation of the job """ - return {"jobId": self.job_id, "sessions": self.sessions} + return {"jobId": self.job_id, "job": self.job, "sessions": self.sessions} class IncrementalDownloadState: """ Model for tracking all the job attachments downloads to perform for a queue over time. - A new download becomes available whenever a TASK_RUN session action completes. The state - includes some informational fields that are not strictly necessary, to help make the data + A new download becomes available whenever a TASK_RUN session action completes. + + This class includes some informational fields that are not strictly necessary, to help make the data on disk easier to understand on inspection. * https://docs.aws.amazon.com/deadline-cloud/latest/APIReference/API_GetSessionAction.html#API_GetSessionAction_ResponseSyntax * https://docs.aws.amazon.com/deadline-cloud/latest/APIReference/API_SessionActionDefinition.html - We track state at three levels, and use the resource state at one level to prune queries at lower levels when we can: + The Deadline Cloud APIs do not provide direct access to a stream of completed session actions, so we reconstruct such + a stream by tracking state at three levels. Where possible, we use the resource state at one level to prune queries at lower levels: - 1. 
Job - The jobs list contains every job that entered an active status within the time interval [downloads_started_timestamp, downloads_completed_timestamp], - where it can generate new task runs, and has not exited as complete or failed. + 1. Job - The jobs list contains every job that is active and that we have downloaded output from in a previous incremental download command. 2. Session - Each session of a job represents a single worker running a sequence of tasks from the job. The sessions list in a job contains all the sessions that are active and from which we have downloaded some output. 3. SessionAction - Session actions have sequential IDs, so for each session we track the highest index of session action @@ -121,9 +141,9 @@ class IncrementalDownloadState: downloads_started_timestamp: datetime """The timestamp of when the download state was bootstrapped.""" - downloads_completed_timestamp: Optional[datetime] + downloads_completed_timestamp: datetime """The timestamp up to which we are confident downloads are complete.""" - eventual_consistency_max_duration: timedelta = timedelta(seconds=120) + eventual_consistency_max_seconds: int = EVENTUAL_CONSISTENCY_MAX_SECONDS """The duration for deadline:SearchJobs query overlap, to account for eventual consistency.""" jobs: list[IncrementalDownloadJob] @@ -134,7 +154,7 @@ def __init__( downloads_started_timestamp: datetime, downloads_completed_timestamp: Optional[datetime] = None, jobs: Optional[list] = None, - eventual_consistency_max_duration: Optional[timedelta] = None, + eventual_consistency_max_seconds: Optional[int] = None, ): """ Initialize a IncrementalDownloadState instance. To bootstrap the state, construct with only the downloads_started_timestamp. @@ -144,12 +164,15 @@ def __init__( downloads_completed_timestamp (datetime): The timestamp up to which we are confident downloads are complete. 
jobs (list[IncrementalDownloadJob]): The list of jobs that entered 'active' status between downloads_started_timestamp and downloads_completed_timestamp, and are not completed. - eventual_consistency_max_duration (Optional[timedelta]): The duration for deadline:SearchJobs query overlap, to account for eventual consistency. + eventual_consistency_max_seconds (Optional[int]): The duration, in seconds, for deadline:SearchJobs query overlap, to account for eventual consistency. """ self.downloads_started_timestamp = downloads_started_timestamp - self.downloads_completed_timestamp = downloads_completed_timestamp - if eventual_consistency_max_duration: - self.eventual_consistency_max_duration = eventual_consistency_max_duration + if downloads_completed_timestamp is not None: + self.downloads_completed_timestamp = downloads_completed_timestamp + else: + self.downloads_completed_timestamp = downloads_started_timestamp + if eventual_consistency_max_seconds: + self.eventual_consistency_max_seconds = eventual_consistency_max_seconds self.jobs = jobs or [] @classmethod @@ -172,10 +195,8 @@ def from_dict(cls, data): downloads_completed_timestamp=datetime.fromisoformat( data["downloadsCompletedTimestamp"] ), - eventual_consistency_max_duration=timedelta( - seconds=int(data["eventualConsistencyMaxSeconds"]) - ), - jobs=data["jobs"], + eventual_consistency_max_seconds=int(data["eventualConsistencyMaxSeconds"]), + jobs=[IncrementalDownloadJob.from_dict(job) for job in data["jobs"]], ) def to_dict(self): @@ -186,7 +207,7 @@ def to_dict(self): """ result = { "downloadsStartedTimestamp": self.downloads_started_timestamp.isoformat(), - "eventualConsistencyMaxSeconds": self.eventual_consistency_max_duration.total_seconds(), + "eventualConsistencyMaxSeconds": self.eventual_consistency_max_seconds, "jobs": [job.to_dict() for job in self.jobs], } if self.downloads_completed_timestamp is not None: @@ -198,7 +219,6 @@ def to_dict(self): def from_file( cls, file_path: str, - 
print_function_callback: Callable[[str], None] = print, ) -> "IncrementalDownloadState": """ Loads progress from state file saved at saved_progress_checkpoint_full_path @@ -213,15 +233,11 @@ def from_file( state_data = json.load(file) download_state = IncrementalDownloadState.from_dict(state_data) - print_function_callback( - f"Loaded existing state file from download progress checkpoint location {file_path}" - ) return download_state def save_file( self, file_path: str, - print_function_callback: Callable[[str], None] = print, ) -> None: """ Save the current download progress to a state file atomically. @@ -249,5 +265,3 @@ def save_file( # Atomically replace the target file with the temporary file os.replace(tmpfile.name, file_path) - - print_function_callback(f"Successfully saved state file to {file_path}") diff --git a/test/unit/deadline_client/api/test_list_jobs_by_filter_expression.py b/test/unit/deadline_client/api/test_list_jobs_by_filter_expression.py index 9a7c71583..0fb31036a 100644 --- a/test/unit/deadline_client/api/test_list_jobs_by_filter_expression.py +++ b/test/unit/deadline_client/api/test_list_jobs_by_filter_expression.py @@ -14,7 +14,7 @@ ) # Constants for testing -from .mock_search_jobs import create_fake_job_list, mock_search_jobs_for_set +from ..mock_deadline_job_apis import create_fake_job_list, mock_search_jobs_for_set from ..shared_constants import MOCK_FARM_ID, MOCK_QUEUE_ID MOCK_TIMESTAMP = datetime(2023, 1, 2, 3, 4, 5, tzinfo=timezone.utc) diff --git a/test/unit/deadline_client/cli/test_cli_queue.py b/test/unit/deadline_client/cli/test_cli_queue.py index 61222949f..1d2411034 100644 --- a/test/unit/deadline_client/cli/test_cli_queue.py +++ b/test/unit/deadline_client/cli/test_cli_queue.py @@ -14,7 +14,6 @@ from deadline.client.cli import main from ..shared_constants import MOCK_FARM_ID, MOCK_QUEUES_LIST -import os def test_cli_queue_list(fresh_deadline_config): @@ -146,16 +145,3 @@ def test_cli_queue_get(fresh_deadline_config): 
farmId=MOCK_FARM_ID, queueId=MOCK_QUEUES_LIST[0]["queueId"] ) assert result.exit_code == 0 - - -def test_incremental_download_existence(fresh_deadline_config): - """ - Confirm that the CLI is not available for environment with no ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD variable - """ - - assert not os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") - - runner = CliRunner() - response = runner.invoke(main, ["queue", "incremental-output-download"]) - - assert "Error: No such command 'incremental-output-download'." in response.output diff --git a/test/unit/deadline_client/cli/test_cli_queue_incremental_download.py b/test/unit/deadline_client/cli/test_cli_queue_incremental_download.py index 5c3110922..c9a91a515 100644 --- a/test/unit/deadline_client/cli/test_cli_queue_incremental_download.py +++ b/test/unit/deadline_client/cli/test_cli_queue_incremental_download.py @@ -4,19 +4,26 @@ Tests for the CLI queue incremental output download command. """ -import json import os import pytest from unittest.mock import patch, MagicMock +from datetime import datetime import boto3 from freezegun import freeze_time from click.testing import CliRunner from deadline.client.cli import main +import deadline.client +import psutil + +from ..shared_constants import MOCK_FARM_ID, MOCK_QUEUE_ID, MOCK_JOB_ID +from ..mock_deadline_job_apis import ( + mock_search_jobs_for_set, + create_fake_job_list, + mock_get_job_for_set, +) - -MOCK_FARM_ID = "farm-0123456789abcdef" -MOCK_QUEUE_ID = "queue-0123456789abcdef" +ISO_FREEZE_TIME = "2025-05-26 12:00:00+00:00" # Fixtures for shared resources @@ -31,21 +38,12 @@ def checkpoint_dir(tmp_path_factory): @pytest.fixture(scope="module") def boto3_session(): """Create a mock boto3 session for all tests to use.""" - return MagicMock(spec=boto3.Session) - - -@pytest.fixture -def progress_file(checkpoint_dir): - """Create a progress file path for tests that need it. - - This has function scope so each test gets a fresh file. 
- """ - progress_file_path = os.path.join(checkpoint_dir, f"{MOCK_QUEUE_ID}_download_progress.json") - # File will be created by the test that needs it - yield progress_file_path - # Clean up after each test - if os.path.exists(progress_file_path): - os.remove(progress_file_path) + mock_session = MagicMock(spec=boto3.Session) + mock_session.client().get_queue.return_value = {"displayName": "Mock Queue"} + with patch.object(boto3, "Session", return_value=mock_session), patch.object( + deadline.client.api, "get_deadline_cloud_library_telemetry_client" + ): + yield mock_session @pytest.fixture @@ -59,361 +57,178 @@ def pid_lock_file(checkpoint_dir): @pytest.fixture -def path_mapping_rules_file(tmp_path_factory): - """Create a path mapping rules file for tests that need it.""" - # Create in a separate directory to avoid conflicts - rules_dir = tmp_path_factory.mktemp("rules") - rules_file_path = os.path.join(str(rules_dir), "rules.json") - yield rules_file_path - # Clean up - if os.path.exists(rules_file_path): - os.remove(rules_file_path) +def with_incremental_download_enabled(): + """Set the ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD environment variable to 1 for testing the incremental download command.""" + os.environ["ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD"] = "1" + yield None + del os.environ["ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD"] -@pytest.fixture -def sample_progress_data(): - """Sample progress data for testing.""" - return { - "lastLookbackTime": "2025-04-04T05:30:00", - "jobs": [ - { - "jobId": "job-1234353453443", - "sessions": [ - { - "sessionId": "session-1324324354354", - "sessionLifecycleStatus": "SUCCESSFUL", - "lastDownloadedSessActionId": 3, - }, - { - "sessionId": "session-3423435435454", - "sessionLifecycleStatus": "RUNNING", - "lastDownloadedSessActionId": 6, - }, - ], - }, - { - "jobId": "Job-3234324354345", - "sessions": [ - { - "sessionId": "session-4235435434345", - "sessionLifecycleStatus": "FAILED", - "lastDownloadedSessActionId": 3, - } - ], - }, - ], - 
} - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@freeze_time("2025-05-26 12:00:00+00:00") -@patch("deadline.client.api.get_boto3_session") -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -def test_incremental_output_download_success_load_from_progress( - mock_get_boto3_session, checkpoint_dir, progress_file, sample_progress_data -): - """Test successful execution of incremental_output_download with loading progress from state file""" - # Create a real progress file with test data - with open(progress_file, "w") as f: - json.dump(sample_progress_data, f, indent=2) - - # Mock boto3 session - mock_session = MagicMock(spec=boto3.Session) - mock_get_boto3_session.return_value = mock_session - - # Run the CLI command +def test_incremental_output_download_requires_beta_acknowledgement(boto3_session, checkpoint_dir): + # Run the CLI command once to bootstrap the operation runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - ], - ) + with freeze_time(ISO_FREEZE_TIME): + result = runner.invoke( + main, + [ + "queue", + "incremental-output-download", + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + "--checkpoint-dir", + checkpoint_dir, + ], + ) # Assert the command executed successfully - assert result.exit_code == 0 + assert result.exit_code == 1, result.output - # Check that the progress file was updated - with open(progress_file, "r") as f: - updated_progress = json.load(f) + assert ( + "The incremental-output-download command is not fully implemented. You must set the environment variable ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD to 1 to acknowledge this." 
+ in result.output + ), result.output - # Verify the lastLookbackTime was updated to the frozen time - assert updated_progress["lastLookbackTime"] == "2025-05-26T12:00:00+00:00" - # Verify the job data was preserved - assert len(updated_progress["jobs"]) == 2 - assert updated_progress["jobs"][0]["jobId"] == "job-1234353453443" - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@freeze_time("2025-05-26 12:00:00+00:00") -@patch("deadline.client.api.get_boto3_session") -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -@pytest.mark.parametrize("bootstrap_lookback_in_minutes", [60, None]) -def test_incremental_output_download_success_with_force_bootstrap( - mock_get_boto3_session, checkpoint_dir, progress_file, bootstrap_lookback_in_minutes +def test_incremental_output_download_simple_success( + with_incremental_download_enabled, boto3_session, checkpoint_dir ): - """Test successful execution of incremental_output_download with bootstrapping""" - # Create a file that should be ignored due to force_bootstrap=True - with open(progress_file, "w") as f: - json.dump({"lastLookbackTime": "2025-01-01T00:00:00", "jobs": []}, f) - - # Mock boto3 session - mock_session = MagicMock(spec=boto3.Session) - mock_get_boto3_session.return_value = mock_session + """Test successful execution of incremental_output_download""" + mock_jobs = create_fake_job_list(1) + mock_jobs[0]["name"] = "Mock Job" + mock_jobs[0]["jobId"] = MOCK_JOB_ID + mock_jobs[0]["taskRunStatus"] = "READY" + mock_jobs[0]["taskRunStatusCounts"] = { + "SUCCEEDED": 1, + "READY": 1, + } + mock_jobs[0]["attachments"] = { + "manifests": [ + {"rootPath": "/", "rootPathFormat": "posix", "outputRelativeDirectories": ["."]} + ], + "fileSystem": "VIRTUAL", + } + del mock_jobs[0]["endedAt"] + boto3_session.client().search_jobs = mock_search_jobs_for_set( + MOCK_FARM_ID, MOCK_QUEUE_ID, mock_jobs + ) + 
boto3_session.client().get_job = mock_get_job_for_set(MOCK_FARM_ID, MOCK_QUEUE_ID, mock_jobs) - # Run the CLI command + # Run the CLI command once to bootstrap the operation runner = CliRunner() - cmd = [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - "--force-bootstrap", - ] - - if bootstrap_lookback_in_minutes is not None: - cmd.extend(["--bootstrap-lookback-in-minutes", str(bootstrap_lookback_in_minutes)]) - - result = runner.invoke(main, cmd) + with freeze_time(ISO_FREEZE_TIME): + result = runner.invoke( + main, + [ + "queue", + "incremental-output-download", + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + "--checkpoint-dir", + checkpoint_dir, + ], + ) # Assert the command executed successfully - assert result.exit_code == 0 - - # Check that the progress file was updated - with open(progress_file, "r") as f: - updated_progress = json.load(f) - - # Verify the lastLookbackTime was updated to the frozen time - assert updated_progress["lastLookbackTime"] == "2025-05-26T12:00:00+00:00" + assert result.exit_code == 0, result.output + + # Assert that the output contained information about the bootstrapping and the mocked resources + assert "Started incremental download for queue: Mock Queue" in result.output, result.output + assert ( + f"Checkpoint: {os.path.join(checkpoint_dir, MOCK_QUEUE_ID + '_download_checkpoint.json')}" + in result.output + ), result.output + assert "Checkpoint not found, lookback is 0.0 minutes" in result.output, result.output + # Need to convert the freeze time to the local time zone for this print assertion + assert ( + f"Initializing from: {datetime.fromisoformat(ISO_FREEZE_TIME).astimezone().isoformat()}" + in result.output + ), result.output + assert f"NEW Job: Mock Job ({MOCK_JOB_ID})" in result.output, result.output + assert "Succeeded tasks: 1 / 2" in result.output, result.output + assert "Jobs 
added: 1" in result.output, result.output + + # Edit the mock job to complete the task + mock_jobs[0]["taskRunStatus"] = "SUCCEEDED" + mock_jobs[0]["taskRunStatusCounts"] = { + "SUCCEEDED": 2, + "READY": 0, + } + mock_jobs[0]["endedAt"] = datetime.fromisoformat(ISO_FREEZE_TIME) + + # Run the CLI command again to "complete" the download that was started + with freeze_time(ISO_FREEZE_TIME): + result = runner.invoke( + main, + [ + "queue", + "incremental-output-download", + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + "--checkpoint-dir", + checkpoint_dir, + ], + ) - # Verify the jobs list is empty (as it would be with a fresh bootstrap) - assert updated_progress["jobs"] == [] + # Assert the command executed successfully + assert result.exit_code == 0, result.output + + # Assert that the output contained information about loading the checkpoint and the mocked resources + assert "Started incremental download for queue: Mock Queue" in result.output, result.output + assert ( + f"Checkpoint: {os.path.join(checkpoint_dir, MOCK_QUEUE_ID + '_download_checkpoint.json')}" + in result.output + ), result.output + assert "Checkpoint found" in result.output, result.output + # Need to convert the freeze time to the local time zone for this print assertion + assert ( + f"Continuing from: {datetime.fromisoformat(ISO_FREEZE_TIME).astimezone().isoformat()}" + in result.output + ), result.output + assert f"EXISTING Job: Mock Job ({MOCK_JOB_ID})" in result.output, result.output + assert "Succeeded tasks (before): 1 / 2" in result.output, result.output + assert "Succeeded tasks (now) : 2 / 2" in result.output, result.output + assert "Jobs updated: 1" in result.output, result.output -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@patch("psutil.pid_exists") -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) def 
test_incremental_output_download_pid_lock_already_held_error( - mock_pid_exists, checkpoint_dir, pid_lock_file + with_incremental_download_enabled, boto3_session, checkpoint_dir, pid_lock_file ): """Test incremental_output_download when PidLockAlreadyHeld is raised""" # Write a fake PID to the file with open(pid_lock_file, "w") as f: - f.write("12345") # Use a fake PID - - # Make psutil.pid_exists return True to simulate the process is running - mock_pid_exists.return_value = True + f.write("12345678") # Use a fake PID # Run the CLI command runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - ], - ) - - # Assert the command executed successfully but with a message about another download in progress - assert result.exit_code == 0 - assert f"Another download is in progress at {checkpoint_dir}" in result.output + with patch.object(psutil, "pid_exists") as mock_pid_exists: + # Make psutil.pid_exists return True to simulate the process is running + mock_pid_exists.return_value = True + result = runner.invoke( + main, + [ + "queue", + "incremental-output-download", + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + "--checkpoint-dir", + checkpoint_dir, + ], + ) + + # Assert the command did not execute successfully and wrote a message about another download in progress + assert result.exit_code == 1, result.output + assert ( + f"Unable to perform incremental output download as process with pid 12345678 already holds the lock {os.path.join(checkpoint_dir, MOCK_QUEUE_ID + '_incremental_output_download.pid')}" + in result.output + ), result.output # Verify the PID file still exists since we're simulating another process holding the lock assert os.path.exists(pid_lock_file) - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental 
output download is not enabled", -) -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -def test_validate_file_inputs_success(checkpoint_dir): - """Test successful validation of file inputs""" - # Run the CLI command with a valid directory - runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - ], - ) - - # The command should execute - assert result.exit_code == 0 - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "1"}) -def test_validate_file_inputs_invalid_directory(checkpoint_dir): - """Test validation when directory is invalid""" - # Create a path to a non-existent directory - nonexistent_dir = os.path.join(checkpoint_dir, "nonexistent_directory") - - # Run the CLI command with an invalid directory - runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - nonexistent_dir, - ], - ) - - # The command should execute but report the validation error - assert result.exit_code == 0 - assert "Download failed" in result.output - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -def test_validate_file_inputs_with_mapping_rules_success(checkpoint_dir, path_mapping_rules_file): - """Test successful validation with path mapping rules""" - # Create the rules file with valid content - with open(path_mapping_rules_file, "w") as f: - f.write('{"rules": []}') - - # Run the CLI command with valid rules file - 
runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - "--path-mapping-rules", - path_mapping_rules_file, - ], - ) - - # The command should execute without validation errors - assert result.exit_code == 0 - assert "Download failed" not in result.output - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -def test_validate_file_inputs_mapping_rules_not_exist(checkpoint_dir): - """Test validation when mapping rules file doesn't exist""" - # Create a path to a non-existent rules file - nonexistent_rules = os.path.join(checkpoint_dir, "nonexistent_rules.json") - - # Run the CLI command with non-existent rules file - runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - "--path-mapping-rules", - nonexistent_rules, - ], - ) - - # The command should execute but report the validation error - assert result.exit_code == 0 - assert "Download failed" in result.output - - -@pytest.mark.skipif( - os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is None, - reason="Incremental output download is not enabled", -) -@patch("os.access") -@patch.dict(os.environ, {"ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD": "True"}) -def test_validate_file_inputs_mapping_rules_not_readable( - mock_access, checkpoint_dir, path_mapping_rules_file -): - """Test validation when mapping rules file is not readable""" - # Create the rules file - with open(path_mapping_rules_file, "w") as f: - f.write('{"rules": []}') - - # Mock os.access to simulate a non-readable rules file - def 
access_side_effect(path, mode): - if path == path_mapping_rules_file and mode == os.R_OK: - return False - return True - - mock_access.side_effect = access_side_effect - - # Run the CLI command with non-readable rules file - runner = CliRunner() - result = runner.invoke( - main, - [ - "queue", - "incremental-output-download", - "--farm-id", - MOCK_FARM_ID, - "--queue-id", - MOCK_QUEUE_ID, - "--saved-progress-checkpoint-location", - checkpoint_dir, - "--path-mapping-rules", - path_mapping_rules_file, - ], - ) - - # The command should execute but report the validation error - assert result.exit_code == 0 - assert "Download failed" in result.output diff --git a/test/unit/deadline_client/test_pid_file_lock.py b/test/unit/deadline_client/cli/test_pid_file_lock.py similarity index 100% rename from test/unit/deadline_client/test_pid_file_lock.py rename to test/unit/deadline_client/cli/test_pid_file_lock.py diff --git a/test/unit/deadline_client/api/mock_search_jobs.py b/test/unit/deadline_client/mock_deadline_job_apis.py similarity index 85% rename from test/unit/deadline_client/api/mock_search_jobs.py rename to test/unit/deadline_client/mock_deadline_job_apis.py index c27dde1b5..f1b959763 100644 --- a/test/unit/deadline_client/api/mock_search_jobs.py +++ b/test/unit/deadline_client/mock_deadline_job_apis.py @@ -1,8 +1,8 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. """ -This file implements a partial re-implementation of the deadline:SearchJobs API, so that -we can unit tests queries that depend on its behavior in more complex ways. +This file implements a partial re-implementation of the deadline:SearchJobs and deadline:GetJob APIs, +so that we can unit test queries that depend on its behavior in more complex ways. The function create_fake_job_list uses a random number generator to produce a list of job dictionaries.
The mock_search_jobs_for_set takes such a list of jobs, and returns @@ -17,7 +17,9 @@ from typing import Any, Callable, Optional import uuid -from ..testing_utilities import snake_to_camel +import botocore.exceptions + +from .testing_utilities import snake_to_camel __all__ = ["create_fake_job_list", "mock_search_jobs_for_set"] @@ -123,9 +125,6 @@ def mock_search_jobs_for_set(farmIdForJobs, queueIdForJobs, jobs): """Returns a fake "search_jobs" API that emulates a subset of Deadline Cloud's SearchJobs on the provided set of jobs. - These fake jobs only have the timestamp fields. If we decide to generalize this - function for more cases, we could move this to a conftest.py and extend it. - See https://docs.aws.amazon.com/deadline-cloud/latest/APIReference/API_SearchJobs.html """ @@ -150,8 +149,15 @@ def _fake_search_jobs( result_jobs = [j for j in result_jobs if filter(j)] # Construct the API response nextItemOffset = min(len(result_jobs), itemOffset + pageSize) + response_jobs = deepcopy(result_jobs[itemOffset:nextItemOffset]) + + # Remove all "attachments" properties from the response, they are not returned by deadline:SearchJobs + for job in response_jobs: + if "attachments" in job: + del job["attachments"] + response = { - "jobs": deepcopy(result_jobs[itemOffset:nextItemOffset]), + "jobs": response_jobs, "totalResults": len(result_jobs), } if nextItemOffset < len(result_jobs): @@ -162,6 +168,28 @@ def _fake_search_jobs( return _fake_search_jobs +def mock_get_job_for_set(farmIdForJobs, queueIdForJobs, jobs): + """Returns a fake "get_job" API that emulates a subset of Deadline Cloud's + GetJob on the provided set of jobs. 
+ + See https://docs.aws.amazon.com/deadline-cloud/latest/APIReference/API_GetJob.html + """ + + def _fake_get_job(farmId, queueId, jobId): + assert farmId == farmIdForJobs + assert queueId == queueIdForJobs + + matching_jobs = [job for job in jobs if job["jobId"] == jobId] + + if matching_jobs: + return matching_jobs[0] + else: + error_class = botocore.exceptions.from_code(404) + raise error_class(f"Resource of type job with id {jobId} does not exist.", "GetJob") + + return _fake_get_job + + def create_fake_job_list( job_count: int, timestamp_min: Optional[datetime] = None, diff --git a/test/unit/deadline_job_attachments/incremental_downloads/test_incremental_download_state.py b/test/unit/deadline_job_attachments/incremental_downloads/test_incremental_download_state.py index 543e7c1c9..e2772159a 100644 --- a/test/unit/deadline_job_attachments/incremental_downloads/test_incremental_download_state.py +++ b/test/unit/deadline_job_attachments/incremental_downloads/test_incremental_download_state.py @@ -9,8 +9,9 @@ from deadline.job_attachments.incremental_downloads.incremental_download_state import ( IncrementalDownloadJob, IncrementalDownloadState, + EVENTUAL_CONSISTENCY_MAX_SECONDS, ) -from datetime import datetime, timedelta +from datetime import datetime class TestIncrementalDownloadState: @@ -47,10 +48,10 @@ def sample_state_data(self): Fixture to provide sample state data. 
""" return { - "downloadsStartedTimestamp": "2023-01-01T00:00:00", - "downloadsCompletedTimestamp": "2023-01-02T00:00:00", - "eventualConsistencyMaxSeconds": 120, - "jobs": [], + "downloadsStartedTimestamp": "2023-01-01T00:00:00+00:00", + "downloadsCompletedTimestamp": "2023-01-02T00:00:00+00:00", + "eventualConsistencyMaxSeconds": EVENTUAL_CONSISTENCY_MAX_SECONDS, + "jobs": [{"jobId": "job-123", "name": "Job 1"}, {"jobId": "job-124", "name": "Job 2"}], } @pytest.fixture @@ -66,15 +67,15 @@ def state_file(self, test_paths, sample_state_data): @pytest.fixture def mock_download_state(self): """ - Fixture to create a sample IncrementalDownloadState. + Fixture to create a sample IncrementalDownloadState. This state matches the sample_state_data fixture. """ - return IncrementalDownloadState.from_dict( - { - "downloadsStartedTimestamp": "2023-01-01T00:00:00", - "downloadsCompletedTimestamp": "2023-01-02T00:00:00", - "eventualConsistencyMaxSeconds": 120, - "jobs": [], - } + return IncrementalDownloadState( + downloads_started_timestamp=datetime.fromisoformat("2023-01-01T00:00:00+00:00"), + downloads_completed_timestamp=datetime.fromisoformat("2023-01-02T00:00:00+00:00"), + jobs=[ + IncrementalDownloadJob({"jobId": "job-123", "name": "Job 1"}), + IncrementalDownloadJob({"jobId": "job-124", "name": "Job 2"}), + ], ) def test_incremental_download_state_init(self): @@ -87,12 +88,12 @@ def test_incremental_download_state_init(self): # Test with minimal bootstrapped construction state = IncrementalDownloadState(bootstrap_time) assert state.downloads_started_timestamp == bootstrap_time - assert state.downloads_completed_timestamp is None - assert state.eventual_consistency_max_duration == timedelta(minutes=2) + assert state.downloads_completed_timestamp == bootstrap_time + assert state.eventual_consistency_max_seconds == 120 assert state.jobs == [] # Test with provided values - jobs = [IncrementalDownloadJob("job-123", [])] + jobs = [IncrementalDownloadJob({"jobId": "job-123"}, 
[])] state = IncrementalDownloadState( downloads_started_timestamp=bootstrap_time, downloads_completed_timestamp=completed_time,