From 202d7a342f2118ec3de4d85d41c5b63b85354c88 Mon Sep 17 00:00:00 2001 From: "kevin.spurrier" Date: Thu, 18 Sep 2025 17:39:55 -0500 Subject: [PATCH 1/8] Post Process: Increase lambda mem to 256 MB and add pyarrow to lambda layer --- lambda_layers/postprocess_dependencies/requirements.txt | 1 + terraform/modules/app/main.tf | 1 + 2 files changed, 2 insertions(+) diff --git a/lambda_layers/postprocess_dependencies/requirements.txt b/lambda_layers/postprocess_dependencies/requirements.txt index e2ee941..470a16b 100644 --- a/lambda_layers/postprocess_dependencies/requirements.txt +++ b/lambda_layers/postprocess_dependencies/requirements.txt @@ -1,3 +1,4 @@ pandas==2.3.2 netcdf4==1.7.2 xarray==2025.07.1 +pyarrow==21.0.0 diff --git a/terraform/modules/app/main.tf b/terraform/modules/app/main.tf index 55aff0f..46c1c80 100644 --- a/terraform/modules/app/main.tf +++ b/terraform/modules/app/main.tf @@ -162,6 +162,7 @@ resource "aws_lambda_function" "post_process" { handler = "post_process_lambda.lambda_handler" runtime = "python3.12" timeout = 300 + memory_size = 256 layers = [aws_lambda_layer_version.post_process_deps.arn] From 5c440f7d2acdc993ace0bcac4f4e99cf694cacab Mon Sep 17 00:00:00 2001 From: "kevin.spurrier" Date: Thu, 18 Sep 2025 17:50:53 -0500 Subject: [PATCH 2/8] Post Process Lambda Layer: removed pyarrow. It is too large on disk. 
--- lambda_layers/postprocess_dependencies/requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/lambda_layers/postprocess_dependencies/requirements.txt b/lambda_layers/postprocess_dependencies/requirements.txt index 470a16b..e2ee941 100644 --- a/lambda_layers/postprocess_dependencies/requirements.txt +++ b/lambda_layers/postprocess_dependencies/requirements.txt @@ -1,4 +1,3 @@ pandas==2.3.2 netcdf4==1.7.2 xarray==2025.07.1 -pyarrow==21.0.0 From c25e2e08f50e3d043579c7bddf090f949431cdd4 Mon Sep 17 00:00:00 2001 From: "kevin.spurrier" Date: Thu, 18 Sep 2025 17:54:07 -0500 Subject: [PATCH 3/8] Post Process Lambda Layer: Try adding fastparquet in place of pyarrow --- lambda_layers/postprocess_dependencies/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/lambda_layers/postprocess_dependencies/requirements.txt b/lambda_layers/postprocess_dependencies/requirements.txt index e2ee941..0b8ca4d 100644 --- a/lambda_layers/postprocess_dependencies/requirements.txt +++ b/lambda_layers/postprocess_dependencies/requirements.txt @@ -1,3 +1,4 @@ pandas==2.3.2 netcdf4==1.7.2 xarray==2025.07.1 +fastparquet==2024.11.0 \ No newline at end of file From 1a38281830be132030ce5aeadf3cedefa684e5e5 Mon Sep 17 00:00:00 2001 From: "kevin.spurrier" Date: Tue, 23 Sep 2025 13:48:50 -0500 Subject: [PATCH 4/8] Compute sizing: update to initial non-dev workload values --- terraform/modules/app/main.tf | 2 +- terraform/terraform.tfvars | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/terraform/modules/app/main.tf b/terraform/modules/app/main.tf index 46c1c80..51951a1 100644 --- a/terraform/modules/app/main.tf +++ b/terraform/modules/app/main.tf @@ -162,7 +162,7 @@ resource "aws_lambda_function" "post_process" { handler = "post_process_lambda.lambda_handler" runtime = "python3.12" timeout = 300 - memory_size = 256 + memory_size = 8192 layers = [aws_lambda_layer_version.post_process_deps.arn] diff --git a/terraform/terraform.tfvars 
b/terraform/terraform.tfvars index 907df3d..36fea9e 100644 --- a/terraform/terraform.tfvars +++ b/terraform/terraform.tfvars @@ -29,8 +29,8 @@ postprocess_output_s3_key = "rnr/pi-7/rnr-output" hydrofabric_s3_key = "rnr/pi-7/parquet" # Fargate Compute Configuration -fargate_cpu = 1024 # 8192 -fargate_memory = 2048 # 16384 +fargate_cpu = 8192 +fargate_memory = 16384 fargate_initial_task_count = 1 fargate_max_task_count = 2 From 2fc770655f40084740604503df687248f6a8865c Mon Sep 17 00:00:00 2001 From: "kevin.spurrier" Date: Tue, 23 Sep 2025 16:30:09 -0500 Subject: [PATCH 5/8] ECS Worker autoscaling based on RabbitMQ Depth NGWPC-8297 --- lambdas/autoscaler/autoscaler_lambda.py | 130 ++++++++++++++++++++++++ terraform/main.tf | 21 ++++ terraform/modules/app/main.tf | 125 ++++++++++++++++++++++- terraform/modules/app/outputs.tf | 5 + terraform/modules/app/variables.tf | 4 + terraform/modules/iam_roles/main.tf | 50 ++++++++- terraform/modules/iam_roles/outputs.tf | 6 +- terraform/modules/messaging/outputs.tf | 5 + terraform/terraform.tfvars | 5 +- terraform/variables.tf | 13 ++- 10 files changed, 357 insertions(+), 7 deletions(-) create mode 100644 lambdas/autoscaler/autoscaler_lambda.py diff --git a/lambdas/autoscaler/autoscaler_lambda.py b/lambdas/autoscaler/autoscaler_lambda.py new file mode 100644 index 0000000..19cdeb0 --- /dev/null +++ b/lambdas/autoscaler/autoscaler_lambda.py @@ -0,0 +1,130 @@ +import boto3 +import os +import logging +from datetime import datetime, timedelta + +# Configure logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def get_queue_depth(cw_client, mq_broker_name, mq_queue_name): + """ + Get the number of messages on the RabbitMQ broker. + NOTE: This uses the broker-level 'MessageCount' as a proxy for the specific queue depth. 
+ """ + try: + end_time = datetime.utcnow() + start_time = end_time - timedelta(minutes=5) + + response = cw_client.get_metric_data( + MetricDataQueries=[ + { + 'Id': 'mq_message_count', + 'MetricStat': { + 'Metric': { + 'Namespace': 'AWS/AmazonMQ', + 'MetricName': 'MessageCount', + 'Dimensions': [ + {'Name': 'Broker', 'Value': mq_broker_name}, + ] + }, + 'Period': 60, + 'Stat': 'Average', + }, + 'ReturnData': True, + }, + ], + StartTime=start_time, + EndTime=end_time, + ScanBy='TimestampDescending', + MaxDatapoints=1 + ) + + if response['MetricDataResults'][0]['Values']: + # This value is the total for the broker + queue_size = response['MetricDataResults'][0]['Values'][0] + logger.info(f"Broker '{mq_broker_name}' has a total of {queue_size} messages.") + return queue_size + else: + logger.warning(f"No metric data found for MessageCount for broker '{mq_broker_name}'. Assuming 0.") + return 0 + + except Exception as e: + logger.error(f"Error getting MessageCount metric for broker '{mq_broker_name}': {e}") + raise + +def get_running_task_count(ecs_client, cluster_name, service_name): + """ + Get the number of running tasks for the ECS service. + """ + try: + response = ecs_client.describe_services(cluster=cluster_name, services=[service_name]) + if response['services']: + running_count = response['services'][0]['runningCount'] + logger.info(f"Service '{service_name}' in cluster '{cluster_name}' has {running_count} running tasks.") + return running_count + else: + logger.error(f"ECS service '{service_name}' not found in cluster '{cluster_name}'.") + return 0 + except Exception as e: + logger.error(f"Error describing ECS service '{service_name}': {e}") + raise + +def publish_backlog_per_task_metric(cw_client, cluster_name, service_name, backlog_per_task): + """ + Publish the BacklogPerTask metric to CloudWatch. 
+ """ + try: + cw_client.put_metric_data( + Namespace='ECS/Service/RabbitMQ', + MetricData=[ + { + 'MetricName': 'BacklogPerTask', + 'Dimensions': [ + {'Name': 'ClusterName', 'Value': cluster_name}, + {'Name': 'ServiceName', 'Value': service_name}, + ], + 'Value': backlog_per_task, + 'Unit': 'Count' + }, + ] + ) + logger.info(f"Successfully published BacklogPerTask metric with value: {backlog_per_task}") + except Exception as e: + logger.error(f"Error publishing BacklogPerTask metric: {e}") + raise + +def lambda_handler(event, context): + """ + Main Lambda handler function. + """ + cluster_name = os.environ.get("ECS_CLUSTER_NAME") + service_name = os.environ.get("ECS_SERVICE_NAME") + mq_broker_name = os.environ.get("MQ_BROKER_NAME") + mq_queue_name = os.environ.get("MQ_QUEUE_NAME") + region = os.environ.get("AWS_REGION") + + if not all([cluster_name, service_name, mq_broker_name, mq_queue_name, region]): + logging.error("One or more environment variables are not set.") + return {'statusCode': 500, 'body': 'Missing environment variables'} + + # These clients are now created here and passed to the functions + ecs_client = boto3.client('ecs', region_name=region) + cw_client = boto3.client('cloudwatch', region_name=region) + + try: + queue_depth = get_queue_depth(cw_client, mq_broker_name, mq_queue_name) + running_task_count = get_running_task_count(ecs_client, cluster_name, service_name) + + if running_task_count > 0: + backlog_per_task = queue_depth / running_task_count + else: + backlog_per_task = 0 + + publish_backlog_per_task_metric(cw_client, cluster_name, service_name, backlog_per_task) + + return {'statusCode': 200, 'body': 'Metric published successfully'} + + except Exception as e: + logging.error(f"An error occurred: {e}") + return {'statusCode': 500, 'body': 'An error occurred during execution'} diff --git a/terraform/main.tf b/terraform/main.tf index d69ae4c..83c3a8a 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -92,11 +92,13 @@ module 
"application" { fargate_cpu = var.fargate_cpu fargate_memory = var.fargate_memory fargate_initial_task_count = var.fargate_initial_task_count + fargate_min_task_count = var.fargate_min_task_count fargate_max_task_count = var.fargate_max_task_count } lambda_code = { bucket_name = var.lambda_code_bucket_name + autoscaler_s3_key = var.lambda_autoscaler_zip_s3_key producer_s3_key = var.lambda_producer_zip_s3_key post_process_s3_key = var.lambda_postproc_zip_s3_key post_process_layer_s3_key = var.lambda_postproc_layer_zip_s3_key @@ -111,8 +113,10 @@ module "application" { iam_roles = { ecs_task_execution = module.iam_roles.ecs_task_execution_role_arn ecs_task = module.iam_roles.ecs_task_role_arn + autoscaler_lambda = module.iam_roles.lambda_autoscaler_role_arn producer_lambda = module.iam_roles.lambda_producer_role_arn post_process_lambda = module.iam_roles.lambda_postproc_role_arn + scheduler_role = module.iam_roles.scheduler_role_arn } service_dependencies = { @@ -120,6 +124,7 @@ module "application" { app_output_s3_key = var.app_output_s3_key hydrofabric_s3_key = var.hydrofabric_s3_key postprocess_output_s3_key = var.postprocess_output_s3_key + rabbitmq_broker_name = module.messaging.rabbitmq_broker_name rabbitmq_endpoint = module.messaging.rabbitmq_endpoint rabbitmq_secret_arn = module.messaging.rabbitmq_secret_arn elasticache_endpoint = module.data_stores.elasticache_redis_endpoint @@ -130,6 +135,22 @@ module "application" { # Orchestration and Triggers # ----------------------------------------------------------------------------- +resource "aws_scheduler_schedule" "autoscaler_lambda_trigger" { + name = "${var.app_name}-${var.environment}-autoscaler-trigger" + group_name = "default" + + flexible_time_window { + mode = "OFF" + } + + schedule_expression = "rate(2 minutes)" + + target { + arn = module.application.lambda_autoscaler_arn + role_arn = module.iam_roles.scheduler_role_arn + } +} + resource "aws_scheduler_schedule" "producer_lambda_trigger" { name = 
"${var.app_name}-${var.environment}-producer-trigger" group_name = "default" diff --git a/terraform/modules/app/main.tf b/terraform/modules/app/main.tf index 51951a1..47683e8 100644 --- a/terraform/modules/app/main.tf +++ b/terraform/modules/app/main.tf @@ -14,6 +14,16 @@ resource "aws_cloudwatch_log_group" "fargate_log_group" { } } +resource "aws_cloudwatch_log_group" "autoscaler_lambda_log_group" { + name = "/aws/lambda/${var.app_name}-${var.environment}-ecs-autoscaler" + retention_in_days = 30 + + tags = { + Name = "${var.app_name}-${var.environment}-autoscaler-lambda-log-group" + Environment = var.environment + } +} + resource "aws_cloudwatch_log_group" "producer_lambda_log_group" { name = "/aws/lambda/${var.app_name}-${var.environment}-producer" retention_in_days = 30 @@ -108,6 +118,7 @@ resource "aws_ecs_service" "worker" { } # --- Lambda Functions --- +# --- Producer Lambda Function --- resource "aws_lambda_function" "producer" { function_name = "${var.app_name}-${var.environment}-producer" @@ -133,6 +144,7 @@ resource "aws_lambda_function" "producer" { } } + tags = { Name = "${var.app_name}-${var.environment}-producer-lambda" Environment = var.environment @@ -184,4 +196,115 @@ resource "aws_lambda_function" "post_process" { Name = "${var.app_name}-${var.environment}-post-process-lambda" Environment = var.environment } -} \ No newline at end of file +} + +# --- Autoscaler Lambda Function --- + +resource "aws_lambda_function" "autoscaler" { + function_name = "${var.app_name}-${var.environment}-ecs-autoscaler" + role = var.iam_roles.autoscaler_lambda + handler = "autoscaler_lambda.lambda_handler" + runtime = "python3.12" + timeout = 60 + + s3_bucket = var.lambda_code.bucket_name + s3_key = var.lambda_code.autoscaler_s3_key + + environment { + variables = { + ECS_CLUSTER_NAME = aws_ecs_cluster.main.name + ECS_SERVICE_NAME = aws_ecs_service.worker.name + MQ_BROKER_NAME = var.service_dependencies.rabbitmq_broker_name + MQ_QUEUE_NAME = "hml_files" + } + } + + tags 
= { + Name = "${var.app_name}-${var.environment}-ecs-autoscaler-lambda" + Environment = var.environment + } +} + +# --- ECS Service Autoscaling --- + +resource "aws_appautoscaling_target" "ecs_target" { + max_capacity = var.compute_config.fargate_max_task_count + min_capacity = var.compute_config.fargate_min_task_count + resource_id = "service/${aws_ecs_cluster.main.name}/${aws_ecs_service.worker.name}" + scalable_dimension = "ecs:service:DesiredCount" + service_namespace = "ecs" +} + +resource "aws_cloudwatch_metric_alarm" "scale_up_alarm" { + alarm_name = "${var.app_name}-${var.environment}-scale-up" + comparison_operator = "GreaterThanOrEqualToThreshold" + evaluation_periods = "2" + metric_name = "BacklogPerTask" + namespace = "ECS/Service/RabbitMQ" + period = "60" + statistic = "Average" + threshold = "100" # Scale up when backlog per task is >= 100 + + dimensions = { + ClusterName = aws_ecs_cluster.main.name + ServiceName = aws_ecs_service.worker.name + } + + alarm_actions = [aws_appautoscaling_policy.scale_up.arn] +} + +resource "aws_appautoscaling_policy" "scale_up" { + name = "${var.app_name}-${var.environment}-scale-up-policy" + policy_type = "StepScaling" + resource_id = aws_appautoscaling_target.ecs_target.resource_id + scalable_dimension = aws_appautoscaling_target.ecs_target.scalable_dimension + service_namespace = aws_appautoscaling_target.ecs_target.service_namespace + + step_scaling_policy_configuration { + adjustment_type = "ChangeInCapacity" + cooldown = 60 + metric_aggregation_type = "Average" + + step_adjustment { + metric_interval_lower_bound = 0 + scaling_adjustment = 1 + } + } +} + +resource "aws_cloudwatch_metric_alarm" "scale_down_alarm" { + alarm_name = "${var.app_name}-${var.environment}-scale-down" + comparison_operator = "LessThanThreshold" + evaluation_periods = "2" + metric_name = "BacklogPerTask" + namespace = "ECS/Service/RabbitMQ" + period = "60" + statistic = "Average" + threshold = "100" # Scale down when backlog per task is < 
100 + + dimensions = { + ClusterName = aws_ecs_cluster.main.name + ServiceName = aws_ecs_service.worker.name + } + + alarm_actions = [aws_appautoscaling_policy.scale_down.arn] +} + +resource "aws_appautoscaling_policy" "scale_down" { + name = "${var.app_name}-${var.environment}-scale-down-policy" + policy_type = "StepScaling" + resource_id = aws_appautoscaling_target.ecs_target.resource_id + scalable_dimension = aws_appautoscaling_target.ecs_target.scalable_dimension + service_namespace = aws_appautoscaling_target.ecs_target.service_namespace + + step_scaling_policy_configuration { + adjustment_type = "ChangeInCapacity" + cooldown = 120 + metric_aggregation_type = "Average" + + step_adjustment { + metric_interval_upper_bound = 0 + scaling_adjustment = -1 + } + } +} diff --git a/terraform/modules/app/outputs.tf b/terraform/modules/app/outputs.tf index f942e62..56d20de 100644 --- a/terraform/modules/app/outputs.tf +++ b/terraform/modules/app/outputs.tf @@ -3,6 +3,11 @@ output "ecs_service_name" { value = aws_ecs_service.worker.name } +output "lambda_autoscaler_arn" { + description = "The ARN of the autoscaler Lambda function." + value = aws_lambda_function.autoscaler.arn +} + output "lambda_producer_arn" { description = "The ARN of the producer Lambda function." 
value = aws_lambda_function.producer.arn diff --git a/terraform/modules/app/variables.tf b/terraform/modules/app/variables.tf index ee213d8..5db2476 100644 --- a/terraform/modules/app/variables.tf +++ b/terraform/modules/app/variables.tf @@ -20,6 +20,7 @@ variable "compute_config" { fargate_cpu = number fargate_memory = number fargate_initial_task_count = number + fargate_min_task_count = number fargate_max_task_count = number }) } @@ -31,6 +32,7 @@ variable "lambda_code" { producer_s3_key = string post_process_s3_key = string post_process_layer_s3_key = string + autoscaler_s3_key = string }) } @@ -50,6 +52,7 @@ variable "iam_roles" { ecs_task = string producer_lambda = string post_process_lambda = string + autoscaler_lambda = string }) } @@ -60,6 +63,7 @@ variable "service_dependencies" { app_output_s3_key = string postprocess_output_s3_key = string hydrofabric_s3_key = string + rabbitmq_broker_name = string rabbitmq_endpoint = string rabbitmq_secret_arn = string elasticache_endpoint = string diff --git a/terraform/modules/iam_roles/main.tf b/terraform/modules/iam_roles/main.tf index 09bf46d..160e986 100644 --- a/terraform/modules/iam_roles/main.tf +++ b/terraform/modules/iam_roles/main.tf @@ -117,6 +117,51 @@ resource "aws_iam_role_policy_attachment" "ecs_task_read_secret" { policy_arn = aws_iam_policy.read_rabbitmq_secret.arn } +# ----------------------------------------------------------------------------- +# Autoscaler Lambda Role +# ----------------------------------------------------------------------------- +resource "aws_iam_role" "lambda_autoscaler_role" { + name = "${var.app_name}-${var.environment}-lambda-autoscaler-role" + assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json + tags = { + Name = "${var.app_name}-${var.environment}-lambda-autoscaler-role" + Environment = var.environment + } +} + +resource "aws_iam_policy" "lambda_autoscaler_policy" { + name = "${var.app_name}-${var.environment}-lambda-autoscaler-policy" + policy = 
jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = [ + "cloudwatch:GetMetricData", + "cloudwatch:PutMetricData", + "ecs:DescribeServices" + ] + Effect = "Allow" + Resource = "*" + }, + { + Action = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ] + Effect = "Allow" + Resource = "arn:aws:logs:*:*:*" + } + ] + }) +} + +resource "aws_iam_role_policy_attachment" "lambda_autoscaler_policy_attachment" { + role = aws_iam_role.lambda_autoscaler_role.name + policy_arn = aws_iam_policy.lambda_autoscaler_policy.arn +} + + # ----------------------------------------------------------------------------- # Data Producer Lambda Role # Needs permission to create network interfaces in the VPC. @@ -204,7 +249,7 @@ resource "aws_iam_role" "scheduler_role" { resource "aws_iam_policy" "scheduler_policy" { name = "${var.app_name}-${var.environment}-scheduler-invoke-lambda-policy" - description = "Allows EventBridge Scheduler to invoke the producer Lambda." + description = "Allows EventBridge Scheduler to invoke the application Lambdas." policy = jsonencode({ Version = "2012-10-17", @@ -216,7 +261,8 @@ resource "aws_iam_policy" "scheduler_policy" { # This follows the principle of least privilege. 
Resource = [ "arn:aws:lambda:${var.aws_region}:${var.aws_account_id}:function:${var.app_name}-${var.environment}-producer", - "arn:aws:lambda:${var.aws_region}:${var.aws_account_id}:function:${var.app_name}-${var.environment}-post-process" + "arn:aws:lambda:${var.aws_region}:${var.aws_account_id}:function:${var.app_name}-${var.environment}-post-process", + "arn:aws:lambda:${var.aws_region}:${var.aws_account_id}:function:${var.app_name}-${var.environment}-ecs-autoscaler" ] } ] diff --git a/terraform/modules/iam_roles/outputs.tf b/terraform/modules/iam_roles/outputs.tf index aed4b90..39d3816 100644 --- a/terraform/modules/iam_roles/outputs.tf +++ b/terraform/modules/iam_roles/outputs.tf @@ -8,6 +8,11 @@ output "ecs_task_role_arn" { value = aws_iam_role.ecs_task_role.arn } +output "lambda_autoscaler_role_arn" { + description = "The ARN of the IAM role for the autoscaler Lambda." + value = aws_iam_role.lambda_autoscaler_role.arn +} + output "lambda_producer_role_arn" { description = "The ARN of the IAM role for the data producer Lambda." value = aws_iam_role.lambda_producer_role.arn @@ -22,4 +27,3 @@ output "scheduler_role_arn" { description = "The ARN of the IAM role for the EventBridge Scheduler." value = aws_iam_role.scheduler_role.arn } - diff --git a/terraform/modules/messaging/outputs.tf b/terraform/modules/messaging/outputs.tf index 2043cd1..d5b0c94 100644 --- a/terraform/modules/messaging/outputs.tf +++ b/terraform/modules/messaging/outputs.tf @@ -13,6 +13,11 @@ output "rabbitmq_broker_arn" { value = aws_mq_broker.rabbitmq.arn } +output "rabbitmq_broker_name" { + description = "The name of the RabbitMQ broker." + value = aws_mq_broker.rabbitmq.broker_name +} + output "rabbitmq_secret_arn" { description = "The ARN of the Secrets Manager secret for RabbitMQ credentials." 
value = aws_secretsmanager_secret.rabbitmq.arn diff --git a/terraform/terraform.tfvars b/terraform/terraform.tfvars index 36fea9e..6fb7c91 100644 --- a/terraform/terraform.tfvars +++ b/terraform/terraform.tfvars @@ -1,5 +1,5 @@ # ----------------------------------------------------------------------------- -# 'Non-functional IaC test' Deployment +# 'IaC test' Deployment # ----------------------------------------------------------------------------- aws_region = "us-east-1" @@ -18,6 +18,7 @@ docker_image_uri = "ghcr.io/ngwpc/rnr-aws/troute-rnr:latest" lambda_code_bucket_name = "ngwpc-infra-test" #lambda_producer_zip_s3_key = "lambda-zips/data-producer/v1.0.0.zip" #lambda_postproc_zip_s3_key = "lambda-zips/post-processor/v1.0.0.zip" +lambda_autoscaler_zip_s3_key = "lambda-zips/autoscaler.zip" lambda_producer_zip_s3_key = "lambda-zips/producer.zip" lambda_postproc_zip_s3_key = "lambda-zips/postprocess.zip" lambda_postproc_layer_zip_s3_key = "lambda-zips/postprocess_dependencies.zip" @@ -32,7 +33,7 @@ hydrofabric_s3_key = "rnr/pi-7/parquet" fargate_cpu = 8192 fargate_memory = 16384 fargate_initial_task_count = 1 -fargate_max_task_count = 2 +fargate_max_task_count = 6 # --- Schedule --- producer_schedule_expression = "rate(5 minutes)" diff --git a/terraform/variables.tf b/terraform/variables.tf index 56f208a..e1ef222 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -48,6 +48,11 @@ variable "lambda_code_bucket_name" { type = string } +variable "lambda_autoscaler_zip_s3_key" { + description = "The S3 key for the autoscaler Lambda function's ZIP file." + type = string +} + variable "lambda_producer_zip_s3_key" { description = "The S3 key for the producer Lambda function's ZIP file." type = string @@ -103,10 +108,16 @@ variable "fargate_initial_task_count" { default = 1 } +variable "fargate_min_task_count" { + description = "The minimum number of Fargate tasks to scale in to." 
+ type = number + default = 1 +} + variable "fargate_max_task_count" { description = "The maximum number of Fargate tasks to scale out to." type = number - default = 2 + default = 6 } # --- Messaging Configuration --- From 99092060ca53eb3a480921f55fad6b033c450d4e Mon Sep 17 00:00:00 2001 From: "kevin.spurrier" Date: Tue, 23 Sep 2025 18:42:14 -0500 Subject: [PATCH 6/8] PlantUML Diagram: added ECS Worker autoscaling based on RabbitMQ Depth NGWPC-8297 --- infrastructure_diagram.puml | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/infrastructure_diagram.puml b/infrastructure_diagram.puml index a46b0dd..8ef3f89 100644 --- a/infrastructure_diagram.puml +++ b/infrastructure_diagram.puml @@ -9,6 +9,9 @@ !includeurl AWSPUML/Compute/Lambda.puml !includeurl AWSPUML/Containers/Fargate.puml !includeurl AWSPUML/Storage/SimpleStorageService.puml +!includeurl AWSPUML/ManagementGovernance/CloudWatch.puml +!includeurl AWSPUML/ManagementGovernance/ApplicationAutoScaling2.puml + title RnR AWS Infrastructure Architecture skinparam linetype ortho @@ -23,8 +26,12 @@ package "RnR AWS Infrastructure" { cloud "AWS Cloud" { EventBridge(producer_scheduler, "EventBridge Scheduler", "Triggers the Producer on a schedule (5 min)") EventBridge(postprocess_scheduler, "EventBridge Scheduler", "Triggers the Post Process on a schedule (10 min)") + EventBridge(autoscaler_scheduler, "EventBridge Scheduler", "Triggers the Autoscaler (1 min)") SecretsManager(secrets, "Secrets Manager", "Stores RabbitMQ credentials") + CloudWatch(cw_alarms, "CloudWatch Alarms", "Monitors Backlog & MessageCount") + ApplicationAutoScaling2(app_scaling, "Application Auto Scaling", "Adjusts ECS Task count") + rectangle "VPC" <> { MQ(rabbitmq, "Amazon MQ (RabbitMQ)", "Message queue for processing tasks") @@ -33,6 +40,7 @@ package "RnR AWS Infrastructure" { Lambda(producer_lambda, "Producer Lambda", "Fetches data, checks cache, and queues messages") Fargate(fargate_worker, "ECS 
Fargate Worker", "T-Route") Lambda(postproc_lambda, "Post-Process Lambda", "Performs final actions on output") + Lambda(autoscaler_lambda, "Autoscaler Lambda", "Calculates backlog per task") } SimpleStorageService(bucket, "S3 Bucket", "Object Storage") @@ -46,12 +54,14 @@ rectangle "External APIs" { ' --- Relationships and Data Flow --- legend top right - |= Flow |= Color | - | Producer | <#blue> | - | Worker (T-Route) | <#green> | - | Post Process | <#purple>| + |= Flow |= Color | + | Producer | <#blue> | + | Worker (T-Route) | <#green> | + | Post Process | <#purple> | + | Autoscaling | <#orange> | endlegend +' Producer Flow producer_scheduler --> producer_lambda : 1. Triggers (cron) producer_lambda -[#blue]-> weather_api : "2. Fetches data" producer_lambda -[#blue]-> nwps_api : "2. Fetches data" @@ -59,13 +69,24 @@ producer_lambda -[#blue]-> redis : 3. Checks for existing data producer_lambda -[#blue]-> rabbitmq : 4. Publishes message producer_lambda .[#blue].> secrets : Reads credentials +' Worker Flow fargate_worker -[#green]-> rabbitmq : 5. Consumes message fargate_worker -[#green]-> bucket : 6. Reads domain data fargate_worker -[#green]-> bucket : 7. Writes results fargate_worker .[#green].> secrets : Reads credentials +' Post-Processing Flow postprocess_scheduler --> postproc_lambda : 8. Triggers (cron) postproc_lambda -[#purple]-> bucket : 9. Processes output data postproc_lambda -[#purple]-> bucket : 10. Reads domain data -@enduml +' Autoscaling Flow +autoscaler_scheduler -[#orange]-> autoscaler_lambda : A. Triggers (cron) +autoscaler_lambda -[#orange]-> rabbitmq : "B. Gets MessageCount metric via CloudWatch" +autoscaler_lambda -[#orange]-> fargate_worker : "C. Gets Running Task count via ECS API" +autoscaler_lambda -[#orange]-> cw_alarms : "D. Publishes BacklogPerTask metric" +cw_alarms -[#orange]-> app_scaling : "E. Triggers scaling policy" +app_scaling -[#orange]-> fargate_worker : "F. 
Adjusts task count" + + +@enduml \ No newline at end of file From 9d76cdb3c484d6edd2d393a6ba036bf4fd281bb5 Mon Sep 17 00:00:00 2001 From: "kevin.spurrier" Date: Thu, 25 Sep 2025 18:41:01 -0500 Subject: [PATCH 7/8] Documentation cleanup --- README.md | 46 +++++++---------------------- terraform/modules/iam_roles/main.tf | 2 +- terraform/terraform.tfvars | 5 ++++ 3 files changed, 17 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 7c9a97a..f9ccea0 100644 --- a/README.md +++ b/README.md @@ -1,61 +1,36 @@ # Reference Architecture for a Serverless Data Processing Pipeline -This repository contains the infrastructure-as-code (IaC) and most basic CI/CD pipelines for a serverless data processing application on AWS. The infrastructure is defined using Terraform, and continuous integration is handled by GitHub Actions. +This repository contains the infrastructure-as-code (IaC) and basic CI pipelines for a serverless RnR data processing application deployment on AWS. The infrastructure is defined using Terraform, and continuous integration is handled by GitHub Actions. ## Architecture Overview -Triggers: An EventBridge Scheduler kicks off the process on a defined schedule. +Triggers: An EventBridge Scheduler kicks off processes on defined schedules. Data Ingestion: A Producer Lambda function sends messages to a central RabbitMQ Broker (Amazon MQ). It also has access to and ElasticCache (Redis) for caching. -Core Processing: A long-running Fargate Worker Task (ECS) consumes messages from RabbitMQ, performs the main business logic, and uses EFS and S3 for shared storage. It also has ElastiCache (Redis) for caching, though that may only be required by the Data ingestion (#TODO: Clarify Access requirements) +Core Processing: A long-running Fargate Worker Task (ECS) consumes messages from RabbitMQ, performs the main business logic, and uses S3 for shared storage. 
+ +Autoscaler: Provides AWS MQ RabbitMQ depth based autoscaling by way of a lambda and cloudwatch events. Data Storage: The worker task reads from an Input S3 Bucket and writes its results to an Output S3 Bucket. -Post-Processing: A Post-Process Lambda is triggered by new objects created in the Output S3 bucket to perform final actions. (#TODO: Clarify how this really is triggered) +Post-Processing: A Post-Process Lambda is triggered on a schedule and processes any new objects created in the Output S3 bucket to perform final actions. Security: IAM Roles aim to provide least-privilege permissions, and Secrets Manager securely stores RabbitMQ credentials. -## Action Items for Development Team - -To integrate the real application code, one would need to review and update the following areas. - -- [X] Lambda Functions - - The placeholder Lambda code is located in lambdas/producer/producer_lambda.py - - Action: Replace the contents of these files with your production-ready Python code. - - Action: Update the corresponding requirements.txt file in each Lambda's directory with any new Python dependencies. - -- [X] ECS Worker Task - - The placeholder ECS worker code is located in ecs/worker/worker.py. - - Action: Replace this with the core data processing logic, image, ETC. - -- [ ] IAM Permissions - - The current IAM roles have basic permissions for S3 access, VPC networking, and reading the RabbitMQ secret. The role definitions are in terraform/modules/iam_roles/main.tf. - - Action: Identify any other required AWS services, and provide the list of required permissions to the infrastructure team for IAM policy updates. - -- [ ] Environment Variables & Secrets - - The infrastructure passes a basic example set of environment variables (bucket names, RabbitMQ endpoint, ETC.) to the compute services. These are defined in terraform/modules/app/main.tf. - - Action: Identify any additional configuration values or secrets your application needs. 
For non-sensitive values, we can add them as new environment variables. For sensitive values (API keys, ETC.), we will likely add them to AWS Secrets Manager and grant the necessary IAM permissions to access them. +## Pre-requisites +- A VPC with private Subnets +- An S3 Bucket accessible by the -## Pre-requisites ### Bucket Policy Requirements - This application's IaC requires an existing S3 bucket to store input and output data currently. To allow the ECS tasks and Lambda functions created by this stack to access the bucket, the bucket owner must update the bucket policy to grant permissions to the application’s IAM roles. After deploying this stack, provide the following IAM role ARNs (created by Terraform) to the S3 Bucket Owner. ``` +arn:aws:iam:::role/--lambda-producer-role arn:aws:iam:::role/--ecs-task-role arn:aws:iam:::role/--lambda-postproc-role ``` @@ -68,6 +43,7 @@ You will want to integrate the following or request that theythe owner integrate "Effect": "Allow", "Principal": { "AWS": [ + "arn:aws:iam:::role/--lambda-producer-role", "arn:aws:iam:::role/--ecs-task-role", "arn:aws:iam:::role/--lambda-postproc-role" ] diff --git a/terraform/modules/iam_roles/main.tf b/terraform/modules/iam_roles/main.tf index 160e986..5e85f85 100644 --- a/terraform/modules/iam_roles/main.tf +++ b/terraform/modules/iam_roles/main.tf @@ -236,7 +236,7 @@ resource "aws_iam_role_policy_attachment" "lambda_postproc_basic_execution" { # ----------------------------------------------------------------------------- # EventBridge Scheduler Role -# Needs permission to invoke the producer and post process Lambda functions. +# Needs permission to invoke autoscaler, producer & post process Lambdas. 
# ----------------------------------------------------------------------------- resource "aws_iam_role" "scheduler_role" { name = "${var.app_name}-${var.environment}-scheduler-role" diff --git a/terraform/terraform.tfvars b/terraform/terraform.tfvars index 6fb7c91..f554931 100644 --- a/terraform/terraform.tfvars +++ b/terraform/terraform.tfvars @@ -39,3 +39,8 @@ fargate_max_task_count = 6 producer_schedule_expression = "rate(5 minutes)" postprocess_schedule_expression = "rate(10 minutes)" +# Example to force delete the created secrets in secrets manager without waiting for the scheduled deletion, should you want to quickly delete and recreate the infrastructure. +# This should only be necessary in test environments where you want to quickly delete and recreate the infrastructure. +# aws secretsmanager delete-secret --secret-id rnrtest/test/rabbitmq-credentials --force-delete-without-recovery --region us-east-1 + + From 8af0868bba827e40ee5c26889adfdde421d2ac9f Mon Sep 17 00:00:00 2001 From: "kevin.spurrier" Date: Thu, 25 Sep 2025 19:09:46 -0500 Subject: [PATCH 8/8] Documentation cleanup: more minor documentation updates --- README.md | 28 +++++++++++++++++++++++----- terraform/terraform.tfvars | 4 +--- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index f9ccea0..b9420d3 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,16 @@ # Reference Architecture for a Serverless Data Processing Pipeline -This repository contains the infrastructure-as-code (IaC) and basic CI pipelines for a serverless RnR data processing application deployment on AWS. The infrastructure is defined using Terraform, and continuous integration is handled by GitHub Actions. +This repository contains the infrastructure-as-code (IaC) and basic CI pipelines for a serverless RnR data processing application deployment on AWS. This solution is designed to be scalable, resilient, and cost-effective, leveraging modern cloud-native technologies. 
The infrastructure is defined using Terraform, and continuous integration and artifact deployment are handled by GitHub Actions. ## Architecture Overview Triggers: An EventBridge Scheduler kicks off processes on defined schedules. -Data Ingestion: A Producer Lambda function sends messages to a central RabbitMQ Broker (Amazon MQ). It also has access to and ElasticCache (Redis) for caching. +Data Ingestion: A Producer Lambda function fetches data from external weather APIs, checks for duplicates using ElastiCache for Redis, and sends messages to a central RabbitMQ Broker (Amazon MQ). Core Processing: A long-running Fargate Worker Task (ECS) consumes messages from RabbitMQ, performs the main business logic, and uses S3 for shared storage. -Autoscaler: Provides AWS MQ RabbitMQ depth based autoscaling by way of a lambda and cloudwatch events. +Autoscaling: An Autoscaler Lambda, triggered by an EventBridge schedule, calculates the backlog of messages per running task and publishes a custom CloudWatch metric. CloudWatch Alarms monitor this metric and trigger Application Auto Scaling policies to adjust the number of Fargate tasks up to a defined maximum number of workers. Data Storage: The worker task reads from an Input S3 Bucket and writes its results to an Output S3 Bucket. @@ -20,8 +20,11 @@ Security: IAM Roles aim to provide least-privilege permissions, and Secrets Mana ## Pre-requisites -- A VPC with private Subnets -- An S3 Bucket accessible by the +- An AWS Account +- A VPC with private subnets +- An S3 Bucket for Terraform state and Lambda code +- Terraform installed locally +- AWS CLI configured with appropriate credentials ### Bucket Policy Requirements This application's IaC requires an existing S3 bucket to store input and output data currently. @@ -60,3 +63,18 @@ You will want to integrate the following or request that the owner integrate ] } ``` +### CI/CD Pipelines + +This repository includes two GitHub Actions workflows: + +1.
cicd-container.yml: This workflow is triggered on pushes and pull requests to the main and development branches. It builds the Docker image for the Fargate worker, scans it for vulnerabilities using Trivy, and pushes it to GitHub Container Registry (ghcr.io). + +2. cicd-lambdas.yml: This workflow is triggered on pushes to the main branch when files in the lambdas or lambda_layers directories change. It packages the Lambda functions and layers into zip files and uploads them as artifacts, ready for deployment. + +### Future Improvements for Consideration + +- Production-Ready Messaging: For a production environment, the Amazon MQ for RabbitMQ broker should likely be configured in a cluster with a multi-AZ deployment for high availability. +- Enhanced Security: The security groups could be made more restrictive. For example, the egress rules could be limited to only the necessary endpoints. +- Distributed Tracing or Enhanced Logging: Implementing a tool like AWS X-Ray would provide end-to-end tracing of requests as they flow through the system, from the producer Lambda to the Fargate worker. +- Dead-Letter Queues (DLQs): To improve resiliency, DLQs could be configured for the RabbitMQ queues to handle messages that cannot be processed successfully. +- Monitoring and notifications: For a production environment, this solution will likely need to be integrated with a monitoring and notification system configured for appropriate issue escalation. diff --git a/terraform/terraform.tfvars b/terraform/terraform.tfvars index f554931..46d0a49 100644 --- a/terraform/terraform.tfvars +++ b/terraform/terraform.tfvars @@ -13,11 +13,9 @@ subnet_name_pattern = "Test-App*" rabbitmq_console_cidr = "10.0.0.0/8" # --- Application Image and Code --- -# This would typically be passed in from a CI/CD pipeline +# These values would typically be passed in from a CI/CD pipeline and be versioned.
docker_image_uri = "ghcr.io/ngwpc/rnr-aws/troute-rnr:latest" lambda_code_bucket_name = "ngwpc-infra-test" -#lambda_producer_zip_s3_key = "lambda-zips/data-producer/v1.0.0.zip" -#lambda_postproc_zip_s3_key = "lambda-zips/post-processor/v1.0.0.zip" lambda_autoscaler_zip_s3_key = "lambda-zips/autoscaler.zip" lambda_producer_zip_s3_key = "lambda-zips/producer.zip" lambda_postproc_zip_s3_key = "lambda-zips/postprocess.zip"