Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def _list_jobs_by_filter_expression(
boto3_session (boto3.Session): The boto3 Session for AWS API access.
farm_id (str): The Farm ID.
queue_id (str): The Queue ID.
filter_expressions (dict[str, Any]): The filter expression to apply to jobs.
filter_expressions (dict[str, Any]): The filter expression to apply to jobs. This is nested one level in a
filter expression provided to deadline:SearchJobs, so cannot include a groupFilter.

Returns:
The list of all jobs in the queue that satisfy the provided filter expression. Each job is as returned by the deadline:SearchJobs API.
Expand Down
2 changes: 1 addition & 1 deletion src/deadline/client/cli/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def _fix_multiline_strings(obj: Any) -> Any:
return obj


def _cli_object_repr(obj: Any):
def _cli_object_repr(obj: Any) -> str:
"""
Transforms an API response object into a string, for printing as
CLI output. This formats the output as YAML, using the "|"-style
Expand Down
271 changes: 134 additions & 137 deletions src/deadline/client/cli/_groups/queue_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
)

PID_FILE_NAME = "incremental_output_download.pid"
DOWNLOAD_PROGRESS_FILE_NAME = "download_progress.json"
DOWNLOAD_CHECKPOINT_FILE_NAME = "download_checkpoint.json"


@click.group(name="queue")
Expand Down Expand Up @@ -209,154 +209,151 @@ def queue_get(**args):
click.echo(_cli_object_repr(response))


if os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") is not None:

@cli_queue.command(
name="incremental-output-download",
help="BETA - Download Job Output data incrementally for all jobs running on a queue as session actions finish.\n"
"The command bootstraps once using a bootstrap lookback specified in minutes and\n"
"continues downloading from the last saved progress thereafter until bootstrap is forced.\n"
"[NOTE] This command is still WIP and partially implemented right now",
)
@click.option("--farm-id", help="The AWS Deadline Cloud Farm to use.")
@click.option("--queue-id", help="The AWS Deadline Cloud Queue to use.")
@click.option("--path-mapping-rules", help="Path to a file with the path mapping rules to use.")
@click.option(
"--json", default=None, is_flag=True, help="Output is printed as JSON for scripting."
)
@click.option(
"--bootstrap-lookback-in-minutes",
default=0,
type=float,
help="Downloads outputs for job-session-actions that have been completed since these many\n"
"minutes at bootstrap. Default value is 0 minutes.",
)
@click.option(
"--saved-progress-checkpoint-location",
help="Proceed downloading from previous progress file at this location, if it exists.\n"
"If parameter not provided or file does not exist,\n"
"the download will start from the provided bootstrap lookback in minutes or its default value. \n",
required=True,
)
@click.option(
"--force-bootstrap",
is_flag=True,
help="Ignores the previous download progress and forces command to start from the bootstrap \n"
"lookback period specified in minutes.\n"
"Default value is False.",
default=False,
)
@click.option(
"--conflict-resolution",
type=click.Choice(
[
FileConflictResolution.SKIP.name,
FileConflictResolution.OVERWRITE.name,
FileConflictResolution.CREATE_COPY.name,
],
case_sensitive=False,
),
default=FileConflictResolution.OVERWRITE.name,
help="How to handle downloads if an output file already exists:\n"
"CREATE_COPY (default): Download the file with a new name, appending '(1)' to the end\n"
"SKIP: Do not download the file\n"
"OVERWRITE: Download and replace the existing file.\n"
"Default behaviour is to OVERWRITE.",
)
@_handle_error
def incremental_output_download(
path_mapping_rules: str,
json: bool,
bootstrap_lookback_in_minutes: float,
saved_progress_checkpoint_location: str,
force_bootstrap: bool,
**args,
):
"""
Download Job Output data incrementally for all jobs running on a queue as session actions finish.
The command bootstraps once using a bootstrap lookback specified in minutes and
continues downloading from the last saved progress thereafter until bootstrap is forced

:param path_mapping_rules: path mapping rules for cross OS path mapping
:param json: whether output is printed as JSON for scripting
:param bootstrap_lookback_in_minutes: Downloads outputs for job-session-actions that have been completed
since these many minutes at bootstrap. Default value is 0 minutes.
:param saved_progress_checkpoint_location: location of the download progress file
:param force_bootstrap: force bootstrap and ignore current download progress. Default value is False.
:param args:
:return:
"""
logger: ClickLogger = ClickLogger(is_json=json)
logger.echo("processing " + args.__str__())

logger.echo("Started incremental download....")

# Check if download progress location is a valid directory on the os
if not os.path.isdir(saved_progress_checkpoint_location):
raise DeadlineOperationError(
f"Download progress location {saved_progress_checkpoint_location} is not a valid directory"
)
@cli_queue.command(name="incremental-output-download")
@click.option("--farm-id", help="The AWS Deadline Cloud Farm to use.")
@click.option("--queue-id", help="The AWS Deadline Cloud Queue to use.")
@click.option("--json", default=None, is_flag=True, help="Output is printed as JSON for scripting.")
@click.option(
"--bootstrap-lookback-minutes",
default=0,
type=float,
help="Downloads outputs for job-session-actions that have been completed since these many\n"
"minutes at bootstrap. Default value is 0 minutes.",
)
@click.option(
"--checkpoint-dir",
default=config_file.DEFAULT_QUEUE_INCREMENTAL_DOWNLOAD_DIR,
help="Proceed downloading from the previous progress file stored in this directory, if it exists.\n"
"If the file does not exist, the download will initialize using the bootstrap lookback in minutes. \n",
)
@click.option(
"--force-bootstrap",
is_flag=True,
help="Forces command to start from the bootstrap lookback period and overwrite any previous checkpoint.\n"
"Default value is False.",
default=False,
)
@click.option(
"--conflict-resolution",
type=click.Choice(
[
FileConflictResolution.SKIP.name,
FileConflictResolution.OVERWRITE.name,
FileConflictResolution.CREATE_COPY.name,
],
case_sensitive=False,
),
default=FileConflictResolution.OVERWRITE.name,
help="How to handle downloads if an output file already exists:\n"
"CREATE_COPY: Download the file with a new name, appending '(1)' to the end\n"
"SKIP: Do not download the file\n"
"OVERWRITE (default): Download and replace the existing file.\n"
"Default behaviour is to OVERWRITE.",
)
@_handle_error
def incremental_output_download(
json: bool,
bootstrap_lookback_minutes: float,
checkpoint_dir: str,
force_bootstrap: bool,
**args,
):
"""
BETA - Downloads job attachments output incrementally for all jobs in a queue. When run for the
first time or with the --force-bootstrap option, it starts downloading from --bootstrap-lookback-minutes
in the past. When run each subsequent time, it loads the previous checkpoint and continues
where it left off.

# Check that download progress location is writable
if not os.access(saved_progress_checkpoint_location, os.W_OK):
raise DeadlineOperationError(
f"Download progress location {saved_progress_checkpoint_location} exists but is not writable, please provide write permissions"
)
To try this command, set the ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD environment variable to 1 to acknowledge its
incomplete beta status.

# Get a temporary config object with the standard options handled
config: Optional[ConfigParser] = _apply_cli_options_to_config(
required_options={"farm_id", "queue_id"}, **args
[NOTE] This command is still WIP and partially implemented right now.
"""
if os.environ.get("ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD") != "1":
raise DeadlineOperationError(
"The incremental-output-download command is not fully implemented. You must set the environment variable ENABLE_INCREMENTAL_OUTPUT_DOWNLOAD to 1 to acknowledge this."
)

# Get the default configs
farm_id = config_file.get_setting("defaults.farm_id", config=config)
queue_id = config_file.get_setting("defaults.queue_id", config=config)
boto3_session: boto3.Session = api.get_boto3_session(config=config)
logger: ClickLogger = ClickLogger(is_json=json)

# Get download progress file name appended by the queue id - a unique progress file exists per queue
download_progress_file_name: str = f"{queue_id}_{DOWNLOAD_PROGRESS_FILE_NAME}"
# Expand '~' to home directory and create the checkpoint directory if necessary
checkpoint_dir = os.path.abspath(os.path.expanduser(checkpoint_dir))
os.makedirs(checkpoint_dir, exist_ok=True)

# Get saved progress file full path now that we've validated all file inputs are valid
checkpoint_file_path: str = os.path.join(
saved_progress_checkpoint_location, download_progress_file_name
# Check that the checkpoint directory is writable
if not os.access(checkpoint_dir, os.W_OK):
raise DeadlineOperationError(
f"Download progress checkpoint directory {checkpoint_dir} exists but is not writable, please provide write permissions"
)

# Perform incremental download while holding a process id lock
# Get a temporary config object with the standard options handled
config: Optional[ConfigParser] = _apply_cli_options_to_config(
required_options={"farm_id", "queue_id"}, **args
)

pid_lock_file_path: str = os.path.join(
saved_progress_checkpoint_location, f"{queue_id}_{PID_FILE_NAME}"
)
# Get the default configs
farm_id = config_file.get_setting("defaults.farm_id", config=config)
queue_id = config_file.get_setting("defaults.queue_id", config=config)
boto3_session: boto3.Session = api.get_boto3_session(config=config)

with PidFileLock(
pid_lock_file_path,
operation_name="incremental output download",
print_function_callback=logger.echo,
):
current_download_state: IncrementalDownloadState

if force_bootstrap or not os.path.exists(checkpoint_file_path):
# Bootstrap with the specified lookback duration
current_download_state = IncrementalDownloadState(
downloads_started_timestamp=datetime.now(timezone.utc)
- timedelta(minutes=bootstrap_lookback_in_minutes)
)
# Build the checkpoint file name, prefixed with the queue id - a unique checkpoint file exists per queue
download_checkpoint_file_name: str = f"{queue_id}_{DOWNLOAD_CHECKPOINT_FILE_NAME}"

# Build the full path of the checkpoint file now that the checkpoint directory has been validated
checkpoint_file_path: str = os.path.join(checkpoint_dir, download_checkpoint_file_name)

deadline = boto3_session.client("deadline")
response = deadline.get_queue(farmId=farm_id, queueId=queue_id)
logger.echo(f"Started incremental download for queue: {response['displayName']}")
logger.echo(f"Checkpoint: {checkpoint_file_path}")
logger.echo()

# Perform incremental download while holding a process id lock

pid_lock_file_path: str = os.path.join(checkpoint_dir, f"{queue_id}_{PID_FILE_NAME}")

with PidFileLock(
pid_lock_file_path,
operation_name="incremental output download",
):
current_download_state: IncrementalDownloadState

if force_bootstrap or not os.path.exists(checkpoint_file_path):
bootstrap_timestamp = datetime.now(timezone.utc) - timedelta(
minutes=bootstrap_lookback_minutes
)
# Bootstrap with the specified lookback duration
current_download_state = IncrementalDownloadState(
downloads_started_timestamp=bootstrap_timestamp
)

# Print the bootstrap time in local time
if force_bootstrap:
logger.echo(f"Bootstrap forced, lookback is {bootstrap_lookback_minutes} minutes")
else:
# Load the incremental download checkpoint file
current_download_state = IncrementalDownloadState.from_file(
checkpoint_file_path, logger.echo
logger.echo(
f"Checkpoint not found, lookback is {bootstrap_lookback_minutes} minutes"
)
logger.echo(f"Initializing from: {bootstrap_timestamp.astimezone().isoformat()}")
else:
# Load the incremental download checkpoint file
current_download_state = IncrementalDownloadState.from_file(checkpoint_file_path)

updated_download_state: IncrementalDownloadState = _incremental_output_download(
boto3_session=boto3_session,
farm_id=farm_id,
queue_id=queue_id,
download_state=current_download_state,
path_mapping_rules=path_mapping_rules,
print_function_callback=logger.echo,
# Print the previous download completed time in local time
logger.echo("Checkpoint found")
logger.echo(
f"Continuing from: {current_download_state.downloads_completed_timestamp.astimezone().isoformat()}"
)

# Save the checkpoint file
updated_download_state.save_file(
checkpoint_file_path,
logger.echo,
)
logger.echo()

updated_download_state: IncrementalDownloadState = _incremental_output_download(
boto3_session=boto3_session,
farm_id=farm_id,
queue_id=queue_id,
download_state=current_download_state,
print_function_callback=logger.echo,
)

# Save the checkpoint file
updated_download_state.save_file(checkpoint_file_path)
Loading
Loading