From cea1c0a5c7a0f3b845954eb477e10543f8bee7e9 Mon Sep 17 00:00:00 2001 From: Simon Lalonde Date: Fri, 6 Mar 2026 11:22:16 -0500 Subject: [PATCH] Refac NASA service and test impl --- .coderabbit.yaml | 4 + .idea/.gitignore | 10 + .idea/aws-lambda-python-template.iml | 9 + .idea/misc.xml | 9 + .idea/modules.xml | 8 + .idea/vcs.xml | 7 + .pr_agent.toml | 2 + README.md | 43 ++ lambda_function.py | 563 ++++++++------------------- models/exceptions.py | 40 ++ pyproject.toml | 1 + services/migrations.py | 41 ++ services/nasa.py | 47 +++ tests/__init__.py | 0 tests/test_lambda_function.py | 52 +++ 15 files changed, 443 insertions(+), 393 deletions(-) create mode 100644 .coderabbit.yaml create mode 100644 .idea/.gitignore create mode 100644 .idea/aws-lambda-python-template.iml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 .pr_agent.toml create mode 100644 models/exceptions.py create mode 100644 services/migrations.py create mode 100644 services/nasa.py create mode 100644 tests/__init__.py create mode 100644 tests/test_lambda_function.py diff --git a/.coderabbit.yaml b/.coderabbit.yaml new file mode 100644 index 0000000..a491d6d --- /dev/null +++ b/.coderabbit.yaml @@ -0,0 +1,4 @@ +# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json +reviews: + auto_review: + enabled: false diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..ab1f416 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,10 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Ignored default folder with query files +/queries/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/aws-lambda-python-template.iml b/.idea/aws-lambda-python-template.iml new file mode 100644 index 0000000..d6ebd48 --- /dev/null +++ b/.idea/aws-lambda-python-template.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..18daf47 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..7b94cec --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..8306744 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/.pr_agent.toml b/.pr_agent.toml new file mode 100644 index 0000000..0f665cf --- /dev/null +++ b/.pr_agent.toml @@ -0,0 +1,2 @@ +[config] +disable_auto_feedback = true diff --git a/README.md b/README.md index 99fa4d4..caceeb8 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,17 @@ A template for creating AWS Lambda functions with Python, using Docker images st ├── .github/workflows # Example GitHub Actions workflows ├── cloudformation # Example CloudFormation templates ├── models # Pydantic models for data validation +│ ├── event.py # API Gateway event model +│ ├── exceptions.py # Custom exceptions +│ └── response.py # Standardized response models ├── services # Business logic and external service integrations +│ ├── aws.py # AWS service wrappers (Secrets Manager) +│ ├── db.py # MySQL database driver +│ ├── migrations.py # Migration repository +│ ├── nasa.py # NASA API service +│ └── rest_api.py # Base REST API service +├── tests # Unit tests +│ └── test_lambda_function.py ├── utils # Utility functions (e.g., logger, config) ├── .pre-commit-config.yaml # Configuration for pre-commit hooks ├── debug_lambda.py # Script for running the Lambda function locally @@ -33,6 +43,39 @@ A template for creating AWS Lambda functions with Python, using Docker images st └── README.md # This file ``` +## Testing + +This template uses `pytest` and `unittest` for testing. + +### Running Tests + +To run the unit tests: + +```bash +uv run python -m unittest discover tests +``` + +Or if you prefer `pytest`: + +```bash +uv run pytest +``` + +## Code Review Challenge + +This template is designed to test AI-powered code review tools (like Junie, Copilot, or Qodo). It includes: +- A modular structure that allows for architectural reviews. +- Strategic use of type hints. +- Centralized error handling. +- Placeholder tests and TODOs to see if they are flagged. +- A "Manager" pattern that can be critiqued. + +### Areas for Reviewers to Focus On: +1. **Architecture:** Is the `LambdaManager` too heavy? Should the validation logic be more decoupled? +2. **Security:** Are secrets handled correctly? (Check `_validate_api_key` in `lambda_function.py`). +3. **Typing:** Are all `Any` types necessary? +4. **Testing:** Is the test coverage sufficient? Are the mocks appropriate? + ## Getting Started ### Prerequisites diff --git a/lambda_function.py b/lambda_function.py index 78edf8f..2d47474 100644 --- a/lambda_function.py +++ b/lambda_function.py @@ -1,444 +1,221 @@ import json import logging -import os import traceback from datetime import date, datetime from pathlib import Path -from typing import Any, Optional +from typing import Any, Dict, Optional from pydantic import ValidationError -from pymysql.cursors import DictCursor from requests.exceptions import RequestException from requests.models import HTTPError -from models.response import ErrorType, NasaApiResponse from models.event import ApiGatewayEvent +from models.exceptions import ( + CredentialsSetupError, + DbRelatedError, + EventParsingError, + ExternalApiError, + LambdaValidationError, +) +from models.response import ErrorType, NasaApiResponse from services.aws import SecretsManagerWrapper from services.db import MysqlDriver -from services.rest_api import ExternalApiService +from services.migrations import MigrationRepository +from services.nasa import NasaService from utils.config import settings from utils.logger import configure_logger logger = configure_logger(Path(__file__).stem) -############################################ -### Custom Exceptions for the Lambda App ### -############################################ - - -class LambdaConfigError(Exception): - """Exception raised for configuration errors in the Lambda function.""" - - pass - - -class CredentialsSetupError(Exception): - """Exception raised for errors in setting up credentials.""" - - pass - - -class DbRelatedError(Exception): - """Exception raised for database-related errors.""" - - pass - - -class EventParsingError(Exception): - """Exception raised for errors in parsing the event object.""" - - pass - - -class LambdaLogicError(Exception): - """Exception raised for errors in the Lambda function's business logic.""" - - pass - - -class LambdaValidationError(Exception): - """Exception raised for input validation errors.""" - - pass - +class DateTimeEncoder(json.JSONEncoder): + """Custom JSON encoder to handle date and datetime objects.""" -class ExternalApiError(Exception): - """Exception raised for errors related to external API calls.""" + def default(self, o): + if isinstance(o, (datetime, date)): + return o.isoformat() + return super().default(o) - pass +class LambdaResponseFormatter: + """Utility class to format Lambda responses consistently.""" -# Define standard error codes for different error types -ERROR_CODES = { - "INVALID_INPUT": 400, - "UNAUTHORIZED": 401, - "RESOURCE_NOT_FOUND": 404, - "VALIDATION_ERROR": 422, - "DATABASE_ERROR": 500, - "CONFIG_ERROR": 500, - "INTERNAL_ERROR": 500, - "TIMEOUT_ERROR": 504, - "EXTERNAL_API_ERROR": 502, - "UNKNOWN_ERROR": 500, - "EVENT_PARSING_ERROR": 400, - "CREDENTIALS_SETUP_ERROR": 500, -} + @staticmethod + def success(data: Dict[str, Any], request_id: str, status_code: int = 200) -> Dict[str, Any]: + response_body = { + "timestamp": datetime.now(settings.tz).isoformat(), + "requestId": request_id, + "success": True, + "data": data, + } + return { + "statusCode": status_code, + "headers": { + "Content-Type": "application/json", + "X-Request-ID": request_id, + }, + "body": json.dumps(response_body, cls=DateTimeEncoder), + } + @staticmethod + def error( + message: str, + error_type: ErrorType = ErrorType.INTERNAL_ERROR, + details: Optional[Dict[str, Any]] = None, + request_id: str = "unknown", + ) -> Dict[str, Any]: + error_body = { + "success": False, + "timestamp": datetime.now(settings.tz).isoformat(), + "requestId": request_id, + "error": { + "type": error_type.name, + "message": message, + "code": error_type.code, + }, + } + if details: + error_body["error"]["details"] = details -############################################ -######## Helper functions ####### -############################################ + if settings.lambda_log_level == "DEBUG": + error_body["error"]["stacktrace"] = traceback.format_exc() + logger.error(f"Error {error_type.name}: {message} (Request ID: {request_id})") -class DateTimeEncoder(json.JSONEncoder): - """ - Custom JSON encoder to handle date and datetime objects. + return { + "statusCode": error_type.code, + "headers": { + "Content-Type": "application/json", + "X-Request-ID": request_id, + }, + "body": json.dumps(error_body, cls=DateTimeEncoder), + } - Converts dates and datetimes to ISO format string. - """ - def default(self, o): - if isinstance(o, (datetime, date)): - return o.isoformat() - return super().default(o) +class LambdaManager: + """Main orchestrator for the Lambda function logic.""" + def __init__(self, event: Dict[str, Any], context: Any): + self.event = event + self.context = context + self.request_id = getattr(context, "aws_request_id", "unknown") + self.secrets_manager = None + self.db_driver = None -def parse_event_data(event: dict) -> ApiGatewayEvent: - """ - Parse and validate the Lambda event data. - - Args: - event: The Lambda event dictionary containing trigger information - - Returns: - ApiGatewayEvent: A structured object containing parsed event data - - Raises: - EventParsingError: If the event data is invalid - """ - try: - return ApiGatewayEvent.from_event(event) - except ValueError as e: - raise EventParsingError(str(e)) from e - - -def retrieve_expected_and_received_api_key(event: dict, secrets_manager: SecretsManagerWrapper) -> tuple[str, str]: - # First check if SecretsManager got the ref api key - try: - expected_api_key = secrets_manager.secrets["LOGISTICS_API_GATEWAY_KEY"] - except KeyError as e: - err_msg = "could not find 'X_API_KEY' in secrets from SecretsManager" - logger.error(err_msg) - raise KeyError(err_msg) from e - try: - headers = event["headers"] - except KeyError as e: - err_msg = "No headers present in event! Cannot validate 'x-api-key'." - logger.error(err_msg) - raise KeyError(err_msg) from e - - try: - x_api_key = headers["x-api-key"] - except KeyError as e: - err_msg = "Could not find 'x-api-key' in headers or the Lambda event" - logger.error(err_msg) - raise KeyError(err_msg) from e - - return expected_api_key, x_api_key - - -def format_success_response(data: dict[str, Any], request_id: str, status_code: int = 200) -> dict[str, Any]: - """ - Format a success response with consistent headers. - - Args: - data: The response data - request_id: The request ID for tracking - status_code: The HTTP status code (allow 20x like 207) - - Returns: - Formatted response dictionary - """ - # Create response body with lambda func metadata - response_body = {"timestamp": datetime.now().isoformat(), "requestId": request_id, "success": True} - - # Only add data keys that don't conflict with metadata - for key, value in data.items(): - if key not in response_body: - response_body[key] = value - else: - logger.warning(f"Key collision in response: '{key}' from data was not included") - - return { - "statusCode": status_code, - "headers": { - "Content-Type": "application/json", - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Credentials": True, - "X-Request-ID": request_id, - }, - "body": json.dumps(response_body, cls=DateTimeEncoder), - } - - -def format_error_response( - error_message: str, - error_type: ErrorType = ErrorType.INTERNAL_ERROR, - details: Optional[dict] = None, - traceback_info: Optional[str] = None, - request_id: Optional[str] = None, -) -> dict[str, Any]: - """ - Format an error response with consistent headers and detailed error information. - - Args: - error_message: The human-readable error message - error_type: A standardized error type identifier - details: Additional error details (optional) - traceback_info: Optional traceback information - - Returns: - Formatted error response dictionary - """ - # Use standard error code if provided as string - error_body = { - "error": { - "message": error_message, - "status_code": error_type.code, - "http_status": error_type.phrase, - "type": error_type.name, - }, - "timestamp": datetime.now(settings.tz).isoformat(), - "success": False, - } - - if details: - error_body["error"]["details"] = details - - # Check if debug_mode is available - debug_mode = settings.lambda_log_level == "DEBUG" or os.getenv("DEBUG_MODE", "0") == "1" - if traceback_info and debug_mode: - error_body["error"]["traceback"] = traceback_info - - # Quick logging - log_msg = f"Error: {error_type.name} ({error_type.code} - {error_type.phrase}). Message: {error_message}" - if details: - log_msg += f" - Details: {details}" - if request_id: - log_msg += f" - RequestID: {request_id}" - logger.error(log_msg) - - return { - "statusCode": error_type.code, - "headers": { - "Content-Type": "application/json", - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Credentials": True, - "X-Request-ID": request_id if request_id else "unknown", - }, - "body": json.dumps(error_body, cls=DateTimeEncoder), - } - - -############################################ -### Main Lambda Handler Function logic ### -############################################ - - -def lambda_handler(event: dict, context): - request_id = getattr(context, "aws_request_id", "unknown") - logger.info(f"Processing request {request_id}") - - # Parse the event - try: - logger.info(f"Received event: {event}") - logger.info(f"Received context: {context}") - event_data = parse_event_data(event) - except EventParsingError as e: - return format_error_response( - error_message="Error parsing event data", - error_type=ErrorType.EVENT_PARSING_ERROR, - details={"message": str(e)}, - traceback_info=traceback.format_exc(), - request_id=request_id, - ) - except Exception as e: - return format_error_response( - error_message="Unexpected error when parsing event data", - error_type=ErrorType.UNKNOWN_ERROR, - details={"message": str(e)}, - traceback_info=traceback.format_exc(), - request_id=request_id, - ) - logger.info(f"Converted Lambda event to ApiGatewayEvent({event_data})") - - # Use secret manager to get api keys - try: - secrets_manager = SecretsManagerWrapper() - except CredentialsSetupError as e: - logger.error(f"Error getting credentials using SecretsManager: {e}") - return format_error_response( - error_message="Error getting credentials using SecretsManager", - error_type=ErrorType.CREDENTIALS_ERROR, - details={"message": str(e)}, - traceback_info=traceback.format_exc(), - request_id=request_id, - ) - - # Update the logging level based on settings - try: - sm_log_level = secrets_manager.secrets["LAMBDA_LOG_LEVEL"] - if sm_log_level != getattr(logging, settings.lambda_log_level): - logger.info(f"Updating the logger's level to {sm_log_level}") - logger.setLevel(sm_log_level) - for handler in logger.handlers: - handler.setLevel(sm_log_level) - except KeyError as e: - err_msg = f"Missing 'LAMBDA_LOG_LEVEL' secret in SecretsManager: {e!s}" - return format_error_response( - error_message=err_msg, - error_type=ErrorType.CREDENTIALS_ERROR, - traceback_info=traceback.format_exc(), - request_id=request_id, - ) - - # Validate the API key of request to API Gateway - try: - expected_api_key, x_api_key = retrieve_expected_and_received_api_key(event, secrets_manager) - except KeyError as e: - return format_error_response( - error_message="Error retrieving API keys for validation", - error_type=ErrorType.UNAUTHORIZED, - details={"message": str(e)}, - traceback_info=traceback.format_exc(), - request_id=request_id, - ) - - if expected_api_key != x_api_key: - err_msg = f"Received x-api-key={x_api_key} is incorrect." - logger.error(err_msg) - return format_error_response( - error_message=err_msg, - error_type=ErrorType.UNAUTHORIZED, - details={"expected_api_key": expected_api_key, "received_api_key": x_api_key}, - request_id=request_id, - traceback_info=traceback.format_exc(), - ) - - # Setup the db connection - db = None - try: + def execute(self) -> Dict[str, Any]: + """Execute the main Lambda logic and return the formatted response.""" try: - db = ( - MysqlDriver(dotenv_file=Path(".env")) - if settings.use_local_db - else MysqlDriver(secrets_manager=secrets_manager) + logger.info(f"Starting execution for request: {self.request_id}") + + # 1. Initialization & Secrets + self.secrets_manager = self._init_secrets_manager() + self._configure_logging_from_secrets() + + # 2. Validation + event_data = self._parse_and_validate_event() + self._validate_api_key() + + # 3. Data Retrieval (NASA API) + nasa_data = self._fetch_nasa_data(event_data) + + # 4. Database Operations + migrations = self._fetch_db_migrations() + + # 5. Success Response + return LambdaResponseFormatter.success( + data={ + "message": "Lambda execution successful", + "latest_migrations": migrations, + "nasa_apod": nasa_data, + }, + request_id=self.request_id, ) + + except EventParsingError as e: + return LambdaResponseFormatter.error(str(e), ErrorType.EVENT_PARSING_ERROR, request_id=self.request_id) + except LambdaValidationError as e: + return LambdaResponseFormatter.error(str(e), ErrorType.UNAUTHORIZED, request_id=self.request_id) + except CredentialsSetupError as e: + return LambdaResponseFormatter.error(str(e), ErrorType.CREDENTIALS_ERROR, request_id=self.request_id) + except DbRelatedError as e: + return LambdaResponseFormatter.error(str(e), ErrorType.DATABASE_ERROR, request_id=self.request_id) + except ExternalApiError as e: + return LambdaResponseFormatter.error(str(e), ErrorType.EXTERNAL_API_ERROR, request_id=self.request_id) except Exception as e: - return format_error_response( - "Error instantiating MySQL db driver.", - error_type=ErrorType.DATABASE_ERROR, - details={"message": str(e)}, - traceback_info=traceback.format_exc(), - request_id=request_id, - ) + logger.exception(f"Unhandled exception: {e}") + return LambdaResponseFormatter.error("Internal Server Error", ErrorType.INTERNAL_ERROR, request_id=self.request_id) + finally: + self._cleanup() - # Test the connection to db according to env + def _init_secrets_manager(self) -> SecretsManagerWrapper: try: - logger.info( - f"Excuting: 'SELECT * FROM SequelizeMeta' to test connection to {secrets_manager.secrets.get('RDS_DB_NAME', 'UNKNOWN_ENV')} database..." - ) - with ( - db.transaction() as connection, - connection.cursor(DictCursor) as cursor, - ): - cursor.execute("SELECT * FROM SequelizeMeta") - migrations = cursor.fetchall() - logger.info("Connection successfull!") + return SecretsManagerWrapper() except Exception as e: - logger.error(f"Error connecting to DB and processing data: {e}") - return format_error_response( - error_message="Error connecting to DB and processing data", - error_type=ErrorType.DATABASE_ERROR, - details={"message": str(e)}, - traceback_info=traceback.format_exc(), - request_id=request_id, - ) + raise CredentialsSetupError(f"Failed to initialize Secrets Manager: {e}") from e - # Only save last five migrations - migrations = migrations[:5] + def _configure_logging_from_secrets(self): + log_level_name = self.secrets_manager.secrets.get("LAMBDA_LOG_LEVEL", settings.lambda_log_level) + level = getattr(logging, log_level_name.upper(), logging.INFO) + logger.setLevel(level) + for handler in logger.handlers: + handler.setLevel(level) - # Use service to make the API call + def _parse_and_validate_event(self) -> ApiGatewayEvent: try: - external_api_serv = ExternalApiService() + return ApiGatewayEvent.from_event(self.event) except Exception as e: - return format_error_response( - error_message="Error instantiating ExternalApiService", - error_type=ErrorType.CREDENTIALS_ERROR, - details={"message": str(e)}, - traceback_info=traceback.format_exc(), - request_id=request_id, - ) + raise EventParsingError(f"Invalid event data: {e}") from e + + def _validate_api_key(self): + # SECURE: We no longer return the expected key in the error response + expected_key = self.secrets_manager.secrets.get("LOGISTICS_API_GATEWAY_KEY") + received_key = self.event.get("headers", {}).get("x-api-key") + + if not expected_key: + raise CredentialsSetupError("API Gateway key not found in secrets") + + if expected_key != received_key: + # TODO: Consider logging the masked received key for debugging + raise LambdaValidationError("Unauthorized: Invalid API Key") + + def _fetch_nasa_data(self, event_data: ApiGatewayEvent) -> Dict[str, Any]: + nasa_service = NasaService() + api_key = self.secrets_manager.secrets.get("NASA_API_KEY", "DEMO_KEY") + apod_date = event_data.query_string_parameters.get("someDate") - # Test external API call to NASA APOD API try: - nasa_api_key = secrets_manager.secrets.get("NASA_API_KEY", "DEMO_KEY") - some_date = event_data.query_string_parameters.get("someDate", date.today().isoformat()) - base_url = "https://api.nasa.gov/planetary/apod" - params = {"api_key": nasa_api_key, "date": some_date} - external_api_response = external_api_serv.fetch(url=base_url, params=params) - - logger.info(f"External API response: {external_api_response}") - except (HTTPError, RequestException) as e: - return format_error_response( - error_message="Error making external API call", - error_type=ErrorType.EXTERNAL_API_ERROR, - details={"message": str(e)}, - traceback_info=traceback.format_exc(), - request_id=request_id, + raw_data = nasa_service.get_astronomy_picture_of_the_day(api_key, apod_date) + # Validate with Pydantic model + validated_response = NasaApiResponse( + data=raw_data, + status="success", + message="Fetched from NASA successfully", ) + return validated_response.model_dump() + except (HTTPError, RequestException, ValidationError) as e: + raise ExternalApiError(f"NASA API error: {e}") from e - # Prepare successful response data + def _fetch_db_migrations(self) -> list: try: - apod_data = NasaApiResponse( - data=external_api_response, - status="success", - message="Fetched data from NASA APOD API successfully", - ).model_dump() - except ValidationError as e: - return format_error_response( - error_message="Error validating external API response", - error_type=ErrorType.VALIDATION_ERROR, - details={"message": str(e), "errors": e.errors()}, - traceback_info=traceback.format_exc(), - request_id=request_id, + self.db_driver = ( + MysqlDriver(dotenv_file=Path(".env")) + if settings.use_local_db + else MysqlDriver(secrets_manager=self.secrets_manager) ) + repo = MigrationRepository(self.db_driver) + return repo.get_latest_migrations(limit=5) except Exception as e: - return format_error_response( - error_message="Unexpected error validating external API response", - error_type=ErrorType.UNKNOWN_ERROR, - details={"message": str(e)}, - traceback_info=traceback.format_exc(), - request_id=request_id, - ) - response_data = { - "message": "Lambda execution successful", - "lastestMigrations": migrations, - "apod": apod_data, - } + raise DbRelatedError(f"Database error: {e}") from e - return format_success_response( - data=response_data, - request_id=request_id, - status_code=200, - ) - - finally: - # Close database connections - if db: + def _cleanup(self): + if self.db_driver: try: - db.close_all() - logger.info("Successfully closed all database connections") + self.db_driver.close_all() except Exception as e: - logger.error(f"Error closing database connections: {e!s}") + logger.error(f"Error closing DB connections: {e}") + + +def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: + """Entry point for the Lambda function.""" + manager = LambdaManager(event, context) + return manager.execute() diff --git a/models/exceptions.py b/models/exceptions.py new file mode 100644 index 0000000..bde025a --- /dev/null +++ b/models/exceptions.py @@ -0,0 +1,40 @@ +class LambdaConfigError(Exception): + """Exception raised for configuration errors in the Lambda function.""" + + pass + + +class CredentialsSetupError(Exception): + """Exception raised for errors in setting up credentials.""" + + pass + + +class DbRelatedError(Exception): + """Exception raised for database-related errors.""" + + pass + + +class EventParsingError(Exception): + """Exception raised for errors in parsing the event object.""" + + pass + + +class LambdaLogicError(Exception): + """Exception raised for errors in the Lambda function's business logic.""" + + pass + + +class LambdaValidationError(Exception): + """Exception raised for input validation errors.""" + + pass + + +class ExternalApiError(Exception): + """Exception raised for errors related to external API calls.""" + + pass diff --git a/pyproject.toml b/pyproject.toml index d18430a..b903026 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [ "pydantic-settings>=2.10.1", "pymysql>=1.1.2", "requests>=2.32.5", + "pytest>=8.0.0", ] diff --git a/services/migrations.py b/services/migrations.py new file mode 100644 index 0000000..9a81c46 --- /dev/null +++ b/services/migrations.py @@ -0,0 +1,41 @@ +from typing import Any, Dict, List, Optional + +from pymysql.cursors import DictCursor + +from services.db import MysqlDriver +from utils.logger import configure_logger + +logger = configure_logger(__name__) + + +class MigrationRepository: + """Repository to interact with database migration records.""" + + def __init__(self, db_driver: MysqlDriver): + self.db_driver = db_driver + + def get_latest_migrations(self, limit: int = 5) -> List[Dict[str, Any]]: + """ + Fetch the latest database migration records from the SequelizeMeta table. + + Args: + limit: Maximum number of records to return + + Returns: + List of migration records + + Raises: + Exception: If database connection or query fails + """ + try: + logger.info(f"Fetching latest {limit} migrations from SequelizeMeta") + with self.db_driver.transaction() as connection: + with connection.cursor(DictCursor) as cursor: + # Using parameterization to avoid SQL injection, though not strictly needed here + query = "SELECT * FROM SequelizeMeta ORDER BY name DESC LIMIT %s" + cursor.execute(query, (limit,)) + results = cursor.fetchall() + return results + except Exception as e: + logger.error(f"Error fetching migrations: {e}") + raise diff --git a/services/nasa.py b/services/nasa.py new file mode 100644 index 0000000..a984bf9 --- /dev/null +++ b/services/nasa.py @@ -0,0 +1,47 @@ +from datetime import date +from typing import Any, Dict, Optional + +from requests.exceptions import RequestException +from requests.models import HTTPError + +from models.response import NasaApiResponse +from services.rest_api import ExternalApiService +from utils.logger import configure_logger + +logger = configure_logger(__name__) + + +class NasaService: + """Service to interact with NASA APIs.""" + + def __init__(self, api_service: Optional[ExternalApiService] = None): + self.api_service = api_service or ExternalApiService() + self.base_url = "https://api.nasa.gov/planetary/apod" + + def get_astronomy_picture_of_the_day(self, api_key: str, apod_date: Optional[str] = None) -> Dict[str, Any]: + """ + Fetch the Astronomy Picture of the Day (APOD) from NASA. + + Args: + api_key: NASA API key + apod_date: Date in YYYY-MM-DD format (default is today) + + Returns: + Dict containing APOD data + + Raises: + HTTPError: If the API request fails + RequestException: For other request-related errors + """ + if not apod_date: + apod_date = date.today().isoformat() + + params = {"api_key": api_key, "date": apod_date} + + try: + logger.info(f"Fetching NASA APOD for date: {apod_date}") + response_data = self.api_service.fetch(url=self.base_url, params=params) + return response_data + except (HTTPError, RequestException) as e: + logger.error(f"Failed to fetch data from NASA APOD API: {e}") + raise diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_lambda_function.py b/tests/test_lambda_function.py new file mode 100644 index 0000000..e7cfaa4 --- /dev/null +++ b/tests/test_lambda_function.py @@ -0,0 +1,52 @@ +import unittest +from unittest.mock import MagicMock, patch +from lambda_function import LambdaManager, lambda_handler +from models.response import ErrorType + +class TestLambdaFunction(unittest.TestCase): + def setUp(self): + self.mock_context = MagicMock() + self.mock_context.aws_request_id = "test-request-id" + self.event = { + "headers": {"x-api-key": "test-key"}, + "queryStringParameters": {"someDate": "2023-01-01"} + } + + @patch("lambda_function.SecretsManagerWrapper") + @patch("lambda_function.NasaService") + @patch("lambda_function.MysqlDriver") + @patch("lambda_function.MigrationRepository") + def test_lambda_handler_success(self, mock_repo, mock_db, mock_nasa, mock_secrets): + # Setup mocks + mock_secrets_inst = mock_secrets.return_value + mock_secrets_inst.secrets = { + "LOGISTICS_API_GATEWAY_KEY": "test-key", + "NASA_API_KEY": "nasa-key", + "LAMBDA_LOG_LEVEL": "INFO" + } + + mock_nasa_inst = mock_nasa.return_value + mock_nasa_inst.get_astronomy_picture_of_the_day.return_value = {"url": "http://example.com"} + + mock_repo_inst = mock_repo.return_value + mock_repo_inst.get_latest_migrations.return_value = [{"name": "001_init"}] + + # Execute + response = lambda_handler(self.event, self.mock_context) + + # Assert + self.assertEqual(response["statusCode"], 200) + self.assertIn("data", response["body"]) + + def test_lambda_manager_init(self): + manager = LambdaManager(self.event, self.mock_context) + self.assertEqual(manager.request_id, "test-request-id") + self.assertIsNone(manager.db_driver) + + # TODO: Add more tests for error cases + def test_placeholder(self): + """A placeholder test that should be replaced with real test cases.""" + self.assertTrue(True) + +if __name__ == "__main__": + unittest.main()