Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
946 changes: 946 additions & 0 deletions documentdb-lambda-java-sam/DocumentDBAndMongoClientEC2.yaml

Large diffs are not rendered by default.

194 changes: 194 additions & 0 deletions documentdb-lambda-java-sam/README.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions documentdb-lambda-java-sam/connect_to_mongo_shell.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash

# Connect to Mongo Shell
mongosh --tls --tlsCAFile /home/ec2-user/mongoshell/global-bundle.pem --username DOCDB_CLUSTER_ADMIN_USER --password DOCDB_CLUSTER_PASSWORD --host DOCDB_CLUSTER_ENDPOINT --port 27017
19 changes: 19 additions & 0 deletions documentdb-lambda-java-sam/docdb_db_collection.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash

# DocumentDB Connection Details
DOCDB_ENDPOINT="DOCDB_CLUSTER_ENDPOINT"
DOCDB_PORT="27017"
DOCDB_USERNAME="DOCDB_CLUSTER_ADMIN_USER"
DOCDB_PASSWORD="DOCDB_CLUSTER_PASSWORD"
TLS_CERT_FILE="/home/ec2-user/mongoshell/global-bundle.pem" # Path to your TLS certificate file if TLS is enabled

# Database and Collection Names
DATABASE_NAME="DOCDB_DATABASE"
COLLECTION_NAME="DOCDB_COLLECTION"

# Connect using Mongo Shell and create database and collection
mongosh --tls --tlsCAFile ${TLS_CERT_FILE} --username ${DOCDB_USERNAME} --password ${DOCDB_PASSWORD} --host ${DOCDB_ENDPOINT} --port ${DOCDB_PORT} --file mongodbcolcreate.js


# Connect to Mongo Shell
# mongosh --tls --tlsCAFile /home/ec2-user/mongoshell/global-bundle.pem --username DOCDB_CLUSTER_ADMIN_USER --password DOCDB_CLUSTER_PASSWORD --host DOCDB_CLUSTER_ENDPOINT --port 27017
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
node_modules
npm-debug.log
package-lock.json
package
*out.yml
out.json
bucket-name.txt
target
build
.gradle
*.zip
bin
obj
Gemfile.lock
lib
__pycache__
*.pyc
.classpath
.factorypath
.project
.settings/*
.aws
.sam
.aws-sam
*.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/target/
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=11
org.eclipse.jdt.core.compiler.compliance=11
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=11
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.amazonaws.services.lambda.samples.events.documentdb.streams</groupId>
<artifactId>DocumentDBStreamsConsumer</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>A sample Lambda DocumentDB Streams consumer</name>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.11.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-dynamodb -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
<version>1.12.445</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.7.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
<version>2.20.62</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<configuration></configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.amazonaws.services.lambda.samples.events.documentdbstreams;

import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.samples.events.documentdbstreams.models.EventEvent;

public class DynamoDBUpdater {

String dynamoDBTableName;
AmazonDynamoDB client;
DynamoDB dynamoDB;
Table dynamoTable;


public DynamoDBUpdater(String dynamoDBTableName) {
super();
if (null == dynamoDBTableName) {
this.dynamoDBTableName = "SQS_LAMBDA_DYNAMO_TABLE";
} else {
this.dynamoDBTableName = dynamoDBTableName;
}
String AWS_SAM_LOCAL = System.getenv("AWS_SAM_LOCAL");
if (null == AWS_SAM_LOCAL) {
this.client = AmazonDynamoDBClientBuilder.standard().build();
} else {
this.client = AmazonDynamoDBClientBuilder.standard().withEndpointConfiguration(new EndpointConfiguration("http://127.0.0.1:8000", "")).build();
this.dynamoDBTableName = "SQS_LAMBDA_DYNAMO_TABLE";
}
this.dynamoDB = new DynamoDB(client);
this.dynamoTable = dynamoDB.getTable(this.dynamoDBTableName);
}

public PutItemOutcome insertIntoDynamoDB(EventEvent eventEvent, String EventSource, String EventSourceARN, LambdaLogger logger) {
logger.log("Now inserting a row in DynamoDB for messageID = " + eventEvent.getFullDocument().get_id());
Item item = new Item();
item.withPrimaryKey("MessageID", eventEvent.getFullDocument().get_id());
item.withString("EventSource", EventSource);
item.withString("EventSourceARN", EventSourceARN);
item.withString("EventIDData", eventEvent.get_id().get_data());
item.withString("OperationType", eventEvent.getOperationType());
item.withString("DocumentDBDatabase", eventEvent.getNs().getDb());
item.withString("DocumentDBCollection", eventEvent.getNs().getColl());
item.withString("DocumentKeyID", eventEvent.getDocumentKey().get_id());
item.withLong("ClusterTimeTimeStampT = ", eventEvent.getClusterTime().get$timestamp().getT());
item.withLong("ClusterTimeTimeStampI = ", eventEvent.getClusterTime().get$timestamp().getI());
item.withString("CustomerID = ", eventEvent.getFullDocument().get_id());
item.withString("CustomerFirstname = ", eventEvent.getFullDocument().getFirstname());
item.withString("CustomerLastname = ", eventEvent.getFullDocument().getLastname());
item.withString("CustomerStreet = ", eventEvent.getFullDocument().getStreet());
item.withString("CustomerCity = ", eventEvent.getFullDocument().getCity());
item.withString("CustomerCounty = ", eventEvent.getFullDocument().getCounty());
item.withString("CustomerState = ", eventEvent.getFullDocument().getState());
item.withString("CustomerZip = ", eventEvent.getFullDocument().getZip());
item.withString("CustomerHomePhone = ", eventEvent.getFullDocument().getHomePhone());
item.withString("CustomerCellPhone = ", eventEvent.getFullDocument().getCellPhone());
item.withString("CustomerEmail = ", eventEvent.getFullDocument().getEmail());
item.withString("CustomerCompany = ", eventEvent.getFullDocument().getCompany());
item.withString("CustomerWebsite = ", eventEvent.getFullDocument().getWebsite());
logger.log("Now done inserting a row in DynamoDB for messageID = " + eventEvent.getFullDocument().get_id());
return dynamoTable.putItem(item);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//Lambda Runtime delivers a batch of messages to the lambda function
//Each batch of messages has two fields EventSource and EventSourceARN
//Each batch of messages also has a field called Records
//The Records is a map with multiple keys and values
//Each key is a combination of the Topic Name and the Partition Number
//One batch of messages can contain messages from multiple partitions

/*
To simplify representing a batch of Kafka messages as a list of messages
We have created a Java class called KafkaMessage under the models package
Here we are mapping the structure of an incoming Kafka event to a list of
objects of the KafkaMessage class
*/

package com.amazonaws.services.lambda.samples.events.documentdbstreams;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.amazonaws.services.lambda.samples.events.documentdbstreams.models.*;


public class HandlerDocumentDBStreams implements RequestStreamHandler{
Gson gson = new GsonBuilder().setPrettyPrinting().create();
String dynamoDBTableName = System.getenv("DYNAMO_DB_TABLE");
DynamoDBUpdater ddbUpdater = new DynamoDBUpdater(dynamoDBTableName);
boolean addToDynamoDB;
//ObjectMapper objectMapper = new ObjectMapper();
@Override
public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException
{
LambdaLogger logger = context.getLogger();
addToDynamoDB = true;
logger.log("Begin Event *************");
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
DocumentDBStreamMessage message = gson.fromJson(reader, DocumentDBStreamMessage.class);
logger.log("Message = " + message);
logger.log("EventSource = " + message.getEventSource());
logger.log("EventSourceARN = " + message.getEventSourceArn());
for (int i=0;i<message.getEvents().length;i++) {
logger.log("Starting a new message **************");
EventElement eventElement = message.getEvents()[i];
EventEvent eventEvent = eventElement.getEvent();
logger.log("EventIDData = " + eventEvent.get_id().get_data());
logger.log("OperationType = " + eventEvent.getOperationType());
logger.log("Database = " + eventEvent.getNs().getDb());
logger.log("Collection = " + eventEvent.getNs().getColl());
logger.log("DocumentKeyID = " + eventEvent.getDocumentKey().get_id());
logger.log("ClusterTimeTimeStampT = " + eventEvent.getClusterTime().get$timestamp().getT());
logger.log("ClusterTimeTimeStampI = " + eventEvent.getClusterTime().get$timestamp().getI());
logger.log("CustomerID = " + eventEvent.getFullDocument().get_id());
logger.log("CustomerFirstname = " + eventEvent.getFullDocument().getFirstname());
logger.log("CustomerLastname = " + eventEvent.getFullDocument().getLastname());
logger.log("CustomerStreet = " + eventEvent.getFullDocument().getStreet());
logger.log("CustomerCity = " + eventEvent.getFullDocument().getCity());
logger.log("CustomerCounty = " + eventEvent.getFullDocument().getCounty());
logger.log("CustomerState = " + eventEvent.getFullDocument().getState());
logger.log("CustomerZip = " + eventEvent.getFullDocument().getZip());
logger.log("CustomerHomePhone = " + eventEvent.getFullDocument().getHomePhone());
logger.log("CustomerCellPhone = " + eventEvent.getFullDocument().getCellPhone());
logger.log("CustomerEmail = " + eventEvent.getFullDocument().getEmail());
logger.log("CustomerCompany = " + eventEvent.getFullDocument().getCompany());
logger.log("CustomerWebsite = " + eventEvent.getFullDocument().getWebsite());
logger.log("Finishing a new message **************");
String AWS_SAM_LOCAL = System.getenv("AWS_SAM_LOCAL");
if ((null == AWS_SAM_LOCAL) && (addToDynamoDB)) {
ddbUpdater.insertIntoDynamoDB(eventEvent, message.getEventSource(), message.getEventSourceArn(), logger);
}
}
} catch (Exception e1) {
logger.log(e1.getMessage());
}
logger.log("End Event ***************");

}

public void throwit(String message) throws Exception{
throw new Exception(message);
}

}
Loading