diff --git a/.coderabbit.yaml b/.coderabbit.yaml
new file mode 100644
index 0000000..a491d6d
--- /dev/null
+++ b/.coderabbit.yaml
@@ -0,0 +1,4 @@
+# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json
+reviews:
+ auto_review:
+ enabled: false
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..ab1f416
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,10 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Ignored default folder with query files
+/queries/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
+# Editor-based HTTP Client requests
+/httpRequests/
diff --git a/.idea/aws-lambda-python-template.iml b/.idea/aws-lambda-python-template.iml
new file mode 100644
index 0000000..d6ebd48
--- /dev/null
+++ b/.idea/aws-lambda-python-template.iml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..18daf47
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..7b94cec
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..8306744
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.pr_agent.toml b/.pr_agent.toml
new file mode 100644
index 0000000..0f665cf
--- /dev/null
+++ b/.pr_agent.toml
@@ -0,0 +1,2 @@
+[config]
+disable_auto_feedback = true
diff --git a/README.md b/README.md
index 99fa4d4..caceeb8 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,17 @@ A template for creating AWS Lambda functions with Python, using Docker images st
├── .github/workflows # Example GitHub Actions workflows
├── cloudformation # Example CloudFormation templates
├── models # Pydantic models for data validation
+│ ├── event.py # API Gateway event model
+│ ├── exceptions.py # Custom exceptions
+│ └── response.py # Standardized response models
├── services # Business logic and external service integrations
+│ ├── aws.py # AWS service wrappers (Secrets Manager)
+│ ├── db.py # MySQL database driver
+│ ├── migrations.py # Migration repository
+│ ├── nasa.py # NASA API service
+│ └── rest_api.py # Base REST API service
+├── tests # Unit tests
+│ └── test_lambda_function.py
├── utils # Utility functions (e.g., logger, config)
├── .pre-commit-config.yaml # Configuration for pre-commit hooks
├── debug_lambda.py # Script for running the Lambda function locally
@@ -33,6 +43,39 @@ A template for creating AWS Lambda functions with Python, using Docker images st
└── README.md # This file
```
+## Testing
+
+This template uses `pytest` and `unittest` for testing.
+
+### Running Tests
+
+To run the unit tests:
+
+```bash
+uv run python -m unittest discover tests
+```
+
+Or if you prefer `pytest`:
+
+```bash
+uv run pytest
+```
+
+## Code Review Challenge
+
+This template is designed to test AI-powered code review tools (like Junie, Copilot, or Qodo). It includes:
+- A modular structure that allows for architectural reviews.
+- Strategic use of type hints.
+- Centralized error handling.
+- Placeholder tests and TODOs to see if they are flagged.
+- A "Manager" pattern that can be critiqued.
+
+### Areas for Reviewers to Focus On:
+1. **Architecture:** Is the `LambdaManager` too heavy? Should the validation logic be more decoupled?
+2. **Security:** Are secrets handled correctly? (Check `_validate_api_key` in `lambda_function.py`).
+3. **Typing:** Are all `Any` types necessary?
+4. **Testing:** Is the test coverage sufficient? Are the mocks appropriate?
+
## Getting Started
### Prerequisites
diff --git a/lambda_function.py b/lambda_function.py
index 78edf8f..2d47474 100644
--- a/lambda_function.py
+++ b/lambda_function.py
@@ -1,444 +1,221 @@
import json
import logging
-import os
import traceback
from datetime import date, datetime
from pathlib import Path
-from typing import Any, Optional
+from typing import Any, Dict, Optional
from pydantic import ValidationError
-from pymysql.cursors import DictCursor
from requests.exceptions import RequestException
from requests.models import HTTPError
-from models.response import ErrorType, NasaApiResponse
from models.event import ApiGatewayEvent
+from models.exceptions import (
+ CredentialsSetupError,
+ DbRelatedError,
+ EventParsingError,
+ ExternalApiError,
+ LambdaValidationError,
+)
+from models.response import ErrorType, NasaApiResponse
from services.aws import SecretsManagerWrapper
from services.db import MysqlDriver
-from services.rest_api import ExternalApiService
+from services.migrations import MigrationRepository
+from services.nasa import NasaService
from utils.config import settings
from utils.logger import configure_logger
logger = configure_logger(Path(__file__).stem)
-############################################
-### Custom Exceptions for the Lambda App ###
-############################################
-
-
-class LambdaConfigError(Exception):
- """Exception raised for configuration errors in the Lambda function."""
-
- pass
-
-
-class CredentialsSetupError(Exception):
- """Exception raised for errors in setting up credentials."""
-
- pass
-
-
-class DbRelatedError(Exception):
- """Exception raised for database-related errors."""
-
- pass
-
-
-class EventParsingError(Exception):
- """Exception raised for errors in parsing the event object."""
-
- pass
-
-
-class LambdaLogicError(Exception):
- """Exception raised for errors in the Lambda function's business logic."""
-
- pass
-
-
-class LambdaValidationError(Exception):
- """Exception raised for input validation errors."""
-
- pass
-
+class DateTimeEncoder(json.JSONEncoder):
+ """Custom JSON encoder to handle date and datetime objects."""
-class ExternalApiError(Exception):
- """Exception raised for errors related to external API calls."""
+ def default(self, o):
+ if isinstance(o, (datetime, date)):
+ return o.isoformat()
+ return super().default(o)
- pass
+class LambdaResponseFormatter:
+ """Utility class to format Lambda responses consistently."""
-# Define standard error codes for different error types
-ERROR_CODES = {
- "INVALID_INPUT": 400,
- "UNAUTHORIZED": 401,
- "RESOURCE_NOT_FOUND": 404,
- "VALIDATION_ERROR": 422,
- "DATABASE_ERROR": 500,
- "CONFIG_ERROR": 500,
- "INTERNAL_ERROR": 500,
- "TIMEOUT_ERROR": 504,
- "EXTERNAL_API_ERROR": 502,
- "UNKNOWN_ERROR": 500,
- "EVENT_PARSING_ERROR": 400,
- "CREDENTIALS_SETUP_ERROR": 500,
-}
+ @staticmethod
+ def success(data: Dict[str, Any], request_id: str, status_code: int = 200) -> Dict[str, Any]:
+ response_body = {
+ "timestamp": datetime.now(settings.tz).isoformat(),
+ "requestId": request_id,
+ "success": True,
+ "data": data,
+ }
+ return {
+ "statusCode": status_code,
+ "headers": {
+ "Content-Type": "application/json",
+ "X-Request-ID": request_id,
+ },
+ "body": json.dumps(response_body, cls=DateTimeEncoder),
+ }
+ @staticmethod
+ def error(
+ message: str,
+ error_type: ErrorType = ErrorType.INTERNAL_ERROR,
+ details: Optional[Dict[str, Any]] = None,
+ request_id: str = "unknown",
+ ) -> Dict[str, Any]:
+ error_body = {
+ "success": False,
+ "timestamp": datetime.now(settings.tz).isoformat(),
+ "requestId": request_id,
+ "error": {
+ "type": error_type.name,
+ "message": message,
+ "code": error_type.code,
+ },
+ }
+ if details:
+ error_body["error"]["details"] = details
-############################################
-######## Helper functions #######
-############################################
+ if settings.lambda_log_level == "DEBUG":
+ error_body["error"]["stacktrace"] = traceback.format_exc()
+ logger.error(f"Error {error_type.name}: {message} (Request ID: {request_id})")
-class DateTimeEncoder(json.JSONEncoder):
- """
- Custom JSON encoder to handle date and datetime objects.
+ return {
+ "statusCode": error_type.code,
+ "headers": {
+ "Content-Type": "application/json",
+ "X-Request-ID": request_id,
+ },
+ "body": json.dumps(error_body, cls=DateTimeEncoder),
+ }
- Converts dates and datetimes to ISO format string.
- """
- def default(self, o):
- if isinstance(o, (datetime, date)):
- return o.isoformat()
- return super().default(o)
+class LambdaManager:
+ """Main orchestrator for the Lambda function logic."""
+ def __init__(self, event: Dict[str, Any], context: Any):
+ self.event = event
+ self.context = context
+ self.request_id = getattr(context, "aws_request_id", "unknown")
+ self.secrets_manager = None
+ self.db_driver = None
-def parse_event_data(event: dict) -> ApiGatewayEvent:
- """
- Parse and validate the Lambda event data.
-
- Args:
- event: The Lambda event dictionary containing trigger information
-
- Returns:
- ApiGatewayEvent: A structured object containing parsed event data
-
- Raises:
- EventParsingError: If the event data is invalid
- """
- try:
- return ApiGatewayEvent.from_event(event)
- except ValueError as e:
- raise EventParsingError(str(e)) from e
-
-
-def retrieve_expected_and_received_api_key(event: dict, secrets_manager: SecretsManagerWrapper) -> tuple[str, str]:
- # First check if SecretsManager got the ref api key
- try:
- expected_api_key = secrets_manager.secrets["LOGISTICS_API_GATEWAY_KEY"]
- except KeyError as e:
- err_msg = "could not find 'X_API_KEY' in secrets from SecretsManager"
- logger.error(err_msg)
- raise KeyError(err_msg) from e
- try:
- headers = event["headers"]
- except KeyError as e:
- err_msg = "No headers present in event! Cannot validate 'x-api-key'."
- logger.error(err_msg)
- raise KeyError(err_msg) from e
-
- try:
- x_api_key = headers["x-api-key"]
- except KeyError as e:
- err_msg = "Could not find 'x-api-key' in headers or the Lambda event"
- logger.error(err_msg)
- raise KeyError(err_msg) from e
-
- return expected_api_key, x_api_key
-
-
-def format_success_response(data: dict[str, Any], request_id: str, status_code: int = 200) -> dict[str, Any]:
- """
- Format a success response with consistent headers.
-
- Args:
- data: The response data
- request_id: The request ID for tracking
- status_code: The HTTP status code (allow 20x like 207)
-
- Returns:
- Formatted response dictionary
- """
- # Create response body with lambda func metadata
- response_body = {"timestamp": datetime.now().isoformat(), "requestId": request_id, "success": True}
-
- # Only add data keys that don't conflict with metadata
- for key, value in data.items():
- if key not in response_body:
- response_body[key] = value
- else:
- logger.warning(f"Key collision in response: '{key}' from data was not included")
-
- return {
- "statusCode": status_code,
- "headers": {
- "Content-Type": "application/json",
- "Access-Control-Allow-Origin": "*",
- "Access-Control-Allow-Credentials": True,
- "X-Request-ID": request_id,
- },
- "body": json.dumps(response_body, cls=DateTimeEncoder),
- }
-
-
-def format_error_response(
- error_message: str,
- error_type: ErrorType = ErrorType.INTERNAL_ERROR,
- details: Optional[dict] = None,
- traceback_info: Optional[str] = None,
- request_id: Optional[str] = None,
-) -> dict[str, Any]:
- """
- Format an error response with consistent headers and detailed error information.
-
- Args:
- error_message: The human-readable error message
- error_type: A standardized error type identifier
- details: Additional error details (optional)
- traceback_info: Optional traceback information
-
- Returns:
- Formatted error response dictionary
- """
- # Use standard error code if provided as string
- error_body = {
- "error": {
- "message": error_message,
- "status_code": error_type.code,
- "http_status": error_type.phrase,
- "type": error_type.name,
- },
- "timestamp": datetime.now(settings.tz).isoformat(),
- "success": False,
- }
-
- if details:
- error_body["error"]["details"] = details
-
- # Check if debug_mode is available
- debug_mode = settings.lambda_log_level == "DEBUG" or os.getenv("DEBUG_MODE", "0") == "1"
- if traceback_info and debug_mode:
- error_body["error"]["traceback"] = traceback_info
-
- # Quick logging
- log_msg = f"Error: {error_type.name} ({error_type.code} - {error_type.phrase}). Message: {error_message}"
- if details:
- log_msg += f" - Details: {details}"
- if request_id:
- log_msg += f" - RequestID: {request_id}"
- logger.error(log_msg)
-
- return {
- "statusCode": error_type.code,
- "headers": {
- "Content-Type": "application/json",
- "Access-Control-Allow-Origin": "*",
- "Access-Control-Allow-Credentials": True,
- "X-Request-ID": request_id if request_id else "unknown",
- },
- "body": json.dumps(error_body, cls=DateTimeEncoder),
- }
-
-
-############################################
-### Main Lambda Handler Function logic ###
-############################################
-
-
-def lambda_handler(event: dict, context):
- request_id = getattr(context, "aws_request_id", "unknown")
- logger.info(f"Processing request {request_id}")
-
- # Parse the event
- try:
- logger.info(f"Received event: {event}")
- logger.info(f"Received context: {context}")
- event_data = parse_event_data(event)
- except EventParsingError as e:
- return format_error_response(
- error_message="Error parsing event data",
- error_type=ErrorType.EVENT_PARSING_ERROR,
- details={"message": str(e)},
- traceback_info=traceback.format_exc(),
- request_id=request_id,
- )
- except Exception as e:
- return format_error_response(
- error_message="Unexpected error when parsing event data",
- error_type=ErrorType.UNKNOWN_ERROR,
- details={"message": str(e)},
- traceback_info=traceback.format_exc(),
- request_id=request_id,
- )
- logger.info(f"Converted Lambda event to ApiGatewayEvent({event_data})")
-
- # Use secret manager to get api keys
- try:
- secrets_manager = SecretsManagerWrapper()
- except CredentialsSetupError as e:
- logger.error(f"Error getting credentials using SecretsManager: {e}")
- return format_error_response(
- error_message="Error getting credentials using SecretsManager",
- error_type=ErrorType.CREDENTIALS_ERROR,
- details={"message": str(e)},
- traceback_info=traceback.format_exc(),
- request_id=request_id,
- )
-
- # Update the logging level based on settings
- try:
- sm_log_level = secrets_manager.secrets["LAMBDA_LOG_LEVEL"]
- if sm_log_level != getattr(logging, settings.lambda_log_level):
- logger.info(f"Updating the logger's level to {sm_log_level}")
- logger.setLevel(sm_log_level)
- for handler in logger.handlers:
- handler.setLevel(sm_log_level)
- except KeyError as e:
- err_msg = f"Missing 'LAMBDA_LOG_LEVEL' secret in SecretsManager: {e!s}"
- return format_error_response(
- error_message=err_msg,
- error_type=ErrorType.CREDENTIALS_ERROR,
- traceback_info=traceback.format_exc(),
- request_id=request_id,
- )
-
- # Validate the API key of request to API Gateway
- try:
- expected_api_key, x_api_key = retrieve_expected_and_received_api_key(event, secrets_manager)
- except KeyError as e:
- return format_error_response(
- error_message="Error retrieving API keys for validation",
- error_type=ErrorType.UNAUTHORIZED,
- details={"message": str(e)},
- traceback_info=traceback.format_exc(),
- request_id=request_id,
- )
-
- if expected_api_key != x_api_key:
- err_msg = f"Received x-api-key={x_api_key} is incorrect."
- logger.error(err_msg)
- return format_error_response(
- error_message=err_msg,
- error_type=ErrorType.UNAUTHORIZED,
- details={"expected_api_key": expected_api_key, "received_api_key": x_api_key},
- request_id=request_id,
- traceback_info=traceback.format_exc(),
- )
-
- # Setup the db connection
- db = None
- try:
+ def execute(self) -> Dict[str, Any]:
+ """Execute the main Lambda logic and return the formatted response."""
try:
- db = (
- MysqlDriver(dotenv_file=Path(".env"))
- if settings.use_local_db
- else MysqlDriver(secrets_manager=secrets_manager)
+ logger.info(f"Starting execution for request: {self.request_id}")
+
+ # 1. Initialization & Secrets
+ self.secrets_manager = self._init_secrets_manager()
+ self._configure_logging_from_secrets()
+
+ # 2. Validation
+ event_data = self._parse_and_validate_event()
+ self._validate_api_key()
+
+ # 3. Data Retrieval (NASA API)
+ nasa_data = self._fetch_nasa_data(event_data)
+
+ # 4. Database Operations
+ migrations = self._fetch_db_migrations()
+
+ # 5. Success Response
+ return LambdaResponseFormatter.success(
+ data={
+ "message": "Lambda execution successful",
+ "latest_migrations": migrations,
+ "nasa_apod": nasa_data,
+ },
+ request_id=self.request_id,
)
+
+ except EventParsingError as e:
+ return LambdaResponseFormatter.error(str(e), ErrorType.EVENT_PARSING_ERROR, request_id=self.request_id)
+ except LambdaValidationError as e:
+ return LambdaResponseFormatter.error(str(e), ErrorType.UNAUTHORIZED, request_id=self.request_id)
+ except CredentialsSetupError as e:
+ return LambdaResponseFormatter.error(str(e), ErrorType.CREDENTIALS_ERROR, request_id=self.request_id)
+ except DbRelatedError as e:
+ return LambdaResponseFormatter.error(str(e), ErrorType.DATABASE_ERROR, request_id=self.request_id)
+ except ExternalApiError as e:
+ return LambdaResponseFormatter.error(str(e), ErrorType.EXTERNAL_API_ERROR, request_id=self.request_id)
except Exception as e:
- return format_error_response(
- "Error instantiating MySQL db driver.",
- error_type=ErrorType.DATABASE_ERROR,
- details={"message": str(e)},
- traceback_info=traceback.format_exc(),
- request_id=request_id,
- )
+ logger.exception(f"Unhandled exception: {e}")
+ return LambdaResponseFormatter.error("Internal Server Error", ErrorType.INTERNAL_ERROR, request_id=self.request_id)
+ finally:
+ self._cleanup()
- # Test the connection to db according to env
+ def _init_secrets_manager(self) -> SecretsManagerWrapper:
try:
- logger.info(
- f"Excuting: 'SELECT * FROM SequelizeMeta' to test connection to {secrets_manager.secrets.get('RDS_DB_NAME', 'UNKNOWN_ENV')} database..."
- )
- with (
- db.transaction() as connection,
- connection.cursor(DictCursor) as cursor,
- ):
- cursor.execute("SELECT * FROM SequelizeMeta")
- migrations = cursor.fetchall()
- logger.info("Connection successfull!")
+ return SecretsManagerWrapper()
except Exception as e:
- logger.error(f"Error connecting to DB and processing data: {e}")
- return format_error_response(
- error_message="Error connecting to DB and processing data",
- error_type=ErrorType.DATABASE_ERROR,
- details={"message": str(e)},
- traceback_info=traceback.format_exc(),
- request_id=request_id,
- )
+ raise CredentialsSetupError(f"Failed to initialize Secrets Manager: {e}") from e
- # Only save last five migrations
- migrations = migrations[:5]
+ def _configure_logging_from_secrets(self):
+ log_level_name = self.secrets_manager.secrets.get("LAMBDA_LOG_LEVEL", settings.lambda_log_level)
+ level = getattr(logging, log_level_name.upper(), logging.INFO)
+ logger.setLevel(level)
+ for handler in logger.handlers:
+ handler.setLevel(level)
- # Use service to make the API call
+ def _parse_and_validate_event(self) -> ApiGatewayEvent:
try:
- external_api_serv = ExternalApiService()
+ return ApiGatewayEvent.from_event(self.event)
except Exception as e:
- return format_error_response(
- error_message="Error instantiating ExternalApiService",
- error_type=ErrorType.CREDENTIALS_ERROR,
- details={"message": str(e)},
- traceback_info=traceback.format_exc(),
- request_id=request_id,
- )
+ raise EventParsingError(f"Invalid event data: {e}") from e
+
+ def _validate_api_key(self):
+ # SECURE: We no longer return the expected key in the error response
+ expected_key = self.secrets_manager.secrets.get("LOGISTICS_API_GATEWAY_KEY")
+ received_key = self.event.get("headers", {}).get("x-api-key")
+
+ if not expected_key:
+ raise CredentialsSetupError("API Gateway key not found in secrets")
+
+ if expected_key != received_key:
+ # TODO: Consider logging the masked received key for debugging
+ raise LambdaValidationError("Unauthorized: Invalid API Key")
+
+ def _fetch_nasa_data(self, event_data: ApiGatewayEvent) -> Dict[str, Any]:
+ nasa_service = NasaService()
+ api_key = self.secrets_manager.secrets.get("NASA_API_KEY", "DEMO_KEY")
+ apod_date = event_data.query_string_parameters.get("someDate")
- # Test external API call to NASA APOD API
try:
- nasa_api_key = secrets_manager.secrets.get("NASA_API_KEY", "DEMO_KEY")
- some_date = event_data.query_string_parameters.get("someDate", date.today().isoformat())
- base_url = "https://api.nasa.gov/planetary/apod"
- params = {"api_key": nasa_api_key, "date": some_date}
- external_api_response = external_api_serv.fetch(url=base_url, params=params)
-
- logger.info(f"External API response: {external_api_response}")
- except (HTTPError, RequestException) as e:
- return format_error_response(
- error_message="Error making external API call",
- error_type=ErrorType.EXTERNAL_API_ERROR,
- details={"message": str(e)},
- traceback_info=traceback.format_exc(),
- request_id=request_id,
+ raw_data = nasa_service.get_astronomy_picture_of_the_day(api_key, apod_date)
+ # Validate with Pydantic model
+ validated_response = NasaApiResponse(
+ data=raw_data,
+ status="success",
+ message="Fetched from NASA successfully",
)
+ return validated_response.model_dump()
+ except (HTTPError, RequestException, ValidationError) as e:
+ raise ExternalApiError(f"NASA API error: {e}") from e
- # Prepare successful response data
+ def _fetch_db_migrations(self) -> list:
try:
- apod_data = NasaApiResponse(
- data=external_api_response,
- status="success",
- message="Fetched data from NASA APOD API successfully",
- ).model_dump()
- except ValidationError as e:
- return format_error_response(
- error_message="Error validating external API response",
- error_type=ErrorType.VALIDATION_ERROR,
- details={"message": str(e), "errors": e.errors()},
- traceback_info=traceback.format_exc(),
- request_id=request_id,
+ self.db_driver = (
+ MysqlDriver(dotenv_file=Path(".env"))
+ if settings.use_local_db
+ else MysqlDriver(secrets_manager=self.secrets_manager)
)
+ repo = MigrationRepository(self.db_driver)
+ return repo.get_latest_migrations(limit=5)
except Exception as e:
- return format_error_response(
- error_message="Unexpected error validating external API response",
- error_type=ErrorType.UNKNOWN_ERROR,
- details={"message": str(e)},
- traceback_info=traceback.format_exc(),
- request_id=request_id,
- )
- response_data = {
- "message": "Lambda execution successful",
- "lastestMigrations": migrations,
- "apod": apod_data,
- }
+ raise DbRelatedError(f"Database error: {e}") from e
- return format_success_response(
- data=response_data,
- request_id=request_id,
- status_code=200,
- )
-
- finally:
- # Close database connections
- if db:
+ def _cleanup(self):
+ if self.db_driver:
try:
- db.close_all()
- logger.info("Successfully closed all database connections")
+ self.db_driver.close_all()
except Exception as e:
- logger.error(f"Error closing database connections: {e!s}")
+ logger.error(f"Error closing DB connections: {e}")
+
+
+def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
+ """Entry point for the Lambda function."""
+ manager = LambdaManager(event, context)
+ return manager.execute()
diff --git a/models/exceptions.py b/models/exceptions.py
new file mode 100644
index 0000000..bde025a
--- /dev/null
+++ b/models/exceptions.py
@@ -0,0 +1,40 @@
+class LambdaConfigError(Exception):
+ """Exception raised for configuration errors in the Lambda function."""
+
+ pass
+
+
+class CredentialsSetupError(Exception):
+ """Exception raised for errors in setting up credentials."""
+
+ pass
+
+
+class DbRelatedError(Exception):
+ """Exception raised for database-related errors."""
+
+ pass
+
+
+class EventParsingError(Exception):
+ """Exception raised for errors in parsing the event object."""
+
+ pass
+
+
+class LambdaLogicError(Exception):
+ """Exception raised for errors in the Lambda function's business logic."""
+
+ pass
+
+
+class LambdaValidationError(Exception):
+ """Exception raised for input validation errors."""
+
+ pass
+
+
+class ExternalApiError(Exception):
+ """Exception raised for errors related to external API calls."""
+
+ pass
diff --git a/pyproject.toml b/pyproject.toml
index d18430a..b903026 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -10,6 +10,7 @@ dependencies = [
"pydantic-settings>=2.10.1",
"pymysql>=1.1.2",
"requests>=2.32.5",
+ "pytest>=8.0.0",
]
diff --git a/services/migrations.py b/services/migrations.py
new file mode 100644
index 0000000..9a81c46
--- /dev/null
+++ b/services/migrations.py
@@ -0,0 +1,41 @@
+from typing import Any, Dict, List, Optional
+
+from pymysql.cursors import DictCursor
+
+from services.db import MysqlDriver
+from utils.logger import configure_logger
+
+logger = configure_logger(__name__)
+
+
+class MigrationRepository:
+ """Repository to interact with database migration records."""
+
+ def __init__(self, db_driver: MysqlDriver):
+ self.db_driver = db_driver
+
+ def get_latest_migrations(self, limit: int = 5) -> List[Dict[str, Any]]:
+ """
+ Fetch the latest database migration records from the SequelizeMeta table.
+
+ Args:
+ limit: Maximum number of records to return
+
+ Returns:
+ List of migration records
+
+ Raises:
+ Exception: If database connection or query fails
+ """
+ try:
+ logger.info(f"Fetching latest {limit} migrations from SequelizeMeta")
+ with self.db_driver.transaction() as connection:
+ with connection.cursor(DictCursor) as cursor:
+ # Using parameterization to avoid SQL injection, though not strictly needed here
+ query = "SELECT * FROM SequelizeMeta ORDER BY name DESC LIMIT %s"
+ cursor.execute(query, (limit,))
+ results = cursor.fetchall()
+ return results
+ except Exception as e:
+ logger.error(f"Error fetching migrations: {e}")
+ raise
diff --git a/services/nasa.py b/services/nasa.py
new file mode 100644
index 0000000..a984bf9
--- /dev/null
+++ b/services/nasa.py
@@ -0,0 +1,47 @@
+from datetime import date
+from typing import Any, Dict, Optional
+
+from requests.exceptions import RequestException
+from requests.models import HTTPError
+
+from models.response import NasaApiResponse
+from services.rest_api import ExternalApiService
+from utils.logger import configure_logger
+
+logger = configure_logger(__name__)
+
+
+class NasaService:
+ """Service to interact with NASA APIs."""
+
+ def __init__(self, api_service: Optional[ExternalApiService] = None):
+ self.api_service = api_service or ExternalApiService()
+ self.base_url = "https://api.nasa.gov/planetary/apod"
+
+ def get_astronomy_picture_of_the_day(self, api_key: str, apod_date: Optional[str] = None) -> Dict[str, Any]:
+ """
+ Fetch the Astronomy Picture of the Day (APOD) from NASA.
+
+ Args:
+ api_key: NASA API key
+ apod_date: Date in YYYY-MM-DD format (default is today)
+
+ Returns:
+ Dict containing APOD data
+
+ Raises:
+ HTTPError: If the API request fails
+ RequestException: For other request-related errors
+ """
+ if not apod_date:
+ apod_date = date.today().isoformat()
+
+ params = {"api_key": api_key, "date": apod_date}
+
+ try:
+ logger.info(f"Fetching NASA APOD for date: {apod_date}")
+ response_data = self.api_service.fetch(url=self.base_url, params=params)
+ return response_data
+ except (HTTPError, RequestException) as e:
+ logger.error(f"Failed to fetch data from NASA APOD API: {e}")
+ raise
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_lambda_function.py b/tests/test_lambda_function.py
new file mode 100644
index 0000000..e7cfaa4
--- /dev/null
+++ b/tests/test_lambda_function.py
@@ -0,0 +1,52 @@
+import unittest
+from unittest.mock import MagicMock, patch
+from lambda_function import LambdaManager, lambda_handler
+from models.response import ErrorType
+
+class TestLambdaFunction(unittest.TestCase):
+ def setUp(self):
+ self.mock_context = MagicMock()
+ self.mock_context.aws_request_id = "test-request-id"
+ self.event = {
+ "headers": {"x-api-key": "test-key"},
+ "queryStringParameters": {"someDate": "2023-01-01"}
+ }
+
+ @patch("lambda_function.SecretsManagerWrapper")
+ @patch("lambda_function.NasaService")
+ @patch("lambda_function.MysqlDriver")
+ @patch("lambda_function.MigrationRepository")
+ def test_lambda_handler_success(self, mock_repo, mock_db, mock_nasa, mock_secrets):
+ # Setup mocks
+ mock_secrets_inst = mock_secrets.return_value
+ mock_secrets_inst.secrets = {
+ "LOGISTICS_API_GATEWAY_KEY": "test-key",
+ "NASA_API_KEY": "nasa-key",
+ "LAMBDA_LOG_LEVEL": "INFO"
+ }
+
+ mock_nasa_inst = mock_nasa.return_value
+ mock_nasa_inst.get_astronomy_picture_of_the_day.return_value = {"url": "http://example.com"}
+
+ mock_repo_inst = mock_repo.return_value
+ mock_repo_inst.get_latest_migrations.return_value = [{"name": "001_init"}]
+
+ # Execute
+ response = lambda_handler(self.event, self.mock_context)
+
+ # Assert
+ self.assertEqual(response["statusCode"], 200)
+ self.assertIn("data", response["body"])
+
+ def test_lambda_manager_init(self):
+ manager = LambdaManager(self.event, self.mock_context)
+ self.assertEqual(manager.request_id, "test-request-id")
+ self.assertIsNone(manager.db_driver)
+
+ # TODO: Add more tests for error cases
+ def test_placeholder(self):
+ """A placeholder test that should be replaced with real test cases."""
+ self.assertTrue(True)
+
+if __name__ == "__main__":
+ unittest.main()