-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
444 lines (369 loc) · 14.7 KB
/
lambda_function.py
File metadata and controls
444 lines (369 loc) · 14.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
import json
import logging
import os
import traceback
from datetime import date, datetime
from pathlib import Path
from typing import Any, Optional
from pydantic import ValidationError
from pymysql.cursors import DictCursor
from requests.exceptions import RequestException
from requests.models import HTTPError
from models.response import ErrorType, NasaApiResponse
from models.event import ApiGatewayEvent
from services.aws import SecretsManagerWrapper
from services.db import MysqlDriver
from services.rest_api import ExternalApiService
from utils.config import settings
from utils.logger import configure_logger
logger = configure_logger(Path(__file__).stem)
############################################
### Custom Exceptions for the Lambda App ###
############################################
class LambdaConfigError(Exception):
    """Raised when the Lambda function's configuration is wrong or incomplete."""
class CredentialsSetupError(Exception):
    """Raised when credentials cannot be set up."""
class DbRelatedError(Exception):
    """Raised when a database-related operation fails."""
class EventParsingError(Exception):
    """Raised when the incoming event object cannot be parsed."""
class LambdaLogicError(Exception):
    """Raised when the Lambda function's business logic fails."""
class LambdaValidationError(Exception):
    """Raised when input validation fails."""
class ExternalApiError(Exception):
    """Raised when a call to an external API fails."""
# Standard HTTP status code for each named error category.
ERROR_CODES: dict[str, int] = {
    # Client-side problems
    "INVALID_INPUT": 400,
    "UNAUTHORIZED": 401,
    "RESOURCE_NOT_FOUND": 404,
    "VALIDATION_ERROR": 422,
    # Server-side and upstream problems
    "DATABASE_ERROR": 500,
    "CONFIG_ERROR": 500,
    "INTERNAL_ERROR": 500,
    "TIMEOUT_ERROR": 504,
    "EXTERNAL_API_ERROR": 502,
    "UNKNOWN_ERROR": 500,
    # Event / credential handling
    "EVENT_PARSING_ERROR": 400,
    "CREDENTIALS_SETUP_ERROR": 500,
}
############################################
######## Helper functions #######
############################################
class DateTimeEncoder(json.JSONEncoder):
    """
    JSON encoder that knows how to serialize date and datetime objects.
    Both are emitted as their ISO format string; every other type is
    delegated to the base encoder.
    """

    def default(self, o):
        # datetime is a subclass of date, so a single check covers both types
        if not isinstance(o, date):
            return super().default(o)
        return o.isoformat()
def parse_event_data(event: dict) -> ApiGatewayEvent:
    """
    Turn the raw Lambda event into a structured ApiGatewayEvent.
    Args:
        event: The Lambda event dictionary containing trigger information
    Returns:
        ApiGatewayEvent: The object built by ``ApiGatewayEvent.from_event``
    Raises:
        EventParsingError: If ``ApiGatewayEvent.from_event`` raises a ValueError
    """
    try:
        parsed_event = ApiGatewayEvent.from_event(event)
    except ValueError as exc:
        # Re-raise as the app-specific error, keeping the original as the cause
        raise EventParsingError(str(exc)) from exc
    else:
        return parsed_event
def retrieve_expected_and_received_api_key(event: dict, secrets_manager: SecretsManagerWrapper) -> tuple[str, str]:
    """
    Fetch the reference API key from the secrets and the key sent by the caller.
    Args:
        event: The raw Lambda event; its "headers" mapping must carry "x-api-key"
        secrets_manager: Object whose ``secrets`` mapping holds "LOGISTICS_API_GATEWAY_KEY"
    Returns:
        tuple[str, str]: ``(expected_api_key, received_api_key)``
    Raises:
        KeyError: If the secret, the event headers, or the "x-api-key" header is missing
    """
    secret_name = "LOGISTICS_API_GATEWAY_KEY"
    header_name = "x-api-key"

    # First check if SecretsManager got the ref api key
    try:
        expected_api_key = secrets_manager.secrets[secret_name]
    except KeyError as e:
        # Name the secret that was actually looked up so the log is actionable
        err_msg = f"could not find '{secret_name}' in secrets from SecretsManager"
        logger.error(err_msg)
        raise KeyError(err_msg) from e

    # A missing "headers" key and an explicit null are both treated as "no headers",
    # so the caller only ever has to handle KeyError.
    headers = event.get("headers")
    if headers is None:
        err_msg = "No headers present in event! Cannot validate 'x-api-key'."
        logger.error(err_msg)
        raise KeyError(err_msg)

    # Exact match first; otherwise fall back to a case-insensitive scan because
    # HTTP header names are case-insensitive (e.g. "X-Api-Key").
    x_api_key = headers.get(header_name)
    if x_api_key is None:
        x_api_key = next(
            (value for name, value in headers.items() if isinstance(name, str) and name.lower() == header_name),
            None,
        )
    if x_api_key is None:
        err_msg = "Could not find 'x-api-key' in headers or the Lambda event"
        logger.error(err_msg)
        raise KeyError(err_msg)

    return expected_api_key, x_api_key
def format_success_response(data: dict[str, Any], request_id: str, status_code: int = 200) -> dict[str, Any]:
    """
    Format a success response with consistent headers.
    Args:
        data: The response data; keys that clash with the metadata keys
            ("timestamp", "requestId", "success") are dropped with a warning
        request_id: The request ID for tracking
        status_code: The HTTP status code (allow 20x like 207)
    Returns:
        Formatted response dictionary with "statusCode", "headers" and a JSON-encoded "body"
    """
    # Create response body with lambda func metadata. The timestamp uses
    # settings.tz so success and error bodies carry the same timestamp format.
    response_body = {
        "timestamp": datetime.now(settings.tz).isoformat(),
        "requestId": request_id,
        "success": True,
    }

    # Only add data keys that don't conflict with metadata
    for key, value in (data or {}).items():
        if key in response_body:
            logger.warning(f"Key collision in response: '{key}' from data was not included")
            continue
        response_body[key] = value

    return {
        "statusCode": status_code,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Credentials": True,
            "X-Request-ID": request_id,
        },
        # DateTimeEncoder serializes any date/datetime values found in the data
        "body": json.dumps(response_body, cls=DateTimeEncoder),
    }
def format_error_response(
    error_message: str,
    error_type: ErrorType = ErrorType.INTERNAL_ERROR,
    details: Optional[dict] = None,
    traceback_info: Optional[str] = None,
    request_id: Optional[str] = None,
) -> dict[str, Any]:
    """
    Build an error response with consistent headers and log the error.
    Args:
        error_message: The human-readable error message
        error_type: A standardized error type; its ``code``, ``phrase`` and ``name`` are reported
        details: Additional error details (optional)
        traceback_info: Traceback text, only included in the body when debug mode is on (optional)
        request_id: The request ID for tracking; "unknown" is sent in the header when absent
    Returns:
        Formatted error response dictionary with "statusCode", "headers" and a JSON-encoded "body"
    """
    error_info = {
        "message": error_message,
        "status_code": error_type.code,
        "http_status": error_type.phrase,
        "type": error_type.name,
    }
    error_body = {
        "error": error_info,
        "timestamp": datetime.now(settings.tz).isoformat(),
        "success": False,
    }
    if details:
        error_info["details"] = details

    # Tracebacks are exposed only when the log level is DEBUG or DEBUG_MODE=1
    debug_mode = settings.lambda_log_level == "DEBUG" or os.getenv("DEBUG_MODE", "0") == "1"
    if traceback_info and debug_mode:
        error_info["traceback"] = traceback_info

    # Quick logging: optional parts are appended with the " - " separator
    log_parts = [f"Error: {error_type.name} ({error_type.code} - {error_type.phrase}). Message: {error_message}"]
    if details:
        log_parts.append(f"Details: {details}")
    if request_id:
        log_parts.append(f"RequestID: {request_id}")
    logger.error(" - ".join(log_parts))

    response_headers = {
        "Content-Type": "application/json",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Credentials": True,
        "X-Request-ID": request_id or "unknown",
    }
    return {
        "statusCode": error_type.code,
        "headers": response_headers,
        "body": json.dumps(error_body, cls=DateTimeEncoder),
    }
############################################
### Main Lambda Handler Function logic ###
############################################
def _api_keys_match(expected_api_key: Any, received_api_key: Any) -> bool:
    """Return True only when both keys are strings with identical content."""
    import hmac  # local import so this fix does not depend on the file's import block

    if not isinstance(expected_api_key, str) or not isinstance(received_api_key, str):
        return False
    # compare_digest is designed to resist timing analysis, unlike ``==``
    return hmac.compare_digest(expected_api_key.encode("utf-8"), received_api_key.encode("utf-8"))


def _to_numeric_log_level(level: Any) -> Any:
    """Map a level name such as "DEBUG" to its numeric value; return anything else unchanged."""
    if isinstance(level, str):
        resolved = getattr(logging, level.strip().upper(), None)
        if isinstance(resolved, int):
            return resolved
    return level


def lambda_handler(event: dict, context):
    """
    Entry point of the Lambda function.

    Steps, each returning a formatted error response on failure:
    parse the event, load secrets, sync the logger level, validate the
    caller's API key, test the database connection, call the NASA APOD API
    and return the combined result.
    Args:
        event: The Lambda event dictionary (read for its headers and query parameters)
        context: The Lambda context object; only ``aws_request_id`` is read
    Returns:
        Response dictionary built by ``format_success_response`` or ``format_error_response``
    """
    request_id = getattr(context, "aws_request_id", "unknown")
    logger.info(f"Processing request {request_id}")

    # Parse the event
    try:
        # NOTE(review): the raw event contains the request headers (including the
        # caller's x-api-key) — consider redacting before logging.
        logger.info(f"Received event: {event}")
        logger.info(f"Received context: {context}")
        event_data = parse_event_data(event)
    except EventParsingError as e:
        return format_error_response(
            error_message="Error parsing event data",
            error_type=ErrorType.EVENT_PARSING_ERROR,
            details={"message": str(e)},
            traceback_info=traceback.format_exc(),
            request_id=request_id,
        )
    except Exception as e:
        return format_error_response(
            error_message="Unexpected error when parsing event data",
            error_type=ErrorType.UNKNOWN_ERROR,
            details={"message": str(e)},
            traceback_info=traceback.format_exc(),
            request_id=request_id,
        )
    logger.info(f"Converted Lambda event to ApiGatewayEvent({event_data})")

    # Use secret manager to get api keys
    try:
        secrets_manager = SecretsManagerWrapper()
    except Exception as e:
        # Broad on purpose: CredentialsSetupError is declared in this module, so the
        # wrapper presumably raises a different type; catching Exception (which also
        # covers CredentialsSetupError) keeps any failure a formatted error response.
        logger.error(f"Error getting credentials using SecretsManager: {e}")
        return format_error_response(
            error_message="Error getting credentials using SecretsManager",
            error_type=ErrorType.CREDENTIALS_ERROR,
            details={"message": str(e)},
            traceback_info=traceback.format_exc(),
            request_id=request_id,
        )

    # Update the logging level based on settings
    try:
        sm_log_level = secrets_manager.secrets["LAMBDA_LOG_LEVEL"]
    except KeyError as e:
        err_msg = f"Missing 'LAMBDA_LOG_LEVEL' secret in SecretsManager: {e!s}"
        return format_error_response(
            error_message=err_msg,
            error_type=ErrorType.CREDENTIALS_ERROR,
            traceback_info=traceback.format_exc(),
            request_id=request_id,
        )
    # Compare numeric levels on both sides: comparing a level name with a numeric
    # level would always differ and trigger a needless update.
    target_log_level = _to_numeric_log_level(sm_log_level)
    if target_log_level != _to_numeric_log_level(settings.lambda_log_level):
        logger.info(f"Updating the logger's level to {sm_log_level}")
        logger.setLevel(target_log_level)
        for handler in logger.handlers:
            handler.setLevel(target_log_level)

    # Validate the API key of request to API Gateway
    try:
        expected_api_key, x_api_key = retrieve_expected_and_received_api_key(event, secrets_manager)
    except (KeyError, TypeError) as e:
        # TypeError covers an event whose "headers" value is null
        return format_error_response(
            error_message="Error retrieving API keys for validation",
            error_type=ErrorType.UNAUTHORIZED,
            details={"message": str(e)},
            traceback_info=traceback.format_exc(),
            request_id=request_id,
        )
    if not _api_keys_match(expected_api_key, x_api_key):
        # Never put either key in the response or the logs: the expected key is a
        # secret and must not be handed to an unauthorized caller. No traceback is
        # attached because no exception is active here.
        return format_error_response(
            error_message="Received x-api-key is incorrect.",
            error_type=ErrorType.UNAUTHORIZED,
            request_id=request_id,
        )

    # Setup the db connection
    db = None
    try:
        try:
            db = (
                MysqlDriver(dotenv_file=Path(".env"))
                if settings.use_local_db
                else MysqlDriver(secrets_manager=secrets_manager)
            )
        except Exception as e:
            return format_error_response(
                "Error instantiating MySQL db driver.",
                error_type=ErrorType.DATABASE_ERROR,
                details={"message": str(e)},
                traceback_info=traceback.format_exc(),
                request_id=request_id,
            )

        # Test the connection to db according to env
        try:
            logger.info(
                f"Excuting: 'SELECT * FROM SequelizeMeta' to test connection to {secrets_manager.secrets.get('RDS_DB_NAME', 'UNKNOWN_ENV')} database..."
            )
            with (
                db.transaction() as connection,
                connection.cursor(DictCursor) as cursor,
            ):
                cursor.execute("SELECT * FROM SequelizeMeta")
                migrations = cursor.fetchall()
            logger.info("Connection successfull!")
        except Exception as e:
            logger.error(f"Error connecting to DB and processing data: {e}")
            return format_error_response(
                error_message="Error connecting to DB and processing data",
                error_type=ErrorType.DATABASE_ERROR,
                details={"message": str(e)},
                traceback_info=traceback.format_exc(),
                request_id=request_id,
            )

        # Only save last five migrations (tail of the result set).
        # NOTE(review): the query has no ORDER BY; this assumes rows come back
        # oldest-first — confirm, or add an explicit ordering.
        migrations = migrations[-5:]

        # Use service to make the API call
        try:
            external_api_serv = ExternalApiService()
        except Exception as e:
            return format_error_response(
                error_message="Error instantiating ExternalApiService",
                error_type=ErrorType.CREDENTIALS_ERROR,
                details={"message": str(e)},
                traceback_info=traceback.format_exc(),
                request_id=request_id,
            )

        # Test external API call to NASA APOD API
        try:
            nasa_api_key = secrets_manager.secrets.get("NASA_API_KEY", "DEMO_KEY")
            # Tolerate a request without query parameters (value may be null)
            query_params = event_data.query_string_parameters or {}
            some_date = query_params.get("someDate", date.today().isoformat())
            base_url = "https://api.nasa.gov/planetary/apod"
            params = {"api_key": nasa_api_key, "date": some_date}
            external_api_response = external_api_serv.fetch(url=base_url, params=params)
            logger.info(f"External API response: {external_api_response}")
        except (HTTPError, RequestException) as e:
            return format_error_response(
                error_message="Error making external API call",
                error_type=ErrorType.EXTERNAL_API_ERROR,
                details={"message": str(e)},
                traceback_info=traceback.format_exc(),
                request_id=request_id,
            )

        # Prepare successful response data
        try:
            apod_data = NasaApiResponse(
                data=external_api_response,
                status="success",
                message="Fetched data from NASA APOD API successfully",
            ).model_dump()
        except ValidationError as e:
            return format_error_response(
                error_message="Error validating external API response",
                error_type=ErrorType.VALIDATION_ERROR,
                details={"message": str(e), "errors": e.errors()},
                traceback_info=traceback.format_exc(),
                request_id=request_id,
            )
        except Exception as e:
            return format_error_response(
                error_message="Unexpected error validating external API response",
                error_type=ErrorType.UNKNOWN_ERROR,
                details={"message": str(e)},
                traceback_info=traceback.format_exc(),
                request_id=request_id,
            )

        response_data = {
            "message": "Lambda execution successful",
            "lastestMigrations": migrations,
            "apod": apod_data,
        }
        return format_success_response(
            data=response_data,
            request_id=request_id,
            status_code=200,
        )
    finally:
        # Close database connections; runs on every return path above
        if db is not None:
            try:
                db.close_all()
                logger.info("Successfully closed all database connections")
            except Exception as e:
                # Best effort: a failed cleanup must not replace the response
                logger.error(f"Error closing database connections: {e!s}")