diff --git a/cloudformation/auxiliary_templates/README.md b/cloudformation/auxiliary_templates/README.md new file mode 100644 index 00000000..f5d5e292 --- /dev/null +++ b/cloudformation/auxiliary_templates/README.md @@ -0,0 +1,13 @@ +# AWS Deadline Cloud Auxiliary Templates + +## Introduction +AWS Deadline Cloud sends events to the customer's default event bus: https://docs.aws.amazon.com/deadline-cloud/latest/userguide/monitoring-eventbridge.html +This directory holds sample CloudFormation templates that customers can use to customize behaviour based on events received from Deadline Cloud. + +## Setup Instructions +Each CloudFormation template has its own instructions for setting up the integration, but in general they follow these steps: + +1. Download the specified YAML file. +2. In the AWS console, go to CloudFormation, choose Create Stack, and upload the downloaded template. +3. Follow the specific instructions for that file, as you may need to enter specific parameters. +4. Once the stack is set up, Deadline Cloud events will be delivered to the integration you deployed. diff --git a/cloudformation/auxiliary_templates/custom_balancer/README.md b/cloudformation/auxiliary_templates/custom_balancer/README.md new file mode 100644 index 00000000..fe1ad5c1 --- /dev/null +++ b/cloudformation/auxiliary_templates/custom_balancer/README.md @@ -0,0 +1,59 @@ +# Deadline Custom Job Balancer + +This CloudFormation template deploys a Lambda function that controls the `maxWorkerCount` for each job within a Queue to implement functionality similar to the Deadline 10 Balancer options. +It implements a modified version of the Deadline 10 weighting algorithm: https://docs.thinkboxsoftware.com/products/deadline/10.4/1_User%20Manual/manual/job-scheduling.html#weighted-scheduling + +## Resources Created +1. **Lambda Function**: An AWS Lambda function named `DeadlineWeightedJobBalancer` is created. This is the main driver which calculates and controls the `maxWorkerCount` setting for jobs. +2. 
**EventBridge Schedule**: A schedule is created to trigger the Lambda function once every minute. +3. **IAM Role**: An IAM role whose name is the stack name followed by `BalancerSchedulerRole` is created and used by the Schedule to trigger the Lambda function. +4. **SQS Queue**: An SQS Queue named `deadline-unprocessed-balancing-events-<QueueId>` is created to act as a dead-letter queue for the Schedule. +5. **IAM Role**: An IAM role whose name is the stack name followed by `DeadlineWeightedJobBalancerRole` is created to be used as the Lambda execution role for our `DeadlineWeightedJobBalancer`. + +## Prerequisites + +Before deploying this CloudFormation template, make sure you have the following, depending on your chosen deployment method: + +1. The AWS CLI installed and configured with your AWS credentials (only if you deploy using the AWS CLI option). +2. A Deadline Cloud Farm and Queue deployed in your account. + +## Deployment + +You can deploy using either of the methods below: the AWS Console or the AWS CLI. + +### AWS Console + +To deploy this CloudFormation template via the AWS Console, follow these steps: + +1. Save the CloudFormation template to a local file (e.g., `/Users/ \ + --template-body file://custom_queue_job_balancer.yaml \ + --parameters \ + ParameterKey=FarmId,ParameterValue=farm-XXXXXXXXX \ + ParameterKey=QueueId,ParameterValue=queue-XXXXXXXXX + ``` + + Replace `farm-XXXXXXXXX` and `queue-XXXXXXXXX` with your actual values. + +4. Wait for the stack creation to complete. You can monitor the progress using the `aws cloudformation describe-stacks` command. + +After a successful deployment, you should start seeing `maxWorkerCount` updated automatically for each job in your queue. 
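As a rough illustration of how the balancer turns job attributes into a `maxWorkerCount`, here is a minimal sketch of the weighting and allocation arithmetic with the AWS calls removed. The constants are the default environment variable values from the template, and the formula follows the template's `calculate_job_weight` as written, including the squared priority factor. The function names `job_weight` and `allocate`, and the two sample jobs, are hypothetical and only serve the example.

```python
from math import ceil

# Defaults taken from the Lambda environment variables in the template.
PRIORITY_WEIGHT = 100.0
ERROR_WEIGHT = -10.0
SUBMISSION_TIME_WEIGHT = 3.0
RENDERING_TASK_WEIGHT = -100.0
RENDERING_TASK_BUFFER = 1.0


def job_weight(priority, error_count, age_seconds, rendering_tasks):
    """Hypothetical stand-in for calculate_job_weight, without any AWS calls."""
    priority_factor = priority * PRIORITY_WEIGHT
    error_factor = error_count * ERROR_WEIGHT
    time_factor = age_seconds * SUBMISSION_TIME_WEIGHT
    rendering_factor = (rendering_tasks - RENDERING_TASK_BUFFER) * RENDERING_TASK_WEIGHT
    # As in the template: the priority factor is squared and the weight is floored at 1.
    return max(1, priority_factor * priority_factor + error_factor + time_factor + rendering_factor)


def allocate(weights, total_max_worker_count):
    """Split the total fleet capacity across jobs in proportion to their weights."""
    weight_sum = max(1, sum(weights.values()))
    # Round up and give every job at least one worker, as the Lambda does.
    return {
        job_id: max(1, ceil((weight / weight_sum) * total_max_worker_count))
        for job_id, weight in weights.items()
    }


# Two made-up jobs sharing fleets with a combined maxWorkerCount of 20.
weights = {
    "job-a": job_weight(priority=50, error_count=0, age_seconds=600, rendering_tasks=10),
    "job-b": job_weight(priority=10, error_count=2, age_seconds=60, rendering_tasks=4),
}
limits = allocate(weights, total_max_worker_count=20)
print(limits)
```

Because each share is rounded up and floored at one worker, the per-job limits can add up to slightly more than the total fleet capacity. The template accepts this on purpose, preferring a limit that is too high over idle workers.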
diff --git a/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml new file mode 100644 index 00000000..938e19cd --- /dev/null +++ b/cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml @@ -0,0 +1,470 @@ +Description: A Lambda Function that acts as a balancer for a Deadline Cloud Queue by modifying maxWorkers +Parameters: + FarmId: + Description: The FarmID of the Deadline Farm that you are adding a balancer for + Type: String + QueueId: + Description: The QueueID of the Deadline Queue that you are adding a balancer for + Type: String + BalanceOnJobSubmission: + Description: "Whether or not to trigger the Balancer in response to a Job Submission" + Type: String + Default: "true" + AllowedValues: + - "true" + - "false" + +Conditions: + JobSubmissionTriggersLambda: !Equals + - !Ref BalanceOnJobSubmission + - "true" +Resources: + DeadlineWeightedJobBalancer: + Type: AWS::Lambda::Function + Properties: + Environment: + Variables: + ERROR_WEIGHT: '-10' + PRIORITY_WEIGHT: '100' + RENDERING_TASK_BUFFER: '1' + RENDERING_TASK_WEIGHT: '-100' + SUBMISSION_TIME_WEIGHT: '3' + Handler: index.lambda_handler + Role: !GetAtt DeadlineWeightedJobBalancerExecutionRole.Arn + Code: + ZipFile: | + import os + import sys + import subprocess + + # Install the latest version of boto3 that has the job maxWorkerCount in it + os.makedirs("/tmp/site-packages", exist_ok=True) + sys.path.insert(0, "/tmp/site-packages") + subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", "--target", "/tmp/site-packages", "boto3"]) + + import json + import boto3 + from datetime import datetime, timezone + from math import ceil + + + def lambda_handler(event, context): + deadline = boto3.client("deadline") + + # Input event matches the structure of a Job Lifecycle Status Change event + # {"detail": {"farmId": "${FarmId}", "queueId": "${QueueId}"}} + 
response = [] + updates = [] + farm_id = event.get("detail", {}).get("farmId", "") + queue_id = event.get("detail", {}).get("queueId", "") + previous_lifecycle_status = event.get("detail", {}).get("previousLifecycleStatus", None) + # Reject the event if either identifier is missing + if not farm_id or not queue_id: + return {"statusCode": 422, "body": "Missing .detail.farmId or .detail.queueId"} + + if previous_lifecycle_status is not None: + return {"statusCode": 200, "body": "Balancer skipping lifecycle events that are not job submission"} + + active_fleet_ids = get_all_associated_fleet_ids(deadline, farm_id, queue_id) + total_max_worker_count = sum( + [get_max_worker_count(deadline, farm_id, x) for x in active_fleet_ids] + ) + + active_jobs = get_all_active_jobs(deadline, farm_id, queue_id) + active_job_count = len(active_jobs) + + priority_100_jobs = [job for job in active_jobs if job["priority"] == 100] + priority_100_job_count = len(priority_100_jobs) + + priority_0_jobs = [job for job in active_jobs if job["priority"] == 0] + priority_0_job_count = len(priority_0_jobs) + + if priority_100_job_count > 0: + # Priority 100 jobs get all the capacity if they exist + # so allocate all the weights to them + active_jobs = priority_100_jobs + elif priority_0_job_count < active_job_count: + # Priority 0 jobs get no capacity until all other jobs are done + # so allocate all the weights to higher priority jobs + active_jobs = [job for job in active_jobs if job["priority"] != 0] + + for job in active_jobs: + job["weight"] = calculate_job_weight(deadline, farm_id, queue_id, job) + + job_weight_sum = max(1, sum(job["weight"] for job in active_jobs)) + for job in active_jobs: + # ceil because we'd rather have too high a limit than idle workers + job_max_worker_count = max( + 1, ceil((job["weight"] / job_weight_sum) * total_max_worker_count) + ) + update_response = update_job_max_worker_count( + deadline, farm_id, queue_id, job["jobId"], job_max_worker_count + ) + updates.append( + { + "name": job["name"], + "jobId": 
job["jobId"], + "priority": job["priority"], + "weight": job["weight"], + "workerLimit": job_max_worker_count, + "response": update_response, + } + ) + response.append({ + "totalMaxWorkerCount": total_max_worker_count, + "activeJobCount": active_job_count, + "priority100JobCount": priority_100_job_count, + "priority0JobCount": priority_0_job_count, + "allocatedJobCount": len(active_jobs), + "updates": updates + }) + print(json.dumps(response)) + return {"statusCode": 200, "body": json.dumps(response)} + + + def get_all_active_jobs(deadline, farm_id, queue_id): + active_jobs = [] + filter_expressions = { + "filters": [ + { + "groupFilter": { + "filters": [ + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": "READY", + } + }, + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": "ASSIGNED", + } + }, + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": "SCHEDULED", + } + }, + ], + "operator": "OR", + } + }, + { + "stringFilter": { + "name": "TASK_RUN_STATUS", + "operator": "EQUAL", + "value": "RUNNING", + } + }, + ], + "operator": "OR", + } + + job_search_response = deadline.search_jobs( + filterExpressions=filter_expressions, + farmId=farm_id, + queueIds=[queue_id], + itemOffset=0, + pageSize=100, + ) + active_jobs.extend(job_search_response["jobs"]) + + while job_search_response.get("nextItemOffset"): + job_search_response = deadline.search_jobs( + filterExpressions=filter_expressions, + farmId=farm_id, + queueIds=[queue_id], + itemOffset=job_search_response.get("nextItemOffset"), + pageSize=100, + ) + active_jobs.extend(job_search_response["jobs"]) + return active_jobs + + + def get_all_associated_fleet_ids(deadline, farm_id, queue_id): + associated_fleet_ids = [] + qfa_paginator = deadline.get_paginator("list_queue_fleet_associations") + for qfa_page in qfa_paginator.paginate(farmId=farm_id, queueId=queue_id): + associated_fleet_ids.extend( + [ + qfa["fleetId"] + 
for qfa in qfa_page.get("queueFleetAssociations", []) + if qfa["status"] == "ACTIVE" + ] + ) + return associated_fleet_ids + + + def get_max_worker_count(deadline, farm_id, fleet_id): + fleet_info = deadline.get_fleet(farmId=farm_id, fleetId=fleet_id) + return fleet_info["maxWorkerCount"] + + + def calculate_job_weight(deadline, farm_id, queue_id, job): + job_task_run_status_counts = job["taskRunStatusCounts"] + total_rendering_tasks = sum(job_task_run_status_counts[run_status] + for run_status in ["RUNNING", "ASSIGNED", "STARTING", "SCHEDULED", "READY"]) + + # Task run statuses are counted at the job level, but task failure retry counts are not. + # Therefore, we loop through all the individual tasks to sum them. + tasks = [] + steps_pages = deadline.get_paginator("list_steps").paginate(farmId=farm_id, queueId=queue_id, jobId=job["jobId"]) + steps = steps_pages.build_full_result()["steps"] + # get every task across all steps + for step in steps: + tasks_pages = deadline.get_paginator("list_tasks").paginate(farmId=farm_id, queueId=queue_id, + jobId=job["jobId"], stepId=step["stepId"]) + tasks.extend(tasks_pages.build_full_result()["tasks"]) + + # Treat a task without a failureRetryCount as having no retries + error_count = sum([task.get("failureRetryCount", 0) for task in tasks]) + + PW = float(os.environ["PRIORITY_WEIGHT"]) + EW = float(os.environ["ERROR_WEIGHT"]) + SW = float(os.environ["SUBMISSION_TIME_WEIGHT"]) + RW = float(os.environ["RENDERING_TASK_WEIGHT"]) + RB = float(os.environ["RENDERING_TASK_BUFFER"]) + + now = datetime.now(timezone.utc) # Get current time in UTC + time_difference = (now - job["createdAt"]).total_seconds() + + priority_factor = job["priority"] * PW + error_factor = error_count * EW + time_factor = time_difference * SW + rendering_factor = (total_rendering_tasks - RB) * RW + weight = max(1, (priority_factor * priority_factor) + error_factor + time_factor + rendering_factor) + return weight + + + def update_job_max_worker_count(deadline, farm_id, queue_id, job_id, worker_limit): + try: + 
update_job_result = deadline.update_job( + farmId=farm_id, queueId=queue_id, jobId=job_id, maxWorkerCount=worker_limit + ) + return { + "HTTPStatusCode": update_job_result.get("ResponseMetadata").get( + "HTTPStatusCode" + ), + "RequestId": update_job_result.get("ResponseMetadata").get("RequestId"), + } + except Exception as e: + return {"Error": str(e)} + Runtime: python3.13 + MemorySize: 128 + Timeout: 60 + Architectures: + - x86_64 + + DeadlineWeightedJobBalancerConfig: + Type: AWS::Lambda::EventInvokeConfig + Properties: + FunctionName: !Ref DeadlineWeightedJobBalancer + MaximumEventAgeInSeconds: 21600 + MaximumRetryAttempts: 2 + Qualifier: '$LATEST' + + DeadlineWeightedJobBalancerSubmissionRule: + Condition: JobSubmissionTriggersLambda + Type: AWS::Events::Rule + Properties: + Description: Responds to Job Submissions and balances the Queue + State: ENABLED + EventPattern: + source: + - "aws.deadline" + detail-type: + - "Job Lifecycle Status Change" + Targets: + - Arn: !GetAtt + - DeadlineWeightedJobBalancer + - Arn + DeadLetterConfig: + Arn: !GetAtt + - UnprocessedBalancingEventQueue + - Arn + RetryPolicy: + MaximumRetryAttempts: 1 + RoleArn: !GetAtt + - DeadlineWeightedJobBalancerSubmissionRuleRole + - Arn + DeadlineWeightedJobBalancerSubmissionRuleRole: + Condition: JobSubmissionTriggersLambda + Type: 'AWS::IAM::Role' + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: + - events.amazonaws.com + Action: + - sts:AssumeRole + Condition: + StringEquals: + aws:SourceAccount: !Sub "${AWS::AccountId}" + Policies: + - PolicyName: Amazon-EventBridge-Rule-Execution-Policy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: + - !GetAtt DeadlineWeightedJobBalancer.Arn + - !Sub "${DeadlineWeightedJobBalancer.Arn}:*" + - Effect: Allow + Action: + - sqs:SendMessage + Resource: + - !GetAtt + - UnprocessedBalancingEventQueue + - Arn + + 
DeadlineWeightedJobBalancerSchedule: + Type: AWS::Scheduler::Schedule + Properties: + Description: Invokes the Deadline weighted job balancer Lambda once every minute + FlexibleTimeWindow: + Mode: "OFF" + Name: !Join + - '' + - - !Ref 'AWS::StackName' + - BalancerScheduler + ScheduleExpression: "rate(1 minute)" + State: ENABLED + Target: + Arn: !GetAtt + - DeadlineWeightedJobBalancer + - Arn + DeadLetterConfig: + Arn: !GetAtt + - UnprocessedBalancingEventQueue + - Arn + Input: !Sub '{"detail": {"farmId": "${FarmId}", "queueId": "${QueueId}"}}' + RetryPolicy: + MaximumRetryAttempts: 1 + RoleArn: !GetAtt + - DeadlineWeightedJobBalancerScheduleRole + - Arn + + DeadlineWeightedJobBalancerScheduleRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - scheduler.amazonaws.com + Action: + - sts:AssumeRole + Condition: + StringEquals: + aws:SourceAccount: !Sub "${AWS::AccountId}" + RoleName: !Join + - '' + - - !Ref 'AWS::StackName' + - BalancerSchedulerRole + Policies: + - PolicyName: Amazon-EventBridge-Scheduler-Execution-Policy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: + - !GetAtt DeadlineWeightedJobBalancer.Arn + - !Sub "${DeadlineWeightedJobBalancer.Arn}:*" + - Effect: Allow + Action: + - sqs:SendMessage + Resource: + - !GetAtt + - UnprocessedBalancingEventQueue + - Arn + + UnprocessedBalancingEventQueue: + Type: 'AWS::SQS::Queue' + Properties: + QueueName: !Join + - "-" + - - deadline-unprocessed-balancing-events + - !Ref QueueId + UpdateReplacePolicy: Delete + DeletionPolicy: Delete + + UnprocessedBalancingEventQueuePolicy: + DependsOn: DeadlineWeightedJobBalancerSchedule + Type: 'AWS::SQS::QueuePolicy' + Properties: + PolicyDocument: + Statement: + - Action: 'sqs:SendMessage' + Condition: + ArnEquals: + 'aws:SourceArn': !GetAtt + - DeadlineWeightedJobBalancerSchedule + - Arn + Effect: Allow + Principal: + Service: events.amazonaws.com + Resource: 
!GetAtt + - UnprocessedBalancingEventQueue + - Arn + Version: 2012-10-17 + Queues: + - !Ref UnprocessedBalancingEventQueue + + DeadlineWeightedJobBalancerExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + Action: + - sts:AssumeRole + RoleName: !Join + - '' + - - !Ref 'AWS::StackName' + - DeadlineWeightedJobBalancerRole + Policies: + - PolicyName: DefaultExecutionRole + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: logs:CreateLogGroup + Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*' + - Effect: Allow + Action: + - logs:CreateLogStream + - logs:PutLogEvents + Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*:*:*" + - PolicyName: LambdaPermissions + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - deadline:GetFleet + - deadline:UpdateJob + - deadline:SearchJobs + - deadline:ListSteps + - deadline:ListTasks + - deadline:ListQueueFleetAssociations + Resource: + - !Sub "arn:aws:deadline:${AWS::Region}:${AWS::AccountId}:farm/${FarmId}" + - !Sub "arn:aws:deadline:${AWS::Region}:${AWS::AccountId}:farm/${FarmId}/fleet/*" + - !Sub "arn:aws:deadline:${AWS::Region}:${AWS::AccountId}:farm/${FarmId}/queue/${QueueId}/job/*" + - !Sub "arn:aws:deadline:${AWS::Region}:${AWS::AccountId}:farm/${FarmId}/queue/${QueueId}"