@@ -6,3 +6,4 @@ requests==2.32.4
python-dotenv==1.1.0
launchdarkly-server-sdk-ai
launchdarkly-server-sdk
boto3==1.35.0
91 changes: 91 additions & 0 deletions aws-periodic-scheduler/README.md
@@ -0,0 +1,91 @@
# AWS Periodic Results Generator

## Purpose

This Lambda function automatically regenerates experiment results, monitoring data, and guarded releases for all active demo environments every 3-4 days. This keeps demo environments looking fresh and active without requiring manual intervention.

## What It Does

- Queries DynamoDB for all active demo environments (status: `completed`)
- Filters users who haven't had results regenerated in the last 3 days
- Processes users in batches of 5 per Lambda invocation
- Generates fresh data for:
- Experiments (cart suggestions, hero images, AI configs, etc.)
- Monitoring metrics (AI chatbot feedback, financial agent metrics)
- Guarded releases (A4, risk management, financial advisor, databases)
- Updates `lastResultsGenerated` timestamp in DynamoDB to track processing
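
The steps above can be sketched end to end (class and method names are taken from the files in this PR; `generate_results` stands in for the generator calls and is hypothetical):

```python
BATCH_SIZE = 5  # users processed per Lambda invocation

def run_invocation(db, ld_api, generate_results, batch_size=BATCH_SIZE):
    """One scheduled tick: fetch completed users, filter to stale ones, process a batch."""
    usernames = db.get_completed_users()                          # status == "completed"
    stale = db.filter_users_needing_refresh(usernames, days_threshold=3)
    batch = stale[:batch_size]                                    # oldest-first, capped
    for username in batch:
        creds = ld_api.get_project_environment_keys(f"{username}-ld-demo")
        if creds:
            generate_results(creds)                               # experiments, monitoring, releases
            db.update_last_generated_timestamp(username)          # mark as refreshed
    return len(batch)
```

Any users beyond the batch cap stay stale and are picked up by a later scheduled invocation.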

## Architecture

```
CloudWatch EventBridge (every 6-12 hours)
        │
        ▼
Lambda Function
        │
        ▼
DynamoDB (fetch active users) + LaunchDarkly Management API (fetch SDK keys)
        │
        ▼
LaunchDarkly SDK (send events)
        │
        ▼
LaunchDarkly Dashboard (updated experiment results)
```

## Key Technical Solutions

### Connection Pool Fix
The Lambda includes a urllib3 monkey-patch that increases the connection pool size from 1 to 50, solving the "Connection pool is full" issue that prevents events from reaching LaunchDarkly in serverless environments.
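
One way such a patch can look (a sketch only; the actual Lambda's patch may differ in detail):

```python
import urllib3

# Assumption for illustration: patch PoolManager so every per-host pool
# holds up to 50 connections instead of urllib3's default of 1.
_original_init = urllib3.poolmanager.PoolManager.__init__

def _patched_init(self, *args, **kwargs):
    kwargs["maxsize"] = 50  # raise the per-host connection pool size
    _original_init(self, *args, **kwargs)

urllib3.poolmanager.PoolManager.__init__ = _patched_init
```

Because the patch replaces `__init__` on the class itself, every pool created afterwards (including those inside the LaunchDarkly SDK's HTTP client) picks up the larger size.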

### Batch Processing
Processes 5 users per invocation to prevent Lambda timeout. The CloudWatch scheduler triggers the Lambda every 6-12 hours, gradually processing all users that need refresh.

### Smart Filtering
Only regenerates results for users whose `lastResultsGenerated` timestamp is >= 3 days old, preventing unnecessary processing and respecting the "every 3-4 days" requirement.
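
The staleness check boils down to comparing the stored ISO timestamp against the threshold; a minimal sketch (the function name is illustrative):

```python
from datetime import datetime, timezone

def needs_refresh(last_generated_iso, days_threshold=3, now=None):
    """True if the user was never processed or the last run is at least days_threshold old."""
    if not last_generated_iso:
        return True  # never generated: always refresh
    now = now or datetime.now(timezone.utc)
    last = datetime.fromisoformat(last_generated_iso.replace("Z", "+00:00"))
    return (now - last).days >= days_threshold
```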

## Files

- **`lambda_deploy/`** - Lambda function source code
- `LambdaPeriodicResultsGenerator.py` - Main Lambda handler
- `DynamoDBUtils.py` - DynamoDB client for user management
- `LDAPIUtils.py` - LaunchDarkly Management API client
- `results_generator.py` - Copy with dynamic PROJECT_KEY support
- All Python dependencies (ldclient, boto3, requests, etc.)

## Deployment

The deployment package is located at:
```
aws-periodic-scheduler/PeriodicResultsGeneratorLambda.zip
```

### Lambda Configuration
- **Runtime:** Python 3.11
- **Timeout:** 15 minutes (900 seconds)
- **Memory:** 512 MB
- **Handler:** `LambdaPeriodicResultsGenerator.lambda_handler`

### Environment Variables
- `LD_API_KEY` - LaunchDarkly Management API Service Token (Reader access)

### IAM Permissions
- `AmazonDynamoDBFullAccess` - for reading/writing DynamoDB timestamps
- `AWSLambdaBasicExecutionRole` - for CloudWatch Logs

### CloudWatch EventBridge Schedule
Create a scheduled rule:
- **Schedule:** `rate(6 hours)` or `rate(12 hours)`
- **Target:** The Lambda function

## Testing & Verification

✅ Successfully tested in AWS Lambda
✅ Events reach LaunchDarkly without connection pool errors
✅ Experiment results update correctly in dashboard
✅ DynamoDB timestamp tracking prevents duplicate processing
✅ Batch processing prevents timeout with 300+ users

## Important Notes

- This Lambda **does NOT affect** the existing provisioning process in `DemoBuilder.py`
- Original `results_generator.py` in `demo_provisioning_scripts/` remains unchanged
- Lambda uses a separate copy with dynamic PROJECT_KEY support for multi-environment processing
- The Lambda reuses existing generator functions, ensuring consistency across provisioning and periodic updates

147 changes: 147 additions & 0 deletions aws-periodic-scheduler/lambda_deploy/DynamoDBUtils.py
@@ -0,0 +1,147 @@
"""
DynamoDB Utilities
Helper functions to interact with DynamoDB for demo provisioning records
"""
import boto3
import logging
from datetime import datetime, timedelta

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s'
)

class DynamoDBClient:
"""Client for interacting with DynamoDB demo provisioning table"""

def __init__(self, table_name="ld-core-demo-provisioning-workflow-records-prod", region="us-east-1"):
self.table_name = table_name
self.region = region
self.dynamodb = boto3.resource('dynamodb', region_name=region)
self.table = self.dynamodb.Table(table_name)
self.user_records = {}

def get_completed_users(self):
"""
Retrieve all users with status='completed' from DynamoDB
Handles duplicate records by keeping only the most recent one per user

Returns:
list: List of unique usernames with completed status
"""
try:
logging.info(f"Scanning DynamoDB table: {self.table_name}")

response = self.table.scan(
FilterExpression='#status = :completed',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={':completed': 'completed'}
)

items = response.get('Items', [])

while 'LastEvaluatedKey' in response:
response = self.table.scan(
FilterExpression='#status = :completed',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={':completed': 'completed'},
ExclusiveStartKey=response['LastEvaluatedKey']
)
items.extend(response.get('Items', []))

logging.info(f"Found {len(items)} completed records")

#de-duplicate by username, keeping most recent
for item in items:
username = item.get('userKey')
created_at = item.get('createdAt')

if not username:
continue

if username not in self.user_records:
self.user_records[username] = item
elif created_at and created_at > self.user_records[username].get('createdAt', ''):
self.user_records[username] = item

unique_usernames = list(self.user_records.keys())
logging.info(f"Found {len(unique_usernames)} unique users")

return unique_usernames

except Exception as e:
logging.error(f"Error scanning DynamoDB: {str(e)}")
return []

def filter_users_needing_refresh(self, usernames, days_threshold=3):
"""
Filter users who need results regenerated (haven't been processed recently)

Args:
usernames: List of usernames to filter
days_threshold: Number of days since last generation to consider stale

Returns:
list: Usernames that need refresh, sorted by priority (oldest first)
"""
users_needing_refresh = []
now = datetime.now()

for username in usernames:
user_record = self.user_records.get(username, {})
last_generated = user_record.get('lastResultsGenerated')

if not last_generated:
users_needing_refresh.append((username, None))
else:
try:
last_gen_dt = datetime.fromisoformat(last_generated.replace('Z', '+00:00'))
days_since = (now - last_gen_dt).days

if days_since >= days_threshold:
users_needing_refresh.append((username, last_gen_dt))
except Exception as e:
logging.warning(f"Error parsing date for {username}: {e}")
users_needing_refresh.append((username, None))

users_needing_refresh.sort(key=lambda x: x[1] if x[1] else datetime.min)

return [username for username, _ in users_needing_refresh]

def update_last_generated_timestamp(self, username):
"""
Update the lastResultsGenerated timestamp for a user
Attempts to write to DynamoDB, falls back to logging if it fails
"""
timestamp = datetime.now().isoformat()
logging.info(f"Results generated for {username} at {timestamp}")

try:
user_record = self.user_records.get(username)
if not user_record:
logging.warning(f"No record found for {username}, cannot update timestamp")
return

user_key = user_record.get('userKey')
created_at = user_record.get('createdAt')

if not user_key or not created_at:
logging.warning(f"Missing key fields for {username}")
return

self.table.update_item(
Key={
'userKey': user_key,
'createdAt': created_at
},
UpdateExpression='SET lastResultsGenerated = :timestamp',
ExpressionAttributeValues={
':timestamp': timestamp
}
)

logging.info(f"Successfully updated timestamp in DynamoDB for {username}")

except Exception as e:
logging.warning(f"Could not update DynamoDB timestamp for {username}: {e}")

81 changes: 81 additions & 0 deletions aws-periodic-scheduler/lambda_deploy/LDAPIUtils.py
@@ -0,0 +1,81 @@
"""
LaunchDarkly API Utilities
Helper functions to interact with LaunchDarkly Management API
"""
import requests
import logging

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s'
)

class LaunchDarklyAPIClient:
"""Client for interacting with LaunchDarkly Management API"""

def __init__(self, api_token):
self.api_token = api_token
self.base_url = "https://app.launchdarkly.com/api/v2"
self.headers = {
"Authorization": api_token,
"Content-Type": "application/json"
}

def get_project_environment_keys(self, project_key, environment_key="production"):
"""
Retrieve SDK key and credentials for a specific project environment

Args:
project_key: The project key (ex: "user-ld-demo")
environment_key: The environment key (default: "production")

Returns:
dict: Contains sdk_key, mobile_key, and client_id
Returns None if project doesn't exist
"""
try:
url = f"{self.base_url}/projects/{project_key}/environments/{environment_key}"

logging.info(f"Fetching credentials for project: {project_key}")
response = requests.get(url, headers=self.headers, timeout=10)

if response.status_code == 404:
logging.warning(f"Project {project_key} not found")
return None

response.raise_for_status()
env_data = response.json()

credentials = {
"project_key": project_key,
"environment_key": environment_key,
"sdk_key": env_data.get("apiKey"),
"mobile_key": env_data.get("mobileKey"),
"client_id": env_data.get("id")
}

logging.info(f"Successfully retrieved credentials for {project_key}")
return credentials

except requests.exceptions.RequestException as e:
logging.error(f"Error fetching credentials for {project_key}: {str(e)}")
return None

def project_exists(self, project_key):
"""Check if a LaunchDarkly project exists"""
try:
url = f"{self.base_url}/projects/{project_key}"
response = requests.get(url, headers=self.headers, timeout=10)
return response.status_code == 200
except Exception as e:
logging.error(f"Error checking project {project_key}: {str(e)}")
return False


def construct_project_key_from_username(username):
"""
Construct LaunchDarkly project key from username
Pattern: {username}-ld-demo
"""
return f"{username}-ld-demo"
