diff --git a/README.md b/README.md index 66028ad..c8e3aa2 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,14 @@ This project contains an AWS Lambda maven application with [AWS Java SDK 2.x](ht The generated function handler class just returns the input. The configured AWS Java SDK client is created in `DependencyFactory` class and you can add the code to interact with the SDK client based on your use case. +### SQS Integration +The application can optionally send data to an SQS queue after saving it to DynamoDB. This feature is controlled by the following environment variables: + +- `SQS_ENABLED`: Set to "true" to enable sending to SQS +- `SQS_QUEUE_URL`: The URL of the SQS queue to send messages to + +When enabled, the application will send the book data as a JSON message to the specified SQS queue. + #### Building the project ``` mvn clean install @@ -42,4 +50,3 @@ sam deploy --guided See [Deploying Serverless Applications](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-deploying.html) for more info. - diff --git a/pom.xml b/pom.xml index 474a221..5c91a7d 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,11 @@ url-connection-client + + software.amazon.awssdk + sqs + + com.amazonaws aws-lambda-java-events diff --git a/src/main/java/com/home/amazon/serverless/DependencyFactory.java b/src/main/java/com/home/amazon/serverless/DependencyFactory.java index dd756b0..612fd71 100644 --- a/src/main/java/com/home/amazon/serverless/DependencyFactory.java +++ b/src/main/java/com/home/amazon/serverless/DependencyFactory.java @@ -7,15 +7,18 @@ import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.sqs.SqsClient; public class DependencyFactory { public static final String ENV_VARIABLE_TABLE = "TABLE"; + public static final String ENV_VARIABLE_SQS_ENABLED = "SQS_ENABLED"; + public static final String ENV_VARIABLE_SQS_QUEUE_URL = "SQS_QUEUE_URL"; private DependencyFactory() {} /** - * @return an instance of LambdaClient + * @return an instance of DynamoDbEnhancedClient */ public static DynamoDbEnhancedClient dynamoDbEnhancedClient() { return DynamoDbEnhancedClient.builder() @@ -27,8 +30,27 @@ public static DynamoDbEnhancedClient dynamoDbEnhancedClient() { .build(); } + /** + * @return an instance of SqsClient + */ + public static SqsClient sqsClient() { + return SqsClient.builder() + .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) + .region(Region.of(System.getenv(SdkSystemSetting.AWS_REGION.environmentVariable()))) + .httpClientBuilder(UrlConnectionHttpClient.builder()) + .build(); + } + public static String tableName() { return System.getenv(ENV_VARIABLE_TABLE); } + public static boolean isSqsEnabled() { + String sqsEnabled = System.getenv(ENV_VARIABLE_SQS_ENABLED); + return sqsEnabled != null && sqsEnabled.equalsIgnoreCase("true"); + } + + public static String sqsQueueUrl() { + return System.getenv(ENV_VARIABLE_SQS_QUEUE_URL); + } } diff --git a/src/main/java/com/home/amazon/serverless/lambda/PostItemHandler.java b/src/main/java/com/home/amazon/serverless/lambda/PostItemHandler.java index aabfae3..d7fbca0 100644 --- a/src/main/java/com/home/amazon/serverless/lambda/PostItemHandler.java +++ b/src/main/java/com/home/amazon/serverless/lambda/PostItemHandler.java @@ -9,8 +9,10 @@ import com.home.amazon.serverless.model.Book; import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient; import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable; -import software.amazon.awssdk.enhanced.dynamodb.Key; import software.amazon.awssdk.enhanced.dynamodb.TableSchema; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; import java.util.Collections; import java.util.Map; @@ -18,13 +20,21 @@ public class PostItemHandler implements RequestHandler { private final DynamoDbEnhancedClient dbClient; + private final SqsClient sqsClient; private final String tableName; private final TableSchema bookTableSchema; + private final boolean sqsEnabled; + private final String sqsQueueUrl; + private final Gson gson; public PostItemHandler() { dbClient = DependencyFactory.dynamoDbEnhancedClient(); + sqsClient = DependencyFactory.sqsClient(); tableName = DependencyFactory.tableName(); bookTableSchema = TableSchema.fromBean(Book.class); + sqsEnabled = DependencyFactory.isSqsEnabled(); + sqsQueueUrl = DependencyFactory.sqsQueueUrl(); + gson = new Gson(); } @Override @@ -38,6 +48,22 @@ public APIGatewayProxyResponseEvent handleRequest(APIGatewayProxyRequestEvent in item.setAuthor(queryStringParameters.get("author")); item.setName(queryStringParameters.get("name")); booksTable.putItem(item); + + // Optionally send to SQS if enabled + if (sqsEnabled && sqsQueueUrl != null && !sqsQueueUrl.isEmpty()) { + try { + String messageBody = gson.toJson(item); + SendMessageRequest sendMessageRequest = SendMessageRequest.builder() + .queueUrl(sqsQueueUrl) + .messageBody(messageBody) + .build(); + + SendMessageResponse sendMessageResponse = sqsClient.sendMessage(sendMessageRequest); + context.getLogger().log("Message sent to SQS. MessageId: " + sendMessageResponse.messageId()); + } catch (Exception e) { + context.getLogger().log("Error sending message to SQS: " + e.getMessage()); + } + } } return new APIGatewayProxyResponseEvent().withStatusCode(200) @@ -45,5 +71,4 @@ public APIGatewayProxyResponseEvent handleRequest(APIGatewayProxyRequestEvent in .withHeaders(Collections.emptyMap()) .withBody("Success"); } - } diff --git a/src/test/java/com/home/amazon/serverless/lambda/PostItemHandlerTest.java b/src/test/java/com/home/amazon/serverless/lambda/PostItemHandlerTest.java index b513a42..576805b 100644 --- a/src/test/java/com/home/amazon/serverless/lambda/PostItemHandlerTest.java +++ b/src/test/java/com/home/amazon/serverless/lambda/PostItemHandlerTest.java @@ -3,6 +3,7 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent; +import com.google.gson.Gson; import com.home.amazon.serverless.DependencyFactory; import com.home.amazon.serverless.model.Book; import org.junit.jupiter.api.BeforeEach; @@ -13,8 +14,10 @@ import org.mockito.junit.jupiter.MockitoExtension; import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient; import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable; -import software.amazon.awssdk.enhanced.dynamodb.Key; import software.amazon.awssdk.enhanced.dynamodb.TableSchema; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; import java.util.HashMap; import java.util.Map; @@ -22,18 +25,22 @@ import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @ExtendWith(MockitoExtension.class) public class PostItemHandlerTest { private static final String TEST_TABLE_NAME = "TestTable"; private static final String TEST_PARTITION_KEY_VALUE = "123"; + private static final String TEST_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; + private static final String TEST_AUTHOR = "Test Author"; + private static final String TEST_NAME = "Test Book"; + @Mock + private DynamoDbEnhancedClient dbClient; @Mock - private DynamoDbEnhancedClient client; + private SqsClient sqsClient; @Mock private DynamoDbTable table; @@ -44,14 +51,86 @@ public class PostItemHandlerTest { @Mock private Context context; + @Mock + private SendMessageResponse sendMessageResponse; + + private PostItemHandler handler; + private Map queryParams; + @BeforeEach public void setUp() { - when(client.table(eq(TEST_TABLE_NAME), any(TableSchema.class))).thenReturn(table); + when(dbClient.table(eq(TEST_TABLE_NAME), any(TableSchema.class))).thenReturn(table); + + queryParams = new HashMap<>(); + queryParams.put(Book.PARTITION_KEY, TEST_PARTITION_KEY_VALUE); + queryParams.put("author", TEST_AUTHOR); + queryParams.put("name", TEST_NAME); + when(request.getQueryStringParameters()).thenReturn(queryParams); + + when(sendMessageResponse.messageId()).thenReturn("test-message-id"); } @Test - public void shouldReturnItemIfExists() { - assertTrue(true); + public void shouldSaveItemToDynamoDb() { + try (MockedStatic dependencyFactoryMockedStatic = mockStatic(DependencyFactory.class)) { + // Setup mocks + dependencyFactoryMockedStatic.when(DependencyFactory::dynamoDbEnhancedClient).thenReturn(dbClient); + dependencyFactoryMockedStatic.when(DependencyFactory::sqsClient).thenReturn(sqsClient); + dependencyFactoryMockedStatic.when(DependencyFactory::tableName).thenReturn(TEST_TABLE_NAME); + dependencyFactoryMockedStatic.when(DependencyFactory::isSqsEnabled).thenReturn(false); + + // Create handler and invoke + handler = new PostItemHandler(); + APIGatewayProxyResponseEvent response = handler.handleRequest(request, context); + + // Verify + verify(table).putItem(any(Book.class)); + verify(sqsClient, never()).sendMessage(any(SendMessageRequest.class)); + assertEquals(200, response.getStatusCode()); + } } + @Test + public void shouldSendToSqsWhenEnabled() { + try (MockedStatic dependencyFactoryMockedStatic = mockStatic(DependencyFactory.class)) { + // Setup mocks + dependencyFactoryMockedStatic.when(DependencyFactory::dynamoDbEnhancedClient).thenReturn(dbClient); + dependencyFactoryMockedStatic.when(DependencyFactory::sqsClient).thenReturn(sqsClient); + dependencyFactoryMockedStatic.when(DependencyFactory::tableName).thenReturn(TEST_TABLE_NAME); + dependencyFactoryMockedStatic.when(DependencyFactory::isSqsEnabled).thenReturn(true); + dependencyFactoryMockedStatic.when(DependencyFactory::sqsQueueUrl).thenReturn(TEST_QUEUE_URL); + + when(sqsClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageResponse); + + // Create handler and invoke + handler = new PostItemHandler(); + APIGatewayProxyResponseEvent response = handler.handleRequest(request, context); + + // Verify + verify(table).putItem(any(Book.class)); + verify(sqsClient).sendMessage(any(SendMessageRequest.class)); + assertEquals(200, response.getStatusCode()); + } + } + + @Test + public void shouldNotSendToSqsWhenQueueUrlIsNull() { + try (MockedStatic dependencyFactoryMockedStatic = mockStatic(DependencyFactory.class)) { + // Setup mocks + dependencyFactoryMockedStatic.when(DependencyFactory::dynamoDbEnhancedClient).thenReturn(dbClient); + dependencyFactoryMockedStatic.when(DependencyFactory::sqsClient).thenReturn(sqsClient); + dependencyFactoryMockedStatic.when(DependencyFactory::tableName).thenReturn(TEST_TABLE_NAME); + dependencyFactoryMockedStatic.when(DependencyFactory::isSqsEnabled).thenReturn(true); + dependencyFactoryMockedStatic.when(DependencyFactory::sqsQueueUrl).thenReturn(null); + + // Create handler and invoke + handler = new PostItemHandler(); + APIGatewayProxyResponseEvent response = handler.handleRequest(request, context); + + // Verify + verify(table).putItem(any(Book.class)); + verify(sqsClient, never()).sendMessage(any(SendMessageRequest.class)); + assertEquals(200, response.getStatusCode()); + } + } } \ No newline at end of file