From 95cd4f1da827da5236596780df63eb7325d5457d Mon Sep 17 00:00:00 2001 From: Alex Hughes Date: Fri, 7 Feb 2025 16:02:25 -0800 Subject: [PATCH 1/8] Create a Custom Job Balancer sample This sample implements functionality similar to one of the Deadline 10 balancer options. Instead of a Priority/FIFO queue, this implements weighted balancing for jobs in the farm. There are some special edge cases here: - If there are jobs that are set to 100, the Balancer goes into a "High Priority" mode and the entire Fleet gets balanced only between jobs at 100 priority. Other jobs get paused. - Jobs at 0 priority do not get any workers UNLESS there are only jobs with priority 0 on the farm. These jobs are treated as "paused" The actual values for how many workers to assign to each job is done on a "squared weight" policy. That means that a job at priority 20 would get twice as many workers as a job at priority 10. Signed-off-by: Alex Hughes --- .../custom_queue_job_balancer.yaml | 412 ++++++++++++++++++ 1 file changed, 412 insertions(+) create mode 100644 cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml diff --git a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml new file mode 100644 index 00000000..df2601b5 --- /dev/null +++ b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml @@ -0,0 +1,412 @@ +Description: An AWS Serverless Application Model template describing your function. 
+Description: A Lambda Function that acts as a balancer for a Deadline Cloud Queue by modifying maxWorkers +Parameters: + FarmId: + Description: The FarmID of the Deadline Farm that you are adding a balancer for + Type: String + QueueId: + Description: The QueueID of the Deadline Queue that you are adding a balancer for + Type: String +Resources: + DeadlineWeightedJobBalancer: + Type: AWS::Lambda::Function + Properties: + Environment: + Variables: + ERROR_WEIGHT: '-10' + PRIORITY_WEIGHT: '100' + RENDERING_TASK_BUFFER: '1' + RENDERING_TASK_WEIGHT: '-100' + SUBMISSION_TIME_WEIGHT: '3' + Handler: index.lambda_handler + Role: !GetAtt DeadlineWeightedJobBalancerExecutionRole.Arn + Code: + ZipFile: | + import json + import boto3 + import os + from datetime import datetime, timezone + from math import ceil + + def lambda_handler(event, context): + deadline = boto3.client("deadline") + # input event is list of objects with farm and queue ids + # loop through each one and balance the job weights + response = [] + for item in event: + updates = [] + farm_id = item["farmId"] + queue_id = item["queueId"] + + active_jobs = get_all_active_jobs(deadline, farm_id, queue_id) + active_fleets = get_all_associated_fleets(deadline, farm_id, queue_id) + total_workers = sum( + [get_worker_limit(deadline, farm_id, x) for x in active_fleets] + ) + + job_weights = [ + { + "name": x["name"], + "jobId": x["jobId"], + "priority": x["priority"], + "weight": calculate_job_weights(deadline, farm_id, queue_id, x), + } + for x in active_jobs + ] + + total_weights = max(1, sum([x["weight"] for x in job_weights])) + for job_weight in job_weights: + # ceil because we'd rather have too high of a limit rather than idle workers + job_limit = max( + 1, ceil((job_weight["weight"] / total_weights) * total_workers) + ) + update_response = update_job_limit( + deadline, farm_id, queue_id, job_weight["jobId"], job_limit + ) + updates.append( + { + "name": job_weight["name"], + "jobId": job_weight["jobId"], + 
"priority": job_weight["priority"], + "weight": job_weight["weight"], + "workerLimit": job_limit, + "response": update_response, + } + ) + response.append({"totalWorkers": total_workers, "updates": updates}) + print(json.dumps(response)) + return {"statusCode": 200, "body": json.dumps(response)} + + + def get_all_active_jobs(deadline, farm_id, queue_id): + active_jobs = [] + filter_expressions = { + "filters": [ + { + "groupFilter": { + "filters": [ + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": "READY", + } + }, + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": "ASSIGNED", + } + }, + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": "SCHEDULED", + } + }, + ], + "operator": "OR", + } + }, + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": "RUNNING", + } + }, + ], + "operator": "OR", + } + + job_search_response = deadline.search_jobs( + filterExpressions=filter_expressions, + farmId=farm_id, + queueIds=[queue_id], + itemOffset=0, + pageSize=100, + ) + active_jobs.extend(job_search_response["jobs"]) + + while job_search_response.get("nextItemOffset"): + job_search_response = deadline.search_jobs( + filterExpressions=filter_expressions, + farmId=farm_id, + queueIds=[queue_id], + itemOffset=job_search_response.get("nextItemOffset"), + pageSize=100, + ) + active_jobs.extend(job_search_response["jobs"]) + return active_jobs + + + def get_all_associated_fleets(deadline, farm_id, queue_id): + associated_fleets = [] + response = deadline.list_queue_fleet_associations(farmId=farm_id, queueId=queue_id) + associated_fleets.extend( + [ + x["fleetId"] + for x in response["queueFleetAssociations"] + if x["status"] == "ACTIVE" + ] + ) + + while response.get("nextToken"): + response = deadline.list_queue_fleet_associations( + farmId=farm_id, queueId=queue_id, nextToken=response.get("nextToken") + ) + associated_fleets.extend( + [ + 
x["fleetId"] + for x in response["queueFleetAssociations"] + if x["status"] == "ACTIVE" + ] + ) + return associated_fleets + + + def get_worker_limit(deadline, farm_id, fleet_id): + fleet_info = deadline.get_fleet(farmId=farm_id, fleetId=fleet_id) + return fleet_info["maxWorkerCount"] + + + def calculate_job_weights(deadline, farm_id, queue_id, job): + # get every step + steps = [] + steps_response = deadline.list_steps( + farmId=farm_id, + queueId=queue_id, + jobId=job["jobId"], + ) + steps.extend(steps_response["steps"]) + while steps_response.get("nextToken"): + steps_response = deadline.list_steps( + farmId=farm_id, + queueId=queue_id, + jobId=job["jobId"], + nextToken=steps_response.get("nextToken"), + ) + steps.extend(steps_response["steps"]) + + # get every task + tasks = [] + for step in steps: + task_response = deadline.list_tasks( + farmId=farm_id, + queueId=queue_id, + stepId=step["stepId"], + jobId=job["jobId"], + ) + tasks.extend(task_response["tasks"]) + while task_response.get("nextToken"): + task_response = deadline.list_tasks( + farmId=farm_id, + queueId=queue_id, + stepId=step["stepId"], + jobId=job["jobId"], + nextToken=task_response.get("nextToken"), + ) + tasks.extend(task_response["tasks"]) + + error_count = sum([x["failureRetryCount"] for x in tasks]) + # total_rendering_tasks = sum([x["runStatus"] in ["RUNNING", "ASSIGNED", "STARTING", "SCHEDULED", "READY"] for x in tasks]) + total_rendering_tasks = sum( + [ + x["runStatus"] in ["RUNNING", "ASSIGNED", "STARTING", "SCHEDULED"] + for x in tasks + ] + ) + + PW = float(os.environ["PRIORITY_WEIGHT"]) + EW = float(os.environ["ERROR_WEIGHT"]) + SW = float(os.environ["SUBMISSION_TIME_WEIGHT"]) + RW = float(os.environ["RENDERING_TASK_WEIGHT"]) + RB = float(os.environ["RENDERING_TASK_BUFFER"]) + + now = datetime.now(timezone.utc) # Get current time in UTC + time_difference = (now - job["createdAt"]).total_seconds() + + priority_factor = job["priority"] * PW + error_factor = error_count * EW + 
time_factor = time_difference * SW + rendering_factor = (total_rendering_tasks - RB) * RW + weight = max(1, priority_factor + error_factor + time_factor + rendering_factor) + return weight + + + def update_job_limit(deadline, farm_id, queue_id, job_id, worker_limit): + try: + update_job_result = deadline.update_job( + farmId=farm_id, queueId=queue_id, jobId=job_id, maxWorkerCount=worker_limit + ) + return { + "HTTPStatusCode": update_job_result.get("ResponseMetadata").get( + "HTTPStatusCode" + ), + "RequestId": update_job_result.get("ResponseMetadata").get("RequestId"), + } + except Exception as e: + return {"Error": str(e)} + Runtime: python3.13 + MemorySize: 128 + Timeout: 60 + Architectures: + - x86_64 + + DeadlineWeightedJobBalancerConfig: + Type: AWS::Lambda::EventInvokeConfig + Properties: + FunctionName: !Ref DeadlineWeightedJobBalancer + MaximumEventAgeInSeconds: 21600 + MaximumRetryAttempts: 2 + Qualifier: '$LATEST' + + DeadlineWeightedJobBalancerSchedule: + Type: AWS::Scheduler::Schedule + Properties: + Description: String + FlexibleTimeWindow: + Mode: "OFF" + Name: !Join + - '' + - - !Ref 'AWS::StackName' + - BalancerScheduler + ScheduleExpression: "rate(1 minute)" + State: ENABLED + Target: + Arn: !GetAtt + - DeadlineWeightedJobBalancer + - Arn + DeadLetterConfig: + Arn: !GetAtt + - UnprocessedBalancingEventQueue + - Arn + Input: !Sub '[{"farmId": "${FarmId}", "queueId": "${QueueId}"}]' + RetryPolicy: + MaximumRetryAttempts: 1 + RoleArn: !GetAtt + - DeadlineWeightedJobBalancerScheduleRole + - Arn + + DeadlineWeightedJobBalancerScheduleRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - scheduler.amazonaws.com + Action: + - sts:AssumeRole + Condition: + StringEquals: + aws:SourceAccount: !Sub "${AWS::AccountId}" + RoleName: !Join + - '' + - - !Ref 'AWS::StackName' + - BalancerSchedulerRole + Policies: + - PolicyName: 
Amazon-EventBridge-Scheduler-Execution-Policy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: + - !GetAtt DeadlineWeightedJobBalancer.Arn + - !Sub "${DeadlineWeightedJobBalancer.Arn}:*" + - Effect: Allow + Action: + - sqs:SendMessage + Resource: + - !GetAtt + - UnprocessedBalancingEventQueue + - Arn + + UnprocessedBalancingEventQueue: + Type: 'AWS::SQS::Queue' + Properties: + QueueName: !Join + - "-" + - - deadline-unprocessed-balancing-events + - !Ref QueueId + UpdateReplacePolicy: Delete + DeletionPolicy: Delete + + UnprocessedBalancingEventQueuePolicy: + DependsOn: DeadlineWeightedJobBalancerSchedule + Type: 'AWS::SQS::QueuePolicy' + Properties: + PolicyDocument: + Statement: + - Action: 'sqs:SendMessage' + Condition: + ArnEquals: + 'aws:SourceArn': !GetAtt + - DeadlineWeightedJobBalancerSchedule + - Arn + Effect: Allow + Principal: + Service: events.amazonaws.com + Resource: !GetAtt + - UnprocessedBalancingEventQueue + - Arn + Version: 2012-10-17 + Queues: + - !Ref UnprocessedBalancingEventQueue + + DeadlineWeightedJobBalancerExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + Action: + - sts:AssumeRole + RoleName: !Join + - '' + - - !Ref 'AWS::StackName' + - DeadlineWeightedJobBalancerRole + Policies: + - PolicyName: DefaultExecutionRole + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: logs:CreateLogGroup + Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*' + - Effect: Allow + Action: + - logs:CreateLogStream + - logs:PutLogEvents + Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*:*:*" + - PolicyName: LambdaPermissions + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - deadline:GetFleet + - deadline:UpdateJob + - deadline:SearchJobs + - 
deadline:ListSteps + - deadline:ListTasks + - deadline:ListQueueFleetAssociations + Resource: + - !Sub "arn:aws:deadline:${AWS::Region}:${AWS::AccountId}:farm/${FarmId}" + - !Sub "arn:aws:deadline:${AWS::Region}:${AWS::AccountId}:farm/${FarmId}/fleet/*" + - !Sub "arn:aws:deadline:${AWS::Region}:${AWS::AccountId}:farm/${FarmId}/queue/${QueueId}/job/*" + - !Sub "arn:aws:deadline:${AWS::Region}:${AWS::AccountId}:farm/${FarmId}/queue/${QueueId}" + - Effect: Allow + Action: + - identitystore:ListGroupMembershipsForMember + Resource: '*' From ab1dc5b91d4581b9a732165799ceff9ef47408fc Mon Sep 17 00:00:00 2001 From: Alex Hughes Date: Fri, 7 Feb 2025 16:02:42 -0800 Subject: [PATCH 2/8] Implement nice README files for our new balancer Signed-off-by: Alex Hughes --- cloudformation/auxiliary_templates/README.md | 13 ++++ .../custom_balancer/README.md | 59 +++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 cloudformation/auxiliary_templates/README.md create mode 100644 cloudformation/auxiliary_templates/custom_balancer/README.md diff --git a/cloudformation/auxiliary_templates/README.md b/cloudformation/auxiliary_templates/README.md new file mode 100644 index 00000000..f5d5e292 --- /dev/null +++ b/cloudformation/auxiliary_templates/README.md @@ -0,0 +1,13 @@ +# AWS Deadline Cloud Auxiliary Templates + +## Introduction +AWS Deadline Cloud sends events to customer's default event bus https://docs.aws.amazon.com/deadline-cloud/latest/userguide/monitoring-eventbridge.html +This directory holds sample Cloud Formation Templates customers can use to customize behaviour based on events received from Deadline Cloud. + +## Setup Instructions +Each Cloud Formation Template will have its own instructions on how to set up integrations, but in general they follow the below scheme: + +1. Download the specified YAML file +2. On the AWS console, go to CloudFormation, Create Stack, and add the downloaded template. +3. 
Follow the specific instructions for that file, as you may need to enter specific parameters. +4. After this is set up, Deadline cloud events will reach through your setup mechanism. diff --git a/cloudformation/auxiliary_templates/custom_balancer/README.md b/cloudformation/auxiliary_templates/custom_balancer/README.md new file mode 100644 index 00000000..fe1ad5c1 --- /dev/null +++ b/cloudformation/auxiliary_templates/custom_balancer/README.md @@ -0,0 +1,59 @@ +# Deadline Custom Job Balancer + +This CloudFormation template deploys a Lambda function that controls the maxWorkerCount for each job within a Queue to implement functionality similar to the Deadline10 Balancer options. +It implements a modified version of the Deadline 10 weighting algorithm: https://docs.thinkboxsoftware.com/products/deadline/10.4/1_User%20Manual/manual/job-scheduling.html#weighted-scheduling + +## Resources Created +1. **Lambda Function**: An AWS Lambda function named `DeadlineWeightedJobBalancer` is created. This is the main driver which calculates and controls the `maxWorkerCount` setting for jobs. +2. **EventBridge Schedule**: A schedule is created to trigger the Lambda function once every minute. +3. **IAM Role**: An IAM role `BalancerSchedulerRole` is created and used by the Schedule to trigger the Lambda function. +4. **SQS Queue**: An SQS Queue `deadline-unprocessed-balancing-events` is created to act as a DeadLetterQueue for the Schedule. +5. **IAM Role**: An IAM role `DeadlineWeightedJobBalancerRole` is created to be used as the Lambda execution role for our `DeadlineWeightedJobBalancer`. + +## Prerequisites + +Before deploying this CloudFormation template, check you have the following as per your chosen method of notifications: + +1. The AWS CLI installed and configured with your AWS credentials (if setting up using CLI option). +2. A Deadline Cloud Farm and Queue deployed in your account. 
+ +## Deployment + +You can use either of the below methods for deployment - AWS Console or AWS CLI. + +### AWS Console + +To deploy this CloudFormation template via the AWS Console, follow these steps: + +1. Save the CloudFormation template to a local file (e.g., `/Users/ \ + --template-body file://custom_queue_job_balancer.yaml \ + --parameters \ + ParameterKey=FarmId,ParameterValue=farm-XXXXXXXXX \ + ParameterKey=QueueId,ParameterValue=queue-XXXXXXXXX + ``` + + Replace `farm-XXXXXXXXX`, and `queue-XXXXXXXXX` with your actual values. + +4. Wait for the stack creation to complete. You can monitor the progress using the `aws cloudformation describe-stacks` command. + +After successful deployment & confirmation, you should start seeing the `maxWorkerCount` get automatically modified for each job in your queue. From 6ac4a12b36037c184e4ee5895700a7a4c5f38dd3 Mon Sep 17 00:00:00 2001 From: Alex Hughes Date: Fri, 7 Feb 2025 16:40:34 -0800 Subject: [PATCH 3/8] Implement our balancing adjustments Jobs at 100 priority should be the only ones that matter If there are no jobs at 100, only jobs that have priority above 0 should matter If there are only jobs with priority of 0, start allocating to those ones Refactor our edge case logic to filter out invalid jobs This simplifies logic to only operate on jobs that are valid in our current context Signed-off-by: Alex Hughes --- .../custom_queue_job_balancer.yaml | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml index df2601b5..a4a8bd7f 100644 --- a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml +++ b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml @@ -44,6 +44,19 @@ Resources: [get_worker_limit(deadline, farm_id, x) for x in active_fleets] ) + paused_job_count = 
get_paused_job_count(active_jobs) + has_high_priority_job = has_high_priority_jobs(active_jobs) + + # Handle "Only Paused Jobs" jobs where we now can allocate workers to jobs with 0 priority + if paused_job_count != len(active_jobs): + # Filter out any jobs that are set to priority 0 UNLESS they are all set to 0 + active_jobs = filter(lambda job: job["priority"] != 0, active_jobs) + + if has_high_priority_job: + # Handle "High Priority" jobs where we should only balance to jobs with 100 priority + # Filter out any jobs that are not priority 100 if we have at least one job at priority 100 + active_jobs = filter(lambda job: job["priority"] == 100, active_jobs) + job_weights = [ { "name": x["name"], @@ -78,6 +91,16 @@ Resources: return {"statusCode": 200, "body": json.dumps(response)} + def has_high_priority_jobs(active_jobs): + for job in active_jobs: + if job["priority"] == 100: + return True + return False + + def get_paused_job_count(active_jobs): + return len(list(filter(lambda job: job["priority"] == 0, active_jobs))) + + def get_all_active_jobs(deadline, farm_id, queue_id): active_jobs = [] filter_expressions = { From 0cc5b19b0be2742ddafea3cff574861b65adf30b Mon Sep 17 00:00:00 2001 From: Alex Hughes Date: Fri, 7 Feb 2025 16:41:35 -0800 Subject: [PATCH 4/8] Square the priority factor Signed-off-by: Alex Hughes --- .../custom_balancer/custom_queue_job_balancer.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml index a4a8bd7f..b1446bdc 100644 --- a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml +++ b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml @@ -255,7 +255,7 @@ Resources: error_factor = error_count * EW time_factor = time_difference * SW rendering_factor = (total_rendering_tasks - RB) * RW - weight = max(1, 
priority_factor + error_factor + time_factor + rendering_factor) + weight = max(1, (priority_factor * priority_factor) + error_factor + time_factor + rendering_factor) return weight From 932a2f183084d5d1be58d05fb30b6c0e1e317f17 Mon Sep 17 00:00:00 2001 From: Alex Hughes Date: Fri, 7 Mar 2025 10:42:56 -0800 Subject: [PATCH 5/8] Update lambda code to use paginators Signed-off-by: Alex Hughes --- .../custom_queue_job_balancer.yaml | 54 ++++--------------- 1 file changed, 10 insertions(+), 44 deletions(-) diff --git a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml index b1446bdc..2011559c 100644 --- a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml +++ b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml @@ -167,23 +167,12 @@ Resources: def get_all_associated_fleets(deadline, farm_id, queue_id): associated_fleets = [] - response = deadline.list_queue_fleet_associations(farmId=farm_id, queueId=queue_id) - associated_fleets.extend( - [ - x["fleetId"] - for x in response["queueFleetAssociations"] - if x["status"] == "ACTIVE" - ] - ) - - while response.get("nextToken"): - response = deadline.list_queue_fleet_associations( - farmId=farm_id, queueId=queue_id, nextToken=response.get("nextToken") - ) + qfa_paginator = deadline.get_paginator("list_queue_fleet_associations") + for qfa_page in qfa_paginator.paginate(farmId=farm_id, queueId=queue_id): associated_fleets.extend( [ x["fleetId"] - for x in response["queueFleetAssociations"] + for x in qfa_page.get("queueFleetAssociations", []) if x["status"] == "ACTIVE" ] ) @@ -198,40 +187,17 @@ Resources: def calculate_job_weights(deadline, farm_id, queue_id, job): # get every step steps = [] - steps_response = deadline.list_steps( - farmId=farm_id, - queueId=queue_id, - jobId=job["jobId"], - ) - steps.extend(steps_response["steps"]) - while 
steps_response.get("nextToken"): - steps_response = deadline.list_steps( - farmId=farm_id, - queueId=queue_id, - jobId=job["jobId"], - nextToken=steps_response.get("nextToken"), - ) - steps.extend(steps_response["steps"]) + + steps_paginator = deadline.get_paginator("list_steps") + for step_page in steps_paginator.paginate(farmId=farm_id, queueId=queue_id, jobId=job["jobId"]): + steps.extend(step_page.get("steps", [])) # get every task tasks = [] for step in steps: - task_response = deadline.list_tasks( - farmId=farm_id, - queueId=queue_id, - stepId=step["stepId"], - jobId=job["jobId"], - ) - tasks.extend(task_response["tasks"]) - while task_response.get("nextToken"): - task_response = deadline.list_tasks( - farmId=farm_id, - queueId=queue_id, - stepId=step["stepId"], - jobId=job["jobId"], - nextToken=task_response.get("nextToken"), - ) - tasks.extend(task_response["tasks"]) + tasks_paginator = deadline.get_paginator("list_tasks") + for task_page in tasks_paginator.paginate(farmId=farm_id, queueId=queue_id, jobId=job["jobId"], stepId=step["stepId"]): + tasks.extend(task_page.get("tasks", [])) error_count = sum([x["failureRetryCount"] for x in tasks]) # total_rendering_tasks = sum([x["runStatus"] in ["RUNNING", "ASSIGNED", "STARTING", "SCHEDULED", "READY"] for x in tasks]) From 00a5bd585ece30d2c6835d268834aab4d7e752d2 Mon Sep 17 00:00:00 2001 From: Alex Hughes Date: Mon, 24 Mar 2025 08:45:48 -0700 Subject: [PATCH 6/8] refactor: Adjusting the custom balancer code Credit @mwiebe Signed-off-by: Alex Hughes --- .../custom_queue_job_balancer.yaml | 150 +++++++++--------- 1 file changed, 71 insertions(+), 79 deletions(-) diff --git a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml index 2011559c..f4cfa34a 100644 --- a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml +++ 
b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml @@ -1,4 +1,3 @@ -Description: An AWS Serverless Application Model template describing your function. Description: A Lambda Function that acts as a balancer for a Deadline Cloud Queue by modifying maxWorkers Parameters: FarmId: @@ -22,14 +21,24 @@ Resources: Role: !GetAtt DeadlineWeightedJobBalancerExecutionRole.Arn Code: ZipFile: | + import os + import sys + import subprocess + + # Install the latest version of boto3 that has the job maxWorkerCount in it + os.makedirs("/tmp/site-packages", exist_ok=True) + sys.path.insert(0, "/tmp/site-packages") + subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", "--target", "/tmp/site-packages", "boto3"]) + import json import boto3 - import os from datetime import datetime, timezone from math import ceil + def lambda_handler(event, context): deadline = boto3.client("deadline") + # input event is list of objects with farm and queue ids # loop through each one and balance the job weights response = [] @@ -38,69 +47,63 @@ Resources: farm_id = item["farmId"] queue_id = item["queueId"] - active_jobs = get_all_active_jobs(deadline, farm_id, queue_id) - active_fleets = get_all_associated_fleets(deadline, farm_id, queue_id) - total_workers = sum( - [get_worker_limit(deadline, farm_id, x) for x in active_fleets] + active_fleet_ids = get_all_associated_fleet_ids(deadline, farm_id, queue_id) + total_max_worker_count = sum( + [get_max_worker_count(deadline, farm_id, x) for x in active_fleet_ids] ) - paused_job_count = get_paused_job_count(active_jobs) - has_high_priority_job = has_high_priority_jobs(active_jobs) + active_jobs = get_all_active_jobs(deadline, farm_id, queue_id) + active_job_count = len(active_jobs) - # Handle "Only Paused Jobs" jobs where we now can allocate workers to jobs with 0 priority - if paused_job_count != len(active_jobs): - # Filter out any jobs that are set to priority 0 UNLESS they are all set to 0 - 
active_jobs = filter(lambda job: job["priority"] != 0, active_jobs) + priority_100_jobs = [job for job in active_jobs if job["priority"] == 100] + priority_100_job_count = len(priority_100_jobs) - if has_high_priority_job: - # Handle "High Priority" jobs where we should only balance to jobs with 100 priority - # Filter out any jobs that are not priority 100 if we have at least one job at priority 100 - active_jobs = filter(lambda job: job["priority"] == 100, active_jobs) + priority_0_jobs = [job for job in active_jobs if job["priority"] == 0] + priority_0_job_count = len(priority_0_jobs) - job_weights = [ - { - "name": x["name"], - "jobId": x["jobId"], - "priority": x["priority"], - "weight": calculate_job_weights(deadline, farm_id, queue_id, x), - } - for x in active_jobs - ] + if priority_100_job_count > 0: + # Priority 100 jobs get all the capacity if they exist + # so allocate all the weights to them + active_jobs = priority_100_jobs + elif priority_0_job_count < active_job_count: + # Priority 0 jobs get no capacity until all other jobs are done + # so allocate all the weights to higher priority jobs + active_jobs = [job for job in active_jobs if job["priority"] != 0] - total_weights = max(1, sum([x["weight"] for x in job_weights])) - for job_weight in job_weights: + for job in active_jobs: + job["weight"] = calculate_job_weight(deadline, farm_id, queue_id, job) + + job_weight_sum = max(1, sum(job["weight"] for job in active_jobs)) + for job in active_jobs: # ceil because we'd rather have too high of a limit rather than idle workers - job_limit = max( - 1, ceil((job_weight["weight"] / total_weights) * total_workers) + job_max_worker_count = max( + 1, ceil((job["weight"] / job_weight_sum) * total_max_worker_count) ) - update_response = update_job_limit( - deadline, farm_id, queue_id, job_weight["jobId"], job_limit + update_response = update_job_max_worker_count( + deadline, farm_id, queue_id, job["jobId"], job_max_worker_count ) updates.append( { - "name": 
job_weight["name"], - "jobId": job_weight["jobId"], - "priority": job_weight["priority"], - "weight": job_weight["weight"], - "workerLimit": job_limit, + "name": job["name"], + "jobId": job["jobId"], + "priority": job["priority"], + "weight": job["weight"], + "workerLimit": job_max_worker_count, "response": update_response, } ) - response.append({"totalWorkers": total_workers, "updates": updates}) + response.append({ + "totalMaxWorkerCount": total_max_worker_count, + "activeJobCount": active_job_count, + "priority100JobCount": priority_100_job_count, + "priority0JobCount": priority_0_job_count, + "allocatedJobCount": len(active_jobs), + "updates": updates + }) print(json.dumps(response)) return {"statusCode": 200, "body": json.dumps(response)} - def has_high_priority_jobs(active_jobs): - for job in active_jobs: - if job["priority"] == 100: - return True - return False - - def get_paused_job_count(active_jobs): - return len(list(filter(lambda job: job["priority"] == 0, active_jobs))) - - def get_all_active_jobs(deadline, farm_id, queue_id): active_jobs = [] filter_expressions = { @@ -165,48 +168,41 @@ Resources: return active_jobs - def get_all_associated_fleets(deadline, farm_id, queue_id): - associated_fleets = [] + def get_all_associated_fleet_ids(deadline, farm_id, queue_id): + associated_fleet_ids = [] qfa_paginator = deadline.get_paginator("list_queue_fleet_associations") for qfa_page in qfa_paginator.paginate(farmId=farm_id, queueId=queue_id): - associated_fleets.extend( + associated_fleet_ids.extend( [ - x["fleetId"] - for x in qfa_page.get("queueFleetAssociations", []) - if x["status"] == "ACTIVE" + qfa["fleetId"] + for qfa in qfa_page.get("queueFleetAssociations", []) + if qfa["status"] == "ACTIVE" ] ) - return associated_fleets + return associated_fleet_ids - def get_worker_limit(deadline, farm_id, fleet_id): + def get_max_worker_count(deadline, farm_id, fleet_id): fleet_info = deadline.get_fleet(farmId=farm_id, fleetId=fleet_id) return 
fleet_info["maxWorkerCount"] - def calculate_job_weights(deadline, farm_id, queue_id, job): - # get every step - steps = [] + def calculate_job_weight(deadline, farm_id, queue_id, job): + job_task_run_status_counts = job["taskRunStatusCounts"] + total_rendering_tasks = sum(job_task_run_status_counts[run_status] + for run_status in ["RUNNING", "ASSIGNED", "STARTING", "SCHEDULED", "READY"]) - steps_paginator = deadline.get_paginator("list_steps") - for step_page in steps_paginator.paginate(farmId=farm_id, queueId=queue_id, jobId=job["jobId"]): - steps.extend(step_page.get("steps", [])) - - # get every task + # While task run statuses are available counted at the job level, task failure retry counts are not. + # Therefore, we loop through all the individual tasks to get that. tasks = [] + steps_pages = deadline.get_paginator("list_steps").paginate(farmId=farm_id, queueId=queue_id, jobId=job["jobId"]) + steps = steps_pages.build_full_result()["steps"] + # get every task across all steps for step in steps: - tasks_paginator = deadline.get_paginator("list_tasks") - for task_page in tasks_paginator.paginate(farmId=farm_id, queueId=queue_id, jobId=job["jobId"], stepId=step["stepId"]): - tasks.extend(task_page.get("tasks", [])) + tasks_pages = deadline.get_paginator("list_tasks").paginate(farmId=farm_id, queueId=queue_id, jobId=job["jobId"], stepId=step["stepId"]) + tasks.extend(tasks_pages.build_full_result()["tasks"]) - error_count = sum([x["failureRetryCount"] for x in tasks]) - # total_rendering_tasks = sum([x["runStatus"] in ["RUNNING", "ASSIGNED", "STARTING", "SCHEDULED", "READY"] for x in tasks]) - total_rendering_tasks = sum( - [ - x["runStatus"] in ["RUNNING", "ASSIGNED", "STARTING", "SCHEDULED"] - for x in tasks - ] - ) + error_count = sum([task["failureRetryCount"] for task in tasks]) PW = float(os.environ["PRIORITY_WEIGHT"]) EW = float(os.environ["ERROR_WEIGHT"]) @@ -225,7 +221,7 @@ Resources: return weight - def update_job_limit(deadline, farm_id, queue_id, 
job_id, worker_limit): + def update_job_max_worker_count(deadline, farm_id, queue_id, job_id, worker_limit): try: update_job_result = deadline.update_job( farmId=farm_id, queueId=queue_id, jobId=job_id, maxWorkerCount=worker_limit @@ -395,7 +391,3 @@ Resources: - !Sub "arn:aws:deadline:${AWS::Region}:${AWS::AccountId}:farm/${FarmId}/fleet/*" - !Sub "arn:aws:deadline:${AWS::Region}:${AWS::AccountId}:farm/${FarmId}/queue/${QueueId}/job/*" - !Sub "arn:aws:deadline:${AWS::Region}:${AWS::AccountId}:farm/${FarmId}/queue/${QueueId}" - - Effect: Allow - Action: - - identitystore:ListGroupMembershipsForMember - Resource: '*' From 74a77f1d5d0b5dcb7159c9ef18f25cb5c349e134 Mon Sep 17 00:00:00 2001 From: Alex Hughes Date: Mon, 24 Mar 2025 09:15:58 -0700 Subject: [PATCH 7/8] Refactor Lambda to expect event to match Job Lifecycle Status Change event Signed-off-by: Alex Hughes --- .../custom_queue_job_balancer.yaml | 116 +++++++++--------- 1 file changed, 61 insertions(+), 55 deletions(-) diff --git a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml index f4cfa34a..0906d55d 100644 --- a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml +++ b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml @@ -39,67 +39,72 @@ Resources: def lambda_handler(event, context): deadline = boto3.client("deadline") - # input event is list of objects with farm and queue ids - # loop through each one and balance the job weights + # Input event matches the structure of a Job Lifecycle Status Change event + # {"detail": {"farmId": "${FarmId}", "queueId": "${QueueId}"}} response = [] - for item in event: - updates = [] - farm_id = item["farmId"] - queue_id = item["queueId"] + updates = [] + farm_id = event.get("detail", {}).get("farmId", "") + queue_id = event.get("detail", {}).get("queueId", "") + previous_lifecycle_status = 
event.get("detail", {}).get("previousLifecycleStatus" , None)
+        if not farm_id or not queue_id:
+            return {"statusCode": 422, "body": "Missing .detail.farmId or .detail.queueId"}

-        active_fleet_ids = get_all_associated_fleet_ids(deadline, farm_id, queue_id)
-        total_max_worker_count = sum(
-            [get_max_worker_count(deadline, farm_id, x) for x in active_fleet_ids]
-        )
+        if previous_lifecycle_status is not None:
+            return {"statusCode": 200, "body": "Balancer skipping lifecycle events that are not job submission"}
+
+        active_fleet_ids = get_all_associated_fleet_ids(deadline, farm_id, queue_id)
+        total_max_worker_count = sum(
+            [get_max_worker_count(deadline, farm_id, x) for x in active_fleet_ids]
+        )

-        active_jobs = get_all_active_jobs(deadline, farm_id, queue_id)
-        active_job_count = len(active_jobs)
+        active_jobs = get_all_active_jobs(deadline, farm_id, queue_id)
+        active_job_count = len(active_jobs)

-        priority_100_jobs = [job for job in active_jobs if job["priority"] == 100]
-        priority_100_job_count = len(priority_100_jobs)
+        priority_100_jobs = [job for job in active_jobs if job["priority"] == 100]
+        priority_100_job_count = len(priority_100_jobs)

-        priority_0_jobs = [job for job in active_jobs if job["priority"] == 0]
-        priority_0_job_count = len(priority_0_jobs)
+        priority_0_jobs = [job for job in active_jobs if job["priority"] == 0]
+        priority_0_job_count = len(priority_0_jobs)

-        if priority_100_job_count > 0:
-            # Priority 100 jobs get all the capacity if they exist
-            # so allocate all the weights to them
-            active_jobs = priority_100_jobs
-        elif priority_0_job_count < active_job_count:
-            # Priority 0 jobs get no capacity until all other jobs are done
-            # so allocate all the weights to higher priority jobs
-            active_jobs = [job for job in active_jobs if job["priority"] != 0]
+        if priority_100_job_count > 0:
+            # Priority 100 jobs get all the capacity if they exist
+            # so allocate all the weights to them
+            active_jobs = priority_100_jobs
+        elif priority_0_job_count <
active_job_count: + # Priority 0 jobs get no capacity until all other jobs are done + # so allocate all the weights to higher priority jobs + active_jobs = [job for job in active_jobs if job["priority"] != 0] - for job in active_jobs: - job["weight"] = calculate_job_weight(deadline, farm_id, queue_id, job) + for job in active_jobs: + job["weight"] = calculate_job_weight(deadline, farm_id, queue_id, job) - job_weight_sum = max(1, sum(job["weight"] for job in active_jobs)) - for job in active_jobs: - # ceil because we'd rather have too high of a limit rather than idle workers - job_max_worker_count = max( - 1, ceil((job["weight"] / job_weight_sum) * total_max_worker_count) - ) - update_response = update_job_max_worker_count( - deadline, farm_id, queue_id, job["jobId"], job_max_worker_count - ) - updates.append( - { - "name": job["name"], - "jobId": job["jobId"], - "priority": job["priority"], - "weight": job["weight"], - "workerLimit": job_max_worker_count, - "response": update_response, - } - ) - response.append({ - "totalMaxWorkerCount": total_max_worker_count, - "activeJobCount": active_job_count, - "priority100JobCount": priority_100_job_count, - "priority0JobCount": priority_0_job_count, - "allocatedJobCount": len(active_jobs), - "updates": updates - }) + job_weight_sum = max(1, sum(job["weight"] for job in active_jobs)) + for job in active_jobs: + # ceil because we'd rather have too high of a limit rather than idle workers + job_max_worker_count = max( + 1, ceil((job["weight"] / job_weight_sum) * total_max_worker_count) + ) + update_response = update_job_max_worker_count( + deadline, farm_id, queue_id, job["jobId"], job_max_worker_count + ) + updates.append( + { + "name": job["name"], + "jobId": job["jobId"], + "priority": job["priority"], + "weight": job["weight"], + "workerLimit": job_max_worker_count, + "response": update_response, + } + ) + response.append({ + "totalMaxWorkerCount": total_max_worker_count, + "activeJobCount": active_job_count, + 
"priority100JobCount": priority_100_job_count, + "priority0JobCount": priority_0_job_count, + "allocatedJobCount": len(active_jobs), + "updates": updates + }) print(json.dumps(response)) return {"statusCode": 200, "body": json.dumps(response)} @@ -199,7 +204,8 @@ Resources: steps = steps_pages.build_full_result()["steps"] # get every task across all steps for step in steps: - tasks_pages = deadline.get_paginator("list_tasks").paginate(farmId=farm_id, queueId=queue_id, jobId=job["jobId"], stepId=step["stepId"]) + tasks_pages = deadline.get_paginator("list_tasks").paginate(farmId=farm_id, queueId=queue_id, + jobId=job["jobId"], stepId=step["stepId"]) tasks.extend(tasks_pages.build_full_result()["tasks"]) error_count = sum([task["failureRetryCount"] for task in tasks]) @@ -268,7 +274,7 @@ Resources: Arn: !GetAtt - UnprocessedBalancingEventQueue - Arn - Input: !Sub '[{"farmId": "${FarmId}", "queueId": "${QueueId}"}]' + Input: !Sub '{"detail": {"farmId": "${FarmId}", "queueId": "${QueueId}"}}' RetryPolicy: MaximumRetryAttempts: 1 RoleArn: !GetAtt From fccbd5386d3c72a5c971eb9a3d250d32a2edd8df Mon Sep 17 00:00:00 2001 From: Alex Hughes Date: Mon, 24 Mar 2025 09:16:25 -0700 Subject: [PATCH 8/8] Add support for responding to Job Lifecycle Status Change events Signed-off-by: Alex Hughes --- .../custom_queue_job_balancer.yaml | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml index 0906d55d..938e19cd 100644 --- a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml +++ b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml @@ -6,6 +6,18 @@ Parameters: QueueId: Description: The QueueID of the Deadline Queue that you are adding a balancer for Type: String + BalanceOnJobSubmission: + Description: "Whether or not to trigger the Balancer in 
response to a Job Submission" + Type: String + Default: "true" + AllowedValues: + - "true" + - "false" + +Conditions: + JobSubmissionTriggersLambda: !Equals + - !Ref BalanceOnJobSubmission + - "true" Resources: DeadlineWeightedJobBalancer: Type: AWS::Lambda::Function @@ -254,6 +266,65 @@ Resources: MaximumRetryAttempts: 2 Qualifier: '$LATEST' + DeadlineWeightedJobBalancerSubmissionRule: + Condition: JobSubmissionTriggersLambda + Type: AWS::Events::Rule + Properties: + Description: Responds to Job Submissions and balances the Queue + State: ENABLED + EventPattern: + source: + - "aws.deadline" + detail-type: + - "Job Lifecycle Status Change" + Targets: + - Arn: !GetAtt + - DeadlineWeightedJobBalancer + - Arn + DeadLetterConfig: + Arn: !GetAtt + - UnprocessedBalancingEventQueue + - Arn + RetryPolicy: + MaximumRetryAttempts: 1 + RoleArn: !GetAtt + - DeadlineWeightedJobBalancerSubmissionRuleRole + - Arn + DeadlineWeightedJobBalancerSubmissionRuleRole: + Condition: JobSubmissionTriggersLambda + Type: 'AWS::IAM::Role' + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: + - events.amazonaws.com + Action: + - sts:AssumeRole + Condition: + StringEquals: + aws:SourceAccount: !Sub "${AWS::AccountId}" + Policies: + - PolicyName: Amazon-EventBridge-Rule-Execution-Policy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: + - !GetAtt DeadlineWeightedJobBalancer.Arn + - !Sub "${DeadlineWeightedJobBalancer.Arn}:*" + - Effect: Allow + Action: + - sqs:SendMessage + Resource: + - !GetAtt + - UnprocessedBalancingEventQueue + - Arn + DeadlineWeightedJobBalancerSchedule: Type: AWS::Scheduler::Schedule Properties: