Merged

Iac #18

66 changes: 30 additions & 36 deletions README.md
@@ -1,61 +1,39 @@
# Reference Architecture for a Serverless Data Processing Pipeline

This repository contains the infrastructure-as-code (IaC) and most basic CI/CD pipelines for a serverless data processing application on AWS. The infrastructure is defined using Terraform, and continuous integration is handled by GitHub Actions.
This repository contains the infrastructure-as-code (IaC) and basic CI pipelines for a serverless RnR data processing application on AWS. The solution is designed to be scalable, resilient, and cost-effective, leveraging modern cloud-native technologies. The infrastructure is defined using Terraform, and continuous integration and artifact deployment are handled by GitHub Actions.

## Architecture Overview

Triggers: An EventBridge Scheduler kicks off the process on a defined schedule.
Triggers: An EventBridge Scheduler kicks off processes on defined schedules.

Data Ingestion: A Producer Lambda function sends messages to a central RabbitMQ Broker (Amazon MQ). It also has access to and ElasticCache (Redis) for caching.
Data Ingestion: A Producer Lambda function fetches data from external weather APIs, checks for duplicates using ElastiCache for Redis, and sends messages to a central RabbitMQ Broker (Amazon MQ).
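The duplicate check can be sketched as a set-if-absent operation against Redis. The key format and the in-memory stand-in below are illustrative assumptions, not code from this repository; a real client would use something like `redis.set(key, 1, nx=True, ex=ttl)`.

```python
class FakeRedis:
    """Minimal stand-in for ElastiCache so the logic is runnable locally."""
    def __init__(self):
        self.store = {}

    def set_nx(self, key):
        # Mimics SET ... NX: only succeeds if the key is not already present.
        if key in self.store:
            return False  # duplicate: an earlier run already claimed this item
        self.store[key] = 1
        return True

def should_publish(cache, item_id):
    # Only publish to RabbitMQ if this item has not been seen before.
    # The "seen:" key prefix is a hypothetical naming convention.
    return cache.set_nx(f"seen:{item_id}")

cache = FakeRedis()
print(should_publish(cache, "gauge-123/2024-06-01T00Z"))  # True: first sighting
print(should_publish(cache, "gauge-123/2024-06-01T00Z"))  # False: duplicate skipped
```

The same shape works with a real `redis.Redis` client by swapping `set_nx` for `set(key, 1, nx=True, ex=ttl)`, where the TTL bounds how long deduplication history is kept.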

Core Processing: A long-running Fargate Worker Task (ECS) consumes messages from RabbitMQ, performs the main business logic, and uses EFS and S3 for shared storage. It also has ElastiCache (Redis) for caching, though that may only be required by the Data ingestion (#TODO: Clarify Access requirements)
Core Processing: A long-running Fargate Worker Task (ECS) consumes messages from RabbitMQ, performs the main business logic, and uses S3 for shared storage.

Autoscaling: An Autoscaler Lambda, triggered by an EventBridge schedule, calculates the backlog of messages per running task and publishes a custom CloudWatch metric. CloudWatch Alarms monitor this metric and trigger Application Auto Scaling policies to adjust the number of Fargate tasks up to a defined maximum number of workers.
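The scaling signal described above is simple arithmetic: queue depth divided by the number of running tasks. A minimal sketch, mirroring the Autoscaler Lambda added in this PR:

```python
def backlog_per_task(queue_depth, running_tasks):
    # Mirrors the Autoscaler Lambda: with no running tasks the metric is
    # reported as 0 rather than dividing by zero.
    return queue_depth / running_tasks if running_tasks > 0 else 0

# 120 queued messages across 4 Fargate tasks -> 30 messages of backlog each;
# a CloudWatch alarm on this value drives the scale-out policy.
print(backlog_per_task(120, 4))  # 30.0
print(backlog_per_task(120, 0))  # 0
```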

Data Storage: The worker task reads from an Input S3 Bucket and writes its results to an Output S3 Bucket.

Post-Processing: A Post-Process Lambda is triggered by new objects created in the Output S3 bucket to perform final actions. (#TODO: Clarify how this really is triggered)
Post-Processing: A Post-Process Lambda is triggered on a schedule and processes any new objects created in the Output S3 bucket to perform final actions.
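Because the post-process Lambda polls on a schedule rather than reacting to S3 events, it has to work out which output objects are new since its last run. One way to sketch that selection, with the S3 listing replaced by plain dicts so it runs anywhere (the real function would page through `list_objects_v2`, and how `last_run` is tracked is an assumption):

```python
from datetime import datetime, timezone

def new_objects(listing, last_run):
    """Select object keys modified since the previous scheduled run.
    `listing` mimics the 'Contents' entries returned by S3 list_objects_v2."""
    return [o["Key"] for o in listing if o["LastModified"] > last_run]

listing = [
    {"Key": "output/a.parquet",
     "LastModified": datetime(2024, 6, 1, 0, 5, tzinfo=timezone.utc)},
    {"Key": "output/b.parquet",
     "LastModified": datetime(2024, 6, 1, 0, 20, tzinfo=timezone.utc)},
]
last_run = datetime(2024, 6, 1, 0, 10, tzinfo=timezone.utc)
print(new_objects(listing, last_run))  # ['output/b.parquet']
```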

Security: IAM Roles aim to provide least-privilege permissions, and Secrets Manager securely stores RabbitMQ credentials.

## Action Items for Development Team

To integrate the real application code, review and update the following areas.

- [X] Lambda Functions

The placeholder Lambda code is located in each Lambda's directory, for example lambdas/producer/producer_lambda.py.

Action: Replace the contents of these files with your production-ready Python code.

Action: Update the corresponding requirements.txt file in each Lambda's directory with any new Python dependencies.

- [X] ECS Worker Task

The placeholder ECS worker code is located in ecs/worker/worker.py.

Action: Replace this with the core data processing logic, container image, etc.

- [ ] IAM Permissions

The current IAM roles have basic permissions for S3 access, VPC networking, and reading the RabbitMQ secret. The role definitions are in terraform/modules/iam_roles/main.tf.

Action: Identify any other required AWS services, and provide the list of required permissions to the infrastructure team for IAM policy updates.

- [ ] Environment Variables & Secrets

The infrastructure passes a basic example set of environment variables (bucket names, RabbitMQ endpoint, etc.) to the compute services. These are defined in terraform/modules/app/main.tf.

Action: Identify any additional configuration values or secrets your application needs. For non-sensitive values, we can add them as new environment variables. For sensitive values (API keys, etc.), we will likely add them to AWS Secrets Manager and grant the necessary IAM permissions to access them.
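As a sketch of the pattern described above, non-sensitive values can be read straight from the environment, while sensitive ones would be fetched from Secrets Manager at startup. The variable names here are assumptions, not necessarily the ones defined in terraform/modules/app/main.tf:

```python
import json
import os

def load_config():
    # Non-sensitive values arrive as environment variables set by Terraform.
    # APP_BUCKET_NAME / RABBITMQ_ENDPOINT are hypothetical names.
    cfg = {
        "input_bucket": os.environ["APP_BUCKET_NAME"],
        "rabbitmq_endpoint": os.environ["RABBITMQ_ENDPOINT"],
    }
    # A sensitive value would instead be fetched once at startup, e.g.:
    #   boto3.client("secretsmanager").get_secret_value(
    #       SecretId=secret_arn)["SecretString"]
    return cfg

# Stand-in values so the sketch runs outside AWS:
os.environ.setdefault("APP_BUCKET_NAME", "example-bucket")
os.environ.setdefault("RABBITMQ_ENDPOINT", "amqps://example:5671")
print(json.dumps(load_config(), indent=2))
```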
## Pre-requisites

- An AWS Account
- A VPC with private subnets
- An S3 Bucket for Terraform state and Lambda code
- Terraform installed locally
- AWS CLI configured with appropriate credentials

### Bucket Policy Requirements

This application's IaC currently requires an existing S3 bucket to store input and output data.
To allow the ECS tasks and Lambda functions created by this stack to access the bucket, the bucket owner must update the bucket policy to grant permissions to the application’s IAM roles.

After deploying this stack, provide the following IAM role ARNs (created by Terraform) to the S3 Bucket Owner.

```
arn:aws:iam::<ACCOUNT_ID>:role/<APP_NAME>-<ENV>-lambda-producer-role
arn:aws:iam::<ACCOUNT_ID>:role/<APP_NAME>-<ENV>-ecs-task-role
arn:aws:iam::<ACCOUNT_ID>:role/<APP_NAME>-<ENV>-lambda-postproc-role
```
@@ -68,6 +46,7 @@ You will want to integrate the following, or request that the owner integrate
"Effect": "Allow",
"Principal": {
"AWS": [
"arn:aws:iam::<ACCOUNT_ID>:role/<APP_NAME>-<ENV>-lambda-producer-role",
"arn:aws:iam::<ACCOUNT_ID>:role/<APP_NAME>-<ENV>-ecs-task-role",
"arn:aws:iam::<ACCOUNT_ID>:role/<APP_NAME>-<ENV>-lambda-postproc-role"
]
@@ -84,3 +63,18 @@ You will want to integrate the following, or request that the owner integrate
]
}
```
### CI/CD Pipelines

This repository includes two GitHub Actions workflows:

1. cicd-container.yml: This workflow is triggered on pushes and pull requests to the main and development branches. It builds the Docker image for the Fargate worker, scans it for vulnerabilities using Trivy, and pushes it to GitHub Container Registry (ghcr.io).

2. cicd-lambdas.yml: This workflow is triggered on pushes to the main branch when files in the lambdas or lambda_layers directories change. It packages the Lambda functions and layers into zip files and uploads them as artifacts, ready for deployment.
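The packaging step of cicd-lambdas.yml can be approximated in a few lines; the actual workflow may use the `zip` CLI instead, and the paths below are illustrative:

```python
import pathlib
import zipfile

def package_lambda(src_dir, out_zip):
    """Zip a Lambda source directory with archive-relative paths,
    roughly what the cicd-lambdas.yml packaging step produces."""
    src = pathlib.Path(src_dir)
    with zipfile.ZipFile(out_zip, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(src.rglob("*")):
            if path.is_file():
                # Store paths relative to the source root so the handler
                # sits at the top level of the archive, as Lambda expects.
                zf.write(path, path.relative_to(src))
    return out_zip

# Example usage: package_lambda("lambdas/producer", "producer.zip")
```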

### Future Improvements for Consideration

- Production-Ready Messaging: For a production environment, the Amazon MQ for RabbitMQ broker should likely be configured in a cluster with a multi-AZ deployment for high availability.
- Enhanced Security: The security groups could be made more restrictive. For example, the egress rules could be limited to only the necessary endpoints.
- Distributed Tracing or Enhanced Logging: Implementing a tool like AWS X-Ray would provide end-to-end tracing of requests as they flow through the system, from the producer Lambda to the Fargate worker.
- Dead-Letter Queues (DLQs): To improve resiliency, DLQs could be configured for the RabbitMQ queues to handle messages that cannot be processed successfully.
- Monitoring and notifications: For a production environment, this solution will likely need to be integrated with a monitoring and notification system configured for appropriate issue escalation.
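The DLQ item above maps to RabbitMQ queue arguments. A sketch of just those arguments (the exchange and routing-key names are hypothetical; with pika they would be passed to `channel.queue_declare`):

```python
def dlq_queue_arguments(dead_letter_exchange="dlx", routing_key="tasks.dead"):
    # RabbitMQ routes messages that are rejected (or that expire) to the
    # configured dead-letter exchange instead of silently dropping them.
    return {
        "x-dead-letter-exchange": dead_letter_exchange,
        "x-dead-letter-routing-key": routing_key,
    }

# With pika (not imported here), the worker's queue could be declared as:
#   channel.queue_declare(queue="tasks", durable=True,
#                         arguments=dlq_queue_arguments())
print(dlq_queue_arguments())
```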
31 changes: 26 additions & 5 deletions infrastructure_diagram.puml
@@ -9,6 +9,9 @@
!includeurl AWSPUML/Compute/Lambda.puml
!includeurl AWSPUML/Containers/Fargate.puml
!includeurl AWSPUML/Storage/SimpleStorageService.puml
!includeurl AWSPUML/ManagementGovernance/CloudWatch.puml
!includeurl AWSPUML/ManagementGovernance/ApplicationAutoScaling2.puml


title RnR AWS Infrastructure Architecture
skinparam linetype ortho
@@ -23,8 +26,12 @@ package "RnR AWS Infrastructure" {
cloud "AWS Cloud" {
EventBridge(producer_scheduler, "EventBridge Scheduler", "Triggers the Producer on a schedule (5 min)")
EventBridge(postprocess_scheduler, "EventBridge Scheduler", "Triggers the Post Process on a schedule (10 min)")
EventBridge(autoscaler_scheduler, "EventBridge Scheduler", "Triggers the Autoscaler (2 min)")

SecretsManager(secrets, "Secrets Manager", "Stores RabbitMQ credentials")
CloudWatch(cw_alarms, "CloudWatch Alarms", "Monitors Backlog & MessageCount")
ApplicationAutoScaling2(app_scaling, "Application Auto Scaling", "Adjusts ECS Task count")


rectangle "VPC" <<vpc>> {
MQ(rabbitmq, "Amazon MQ (RabbitMQ)", "Message queue for processing tasks")
@@ -33,6 +40,7 @@
Lambda(producer_lambda, "Producer Lambda", "Fetches data, checks cache, and queues messages")
Fargate(fargate_worker, "ECS Fargate Worker", "T-Route")
Lambda(postproc_lambda, "Post-Process Lambda", "Performs final actions on output")
Lambda(autoscaler_lambda, "Autoscaler Lambda", "Calculates backlog per task")
}

SimpleStorageService(bucket, "S3 Bucket", "Object Storage")
@@ -46,26 +54,39 @@ rectangle "External APIs" {

' --- Relationships and Data Flow ---
legend top right
|= Flow |= Color |
| Producer | <#blue> |
| Worker (T-Route) | <#green> |
| Post Process | <#purple>|
|= Flow |= Color |
| Producer | <#blue> |
| Worker (T-Route) | <#green> |
| Post Process | <#purple> |
| Autoscaling | <#orange> |
endlegend

' Producer Flow
producer_scheduler --> producer_lambda : 1. Triggers (cron)
producer_lambda -[#blue]-> weather_api : "2. Fetches data"
producer_lambda -[#blue]-> nwps_api : "2. Fetches data"
producer_lambda -[#blue]-> redis : 3. Checks for existing data
producer_lambda -[#blue]-> rabbitmq : 4. Publishes message
producer_lambda .[#blue].> secrets : Reads credentials

' Worker Flow
fargate_worker -[#green]-> rabbitmq : 5. Consumes message
fargate_worker -[#green]-> bucket : 6. Reads domain data
fargate_worker -[#green]-> bucket : 7. Writes results
fargate_worker .[#green].> secrets : Reads credentials

' Post-Processing Flow
postprocess_scheduler --> postproc_lambda : 8. Triggers (cron)
postproc_lambda -[#purple]-> bucket : 9. Processes output data
postproc_lambda -[#purple]-> bucket : 10. Reads domain data

@enduml
' Autoscaling Flow
autoscaler_scheduler -[#orange]-> autoscaler_lambda : A. Triggers (cron)
autoscaler_lambda -[#orange]-> rabbitmq : "B. Gets MessageCount metric via CloudWatch"
autoscaler_lambda -[#orange]-> fargate_worker : "C. Gets Running Task count via ECS API"
autoscaler_lambda -[#orange]-> cw_alarms : "D. Publishes BacklogPerTask metric"
cw_alarms -[#orange]-> app_scaling : "E. Triggers scaling policy"
app_scaling -[#orange]-> fargate_worker : "F. Adjusts task count"


@enduml
1 change: 1 addition & 0 deletions lambda_layers/postprocess_dependencies/requirements.txt
@@ -1,3 +1,4 @@
pandas==2.3.2
netcdf4==1.7.2
xarray==2025.07.1
fastparquet==2024.11.0
130 changes: 130 additions & 0 deletions lambdas/autoscaler/autoscaler_lambda.py
@@ -0,0 +1,130 @@
import boto3
import os
import logging
from datetime import datetime, timedelta

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def get_queue_depth(cw_client, mq_broker_name, mq_queue_name):
"""
Get the number of messages on the RabbitMQ broker.
NOTE: This uses the broker-level 'MessageCount' as a proxy for the specific queue depth.
"""
try:
end_time = datetime.utcnow()
start_time = end_time - timedelta(minutes=5)

response = cw_client.get_metric_data(
MetricDataQueries=[
{
'Id': 'mq_message_count',
'MetricStat': {
'Metric': {
'Namespace': 'AWS/AmazonMQ',
'MetricName': 'MessageCount',
'Dimensions': [
{'Name': 'Broker', 'Value': mq_broker_name},
]
},
'Period': 60,
'Stat': 'Average',
},
'ReturnData': True,
},
],
StartTime=start_time,
EndTime=end_time,
ScanBy='TimestampDescending',
MaxDatapoints=1
)

if response['MetricDataResults'][0]['Values']:
# This value is the total for the broker
queue_size = response['MetricDataResults'][0]['Values'][0]
logger.info(f"Broker '{mq_broker_name}' has a total of {queue_size} messages.")
return queue_size
else:
logger.warning(f"No metric data found for MessageCount for broker '{mq_broker_name}'. Assuming 0.")
return 0

except Exception as e:
logger.error(f"Error getting MessageCount metric for broker '{mq_broker_name}': {e}")
raise

def get_running_task_count(ecs_client, cluster_name, service_name):
"""
Get the number of running tasks for the ECS service.
"""
try:
response = ecs_client.describe_services(cluster=cluster_name, services=[service_name])
if response['services']:
running_count = response['services'][0]['runningCount']
logger.info(f"Service '{service_name}' in cluster '{cluster_name}' has {running_count} running tasks.")
return running_count
else:
logger.error(f"ECS service '{service_name}' not found in cluster '{cluster_name}'.")
return 0
except Exception as e:
logger.error(f"Error describing ECS service '{service_name}': {e}")
raise

def publish_backlog_per_task_metric(cw_client, cluster_name, service_name, backlog_per_task):
"""
Publish the BacklogPerTask metric to CloudWatch.
"""
try:
cw_client.put_metric_data(
Namespace='ECS/Service/RabbitMQ',
MetricData=[
{
'MetricName': 'BacklogPerTask',
'Dimensions': [
{'Name': 'ClusterName', 'Value': cluster_name},
{'Name': 'ServiceName', 'Value': service_name},
],
'Value': backlog_per_task,
'Unit': 'Count'
},
]
)
logger.info(f"Successfully published BacklogPerTask metric with value: {backlog_per_task}")
except Exception as e:
logger.error(f"Error publishing BacklogPerTask metric: {e}")
raise

def lambda_handler(event, context):
"""
Main Lambda handler function.
"""
cluster_name = os.environ.get("ECS_CLUSTER_NAME")
service_name = os.environ.get("ECS_SERVICE_NAME")
mq_broker_name = os.environ.get("MQ_BROKER_NAME")
mq_queue_name = os.environ.get("MQ_QUEUE_NAME")
region = os.environ.get("AWS_REGION")

if not all([cluster_name, service_name, mq_broker_name, mq_queue_name, region]):
        logger.error("One or more environment variables are not set.")
return {'statusCode': 500, 'body': 'Missing environment variables'}

# These clients are now created here and passed to the functions
ecs_client = boto3.client('ecs', region_name=region)
cw_client = boto3.client('cloudwatch', region_name=region)

try:
queue_depth = get_queue_depth(cw_client, mq_broker_name, mq_queue_name)
running_task_count = get_running_task_count(ecs_client, cluster_name, service_name)

if running_task_count > 0:
backlog_per_task = queue_depth / running_task_count
else:
backlog_per_task = 0

publish_backlog_per_task_metric(cw_client, cluster_name, service_name, backlog_per_task)

return {'statusCode': 200, 'body': 'Metric published successfully'}

except Exception as e:
        logger.error(f"An error occurred: {e}")
return {'statusCode': 500, 'body': 'An error occurred during execution'}
21 changes: 21 additions & 0 deletions terraform/main.tf
@@ -92,11 +92,13 @@ module "application" {
fargate_cpu = var.fargate_cpu
fargate_memory = var.fargate_memory
fargate_initial_task_count = var.fargate_initial_task_count
fargate_min_task_count = var.fargate_min_task_count
fargate_max_task_count = var.fargate_max_task_count
}

lambda_code = {
bucket_name = var.lambda_code_bucket_name
autoscaler_s3_key = var.lambda_autoscaler_zip_s3_key
producer_s3_key = var.lambda_producer_zip_s3_key
post_process_s3_key = var.lambda_postproc_zip_s3_key
post_process_layer_s3_key = var.lambda_postproc_layer_zip_s3_key
@@ -111,15 +113,18 @@ module "application" {
iam_roles = {
ecs_task_execution = module.iam_roles.ecs_task_execution_role_arn
ecs_task = module.iam_roles.ecs_task_role_arn
autoscaler_lambda = module.iam_roles.lambda_autoscaler_role_arn
producer_lambda = module.iam_roles.lambda_producer_role_arn
post_process_lambda = module.iam_roles.lambda_postproc_role_arn
scheduler_role = module.iam_roles.scheduler_role_arn
}

service_dependencies = {
app_bucket_name = var.app_bucket_name
app_output_s3_key = var.app_output_s3_key
hydrofabric_s3_key = var.hydrofabric_s3_key
postprocess_output_s3_key = var.postprocess_output_s3_key
rabbitmq_broker_name = module.messaging.rabbitmq_broker_name
rabbitmq_endpoint = module.messaging.rabbitmq_endpoint
rabbitmq_secret_arn = module.messaging.rabbitmq_secret_arn
elasticache_endpoint = module.data_stores.elasticache_redis_endpoint
@@ -130,6 +135,22 @@
# Orchestration and Triggers
# -----------------------------------------------------------------------------

resource "aws_scheduler_schedule" "autoscaler_lambda_trigger" {
name = "${var.app_name}-${var.environment}-autoscaler-trigger"
group_name = "default"

flexible_time_window {
mode = "OFF"
}

schedule_expression = "rate(2 minutes)"

target {
arn = module.application.lambda_autoscaler_arn
role_arn = module.iam_roles.scheduler_role_arn
}
}

resource "aws_scheduler_schedule" "producer_lambda_trigger" {
name = "${var.app_name}-${var.environment}-producer-trigger"
group_name = "default"