S3 Batch Processing with Step Functions Distributed Map + ECS

A production-ready AWS SAM solution for processing large numbers of S3 objects in parallel using AWS Step Functions Distributed Map with ECS workers on EC2 instances.

πŸ—οΈ Architecture

S3 Batch Processing Architecture

The architecture shows the complete workflow from S3 input through distributed processing to output:

  • S3 Input: Objects stored in input/ folder trigger processing
  • Step Functions: Orchestrates the entire workflow with distributed map
  • ECS Workers: Dynamic scaling EC2 instances process objects in parallel
  • Activity Pattern: Workers poll Step Functions Activity for tasks
  • S3 Output: Processed objects stored in processed/ folder

🔄 Step Functions Workflow

The solution uses a sophisticated Step Functions state machine that orchestrates the entire processing pipeline:

Workflow States

Step Functions Workflow

State Details

1. ProvisionECS State

  • Type: Lambda Task
  • Purpose: Dynamically provisions ECS workers
  • Input: worker_count parameter
  • Function: Calls the ECS Provisioner Lambda (see the sketch after this list) to:
    • Scale Auto Scaling Group to desired worker count
    • Wait for EC2 instances to be ready
    • Start ECS tasks on the instances
  • Output: Provisioning result with worker details
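
The following is a minimal, hypothetical sketch of that provisioning path using boto3. The function name provision() and the asg_name, cluster, and task_definition parameters are illustrative assumptions; the actual implementation lives in functions/ecs_provisioner.

import time
import boto3

autoscaling = boto3.client("autoscaling")
ecs = boto3.client("ecs")

def provision(asg_name: str, cluster: str, task_definition: str, worker_count: int) -> dict:
    # Illustrative sketch only -- names and flow are assumptions, not the repo's code.
    # 1. Scale the Auto Scaling Group to the requested worker count.
    autoscaling.set_desired_capacity(
        AutoScalingGroupName=asg_name,
        DesiredCapacity=worker_count,
        HonorCooldown=False,
    )

    # 2. Wait until enough EC2 instances have registered with the ECS cluster.
    for _ in range(60):
        arns = ecs.list_container_instances(cluster=cluster, status="ACTIVE")["containerInstanceArns"]
        if len(arns) >= worker_count:
            break
        time.sleep(10)
    else:
        raise TimeoutError("ECS container instances did not become ready in time")

    # 3. Start one ECS task per worker (run_task accepts at most 10 tasks per call).
    task_arns = []
    remaining = worker_count
    while remaining > 0:
        batch = min(remaining, 10)
        response = ecs.run_task(
            cluster=cluster,
            taskDefinition=task_definition,
            count=batch,
            launchType="EC2",
        )
        task_arns.extend(task["taskArn"] for task in response["tasks"])
        remaining -= batch

    return {"worker_count": worker_count, "task_arns": task_arns}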

2. ProcessObjects State

  • Type: Distributed Map
  • Purpose: Processes all S3 objects in parallel
  • Configuration:
    • Mode: DISTRIBUTED - Uses Step Functions Distributed Map for high concurrency
    • ExecutionType: STANDARD - Full Step Functions features
    • MaxConcurrency: 10 - Limits parallel executions
    • ToleratedFailurePercentage: 10 - Allows 10% failures
  • Item Processor: Each S3 object becomes a separate execution
  • Task Resource: Step Functions Activity (polling-based)
  • Retry Logic: 3 attempts with exponential backoff
  • Timeout: 300 seconds per object

3. DeprovisionECS State

  • Type: Lambda Task
  • Purpose: Clean up resources
  • Trigger: Always runs (on success via the normal flow, on failure via the Catch block)
  • Function: Calls the ECS Provisioner Lambda (see the sketch after this list) to:
    • Stop ECS tasks
    • Scale Auto Scaling Group to 0
    • Clean up resources
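
A matching sketch of the cleanup path, again with illustrative names only (the real logic is in functions/ecs_provisioner):

import boto3

autoscaling = boto3.client("autoscaling")
ecs = boto3.client("ecs")

def deprovision(asg_name: str, cluster: str) -> None:
    # Illustrative sketch only -- see functions/ecs_provisioner for the actual cleanup.
    # Stop any ECS tasks that are still running in the cluster.
    for task_arn in ecs.list_tasks(cluster=cluster)["taskArns"]:
        ecs.stop_task(cluster=cluster, task=task_arn, reason="Batch processing finished")

    # Scale the Auto Scaling Group back to zero so nothing runs (or bills) while idle.
    autoscaling.set_desired_capacity(
        AutoScalingGroupName=asg_name,
        DesiredCapacity=0,
        HonorCooldown=False,
    )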

Workflow Input Format

{
  "objects": [
    {"Key": "input/file1.txt"},
    {"Key": "input/file2.txt"},
    {"Key": "input/file3.txt"}
  ],
  "worker_count": 5
}
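
execute.sh builds this input and starts the workflow for you; the boto3 sketch below shows the equivalent call, with the state machine ARN left as a placeholder to be taken from the stack outputs.

import json
import boto3

sfn = boto3.client("stepfunctions")

execution_input = {
    "objects": [
        {"Key": "input/file1.txt"},
        {"Key": "input/file2.txt"},
        {"Key": "input/file3.txt"},
    ],
    "worker_count": 5,
}

# The ARN is a placeholder -- use the state machine ARN from your stack outputs.
response = sfn.start_execution(
    stateMachineArn="arn:aws:states:<region>:<account-id>:stateMachine:<state-machine-name>",
    input=json.dumps(execution_input),
)
print(response["executionArn"])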

Error Handling & Resilience

  • Retry Logic: Failed object processing retries 3x with exponential backoff
  • Fault Tolerance: Up to 10% of objects can fail without stopping the workflow
  • Guaranteed Cleanup: Deprovisioning always runs via Catch block
  • Timeout Protection: 300-second timeout prevents stuck tasks
  • Activity Pattern: Workers poll for tasks, enabling dynamic scaling (see the sketch below)
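
The sketch below illustrates the activity pattern from the worker side. It is not the actual application/processor.py code; process_s3_object() here is a stub standing in for your processing logic.

import json
import boto3

sfn = boto3.client("stepfunctions")

def process_s3_object(object_key: str) -> dict:
    # Stub standing in for the custom logic in application/processor.py.
    return {"object_key": object_key, "status": "success"}

def poll_loop(activity_arn: str, worker_name: str) -> None:
    while True:
        # Long-polls Step Functions (up to ~60 s); an empty taskToken means no work yet.
        task = sfn.get_activity_task(activityArn=activity_arn, workerName=worker_name)
        token = task.get("taskToken")
        if not token:
            continue
        try:
            item = json.loads(task["input"])          # e.g. {"Key": "input/file1.txt"}
            result = process_s3_object(item["Key"])
            sfn.send_task_success(taskToken=token, output=json.dumps(result))
        except Exception as exc:
            # Step Functions applies the state's Retry policy (3 attempts, exponential
            # backoff) before the item counts toward the 10% failure tolerance.
            sfn.send_task_failure(taskToken=token, error="ProcessingError", cause=str(exc))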

Execution Flow

  1. Start: Workflow receives list of S3 objects and worker count
  2. Provision: Lambda provisions exact number of ECS workers needed
  3. Distribute: Distributed Map creates one execution per S3 object
  4. Process: Workers poll Activity for tasks and process objects in parallel
  5. Monitor: Step Functions tracks progress and handles failures
  6. Cleanup: Resources are deprovisioned regardless of success/failure

This architecture enables processing thousands of S3 objects with precise resource control and cost optimization.

✨ Key Features

  • 🚀 Dynamic Scaling: Worker count specified at execution time (1-100 workers)
  • ⚡ Fast: Parallel processing with distributed map pattern
  • 💰 Cost-Effective: Dynamic scaling, pay only when processing
  • 🔄 Reliable: Built-in retry logic and error handling
  • 📊 Observable: Comprehensive structured JSON logging
  • 🐳 Containerized: Uses Docker image from application code
  • 🌐 Portable: Dynamic VPC discovery, works across accounts/regions
  • 🎯 Flexible: Easy adaptation to any workload size

🚀 Quick Start

📋 Prerequisites

  • AWS SAM CLI installed and configured
  • AWS CLI configured with appropriate region
  • Docker installed and running
  • AWS Permissions for ECS, Step Functions, S3, ECR, CloudFormation, Auto Scaling
  • jq for JSON processing

🔧 Deploy

This solution uses AWS SAM (Serverless Application Model) for infrastructure deployment and management.

# Deploy with default settings (ASG max size: 100)
./deploy.sh

# Or customize deployment with instance type
./deploy.sh 5 m5.large    # Still works, but worker count is now dynamic

The deployment script uses sam build and sam deploy commands to provision all AWS resources defined in the SAM template.

🧪 Test & Execute

# Run end-to-end test with default 3 workers
./test.sh

# Run test with custom worker count
./test.sh 10

# Execute with specific worker count
./execute.sh 5          # 5 workers
./execute.sh 20         # 20 workers
./execute.sh 50         # 50 workers

# Execute with custom S3 prefix
./execute.sh 10 data/   # 10 workers, process 'data/' prefix

Script Differences:

  • test.sh: Complete end-to-end test - generates files, monitors execution, verifies results
  • execute.sh: Quick execution launcher - uses existing S3 files, starts workflow and exits

🧹 Cleanup

# Remove all resources
./cleanup.sh

πŸ“ Project Structure

├── README.md                    # This guide
├── template.yaml                # AWS SAM template with dynamic scaling (max 100)
├── deploy.sh                    # SAM deployment script
├── test.sh                      # End-to-end test with dynamic worker count
├── execute.sh                   # Simple execution script for any worker count
├── cleanup.sh                   # Resource cleanup
├── build-and-push.sh            # Docker build/push script
├── application/                 # Application code
│   ├── processor.py             # Worker with activity polling & structured logging
│   ├── requirements.txt         # Python dependencies
│   └── Dockerfile               # Container definition
├── functions/                   # Lambda functions
│   └── ecs_provisioner/         # Dynamic ECS provisioning logic
└── statemachine/                # Step Functions workflow
    └── workflow-complete.asl.json  # Accepts dynamic worker_count input

βš™οΈ Dynamic Worker Configuration

🎯 Execution Input Format

{
  "objects": [
    {"Key": "input/file1.txt"},
    {"Key": "input/file2.txt"}
  ],
  "worker_count": 10
}

🖥️ Instance Types

  • t3.medium - Cost-effective, light workloads
  • c5.large - CPU-intensive processing (default)
  • m5.large - Balanced CPU/memory
  • m5.xlarge - Memory-intensive workloads

📊 Expected Performance

| Workers | Instance Type | Objects | Processing Time | Throughput  |
|---------|---------------|---------|-----------------|-------------|
| 3       | c5.large      | 50      | ~4 minutes      | 750 obj/hr  |
| 5       | c5.large      | 50      | ~2.5 minutes    | 1200 obj/hr |
| 10      | c5.large      | 50      | ~1.5 minutes    | 2000 obj/hr |
| 20      | m5.large      | 100     | ~1.5 minutes    | 4000 obj/hr |

πŸ” Monitoring

📋 CloudWatch Logs

# List worker log streams
aws logs describe-log-streams \
  --log-group-name "/ecs/s3-batch-processor-s3-batch-processor" \
  --region ap-east-1

# Tail worker logs (requires AWS CLI v2)
aws logs tail "/ecs/s3-batch-processor-s3-batch-processor" \
  --follow --region ap-east-1

📊 Key Log Messages

  • "Processing task" - Task received from Step Functions Activity
  • "Processing S3 object" - Object processing started
  • "S3 object processed successfully" - Processing completed
  • "Task completed" - Task finished with count

🎯 Step Functions Console

  • Visual workflow execution tracking
  • Distributed map performance metrics
  • Error details and retry information

🎨 Customizing Processing Logic

The core processing happens in application/processor.py. Modify the process_s3_object() method:

def process_s3_object(self, object_key: str, bucket: str = None) -> Dict[str, Any]:
    # Your custom processing logic here
    # - Image processing: resize, filter, analyze
    # - Data transformation: parse, validate, enrich
    # - ML inference: classify, predict, score
    # - File conversion: PDF to text, format conversion
    
    # Current implementation: 5-second processing simulation
    time.sleep(5)
    
    # Return structured result (processed_key, content and processing_time are
    # produced by the processing logic you implement above)
    return {
        'object_key': object_key,
        'processed_key': processed_key,
        'content': content,
        'processing_time': processing_time,
        'processed_at': datetime.utcnow().isoformat(),
        'worker_id': self.worker_id,
        'status': 'success'
    }

🔒 Security Features

  • ✅ IAM Roles: Least privilege access managed by SAM
  • ✅ Dynamic VPC: Uses default VPC, no hardcoded values
  • ✅ Container Security: Non-root user execution
  • ✅ S3 Access: Scoped to specific bucket/prefixes
  • ✅ SAM Security: Infrastructure as Code with version control

🎯 Production Considerations

💰 Cost Optimization

  • Zero cost when idle: ASG scales to 0 when no processing
  • Dynamic worker count: Scale exactly to your workload needs
  • Efficient processing: the sample workload simulates about 5 seconds of work per object
  • Automatic cleanup: Infrastructure scales down after completion

📈 Scaling Guidelines

  • Small workloads (< 50 objects): 2-5 workers
  • Medium workloads (50-500 objects): 5-20 workers
  • Large workloads (500+ objects): 20-100 workers
  • Maximum capacity: 100 workers (configurable in template.yaml)

🔧 Tested Configuration

  • ✅ Dynamic worker count: 1-100 workers tested and working
  • ✅ 1:1:1 ratio: 1 worker = 1 EC2 instance = 1 ECS task
  • ✅ Dynamic VPC discovery: Portable across accounts
  • ✅ Proper logging: Structured JSON logs with processing details
  • ✅ Complete lifecycle: Provision → Process → Deprovision
  • ✅ AWS SAM deployment: Infrastructure as Code with repeatable deployments

πŸ› οΈ SAM Commands

# Build the application
sam build

# Deploy with guided prompts
sam deploy --guided

# Deploy with custom max workers (default: 100)
sam deploy --parameter-overrides MaxWorkers=200

# View stack outputs
sam list stack-outputs

# Delete the stack
sam delete

🚀 Usage Examples

# Process 10 files with 3 workers
./execute.sh 3

# Process 100 files with 25 workers for faster throughput
./execute.sh 25

# Process files from 'data/' prefix with 10 workers
./execute.sh 10 data/

# Run comprehensive test with 5 workers
./test.sh 5

# Multiple concurrent executions for load testing
./execute.sh 5 && ./execute.sh 8 && ./execute.sh 12

🎉 Ready to process thousands of S3 objects efficiently with dynamic AWS SAM scaling!
