Skip to content

Commit d02a436

Browse files
authored
Fetch Secrets (#138)
* New example * Add entry to top-level README * Amend the READMEs of example which are passing credentials as parameters * Amend other READMEs showing MSK SASL/SCRAM
1 parent 1ee7ddb commit d02a436

File tree

14 files changed

+607
-2
lines changed

14 files changed

+607
-2
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ Example applications in Java, Python, Scala and SQL for Amazon Managed Service f
3838
- [**Serialization**](./java/Serialization) - Serialization of record and state
3939
- [**Windowing**](./java/Windowing) - Time-based window aggregation examples
4040
- [**Side Outputs**](./java/SideOutputs) - Using side outputs for data routing and filtering
41-
- [**Async I/O**](./java/AsyncIO) - Asynchronous I/O patterns with retries for external API calls\
41+
- [**Async I/O**](./java/AsyncIO) - Asynchronous I/O patterns with retries for external API calls
4242
- [**Custom Metrics**](./java/CustomMetrics) - Creating and publishing custom application metrics
43+
- [**Fetching credentials from Secrets Manager**](./java/FetchSecrets) - Dynamically fetching credentials from AWS Secrets Manager
4344

4445
#### Utilities
4546
- [**Fink Data Generator (JSON)**](java/FlinkDataGenerator) - How to use a Flink application as data generator, for functional and load testing.

java/FetchSecrets/README.md

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
## Fetching Secrets from Secrets Manager
2+
3+
This example demonstrates how to fetch secrets from AWS Secrets Manager at application start.
4+
5+
* Flink version: 1.20
6+
* Flink API: DataStream
7+
* Language: Java (11)
8+
* Flink connectors: DataGen, Kafka sink
9+
10+
This example shows how you can fetch any secrets from AWS Secrets Manager, without passing them as non-encrypted configuration parameters.
11+
In this case, the job is fetching username and password for MSK SASL/SCRAM authentication.
12+
The application generates random stock prices and writes them, as JSON, to a Kafka topic.
13+
14+
Note that this method works for any secrets represented as text, which are directly passed to the constructor of the operator.
15+
This method does not work for fetching keystore or truststore files.
16+
17+
### Prerequisites
18+
19+
#### MSK
20+
21+
To run this application on Amazon Managed Service for Apache Flink, you need an Amazon MSK cluster configured for
22+
SASL/SCRAM authentication. See [MSK Documentation](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password-tutorial.html)
23+
for details on how to set it up.
24+
25+
The cluster must contain a topic named `stock-prices` or allow auto topic creation.
26+
27+
If you set up any Kafka ACL, the user must have permissions to write to this topic.
28+
29+
#### Managed Flink Application Service Role
30+
31+
The IAM Service Role attached to the Managed Flink application must have sufficient permissions to fetch the credentials
32+
from Amazon Secrets Manager. See [Amazon Secrets Manager documentation](https://docs.aws.amazon.com/secretsmanager/latest/userguide/determine-acccess_examine-iam-policies.html)
33+
for further details.
34+
35+
MSK SASL/SCRAM credentials must be encrypted with a customer managed key (CMK). The application Service Role must also
36+
provide permissions to use the CMK to decrypt the secret (`kms:Decrypt`).
37+
38+
Here is an example of an IAM Policy to allow the application to fetch and decrypt the secret:
39+
40+
```json
41+
{
42+
"Version": "2012-10-17",
43+
"Statement": [
44+
{
45+
"Sid": "AllowFetchSecret",
46+
"Effect": "Allow",
47+
"Action": "secretsmanager:GetSecretValue",
48+
"Resource": "arn:aws:secretsmanager:<region>:<account>:secret:<secretName>-*"
49+
},
50+
{
51+
"Sid": "AllowDecryptSecret",
52+
"Effect": "Allow",
53+
"Action": "kms:Decrypt",
54+
"Resource": "arn:aws:kms:<region>:<account>:key/<key-id>"
55+
}
56+
]
57+
}
58+
```
59+
60+
⚠️ Note that the KMS Key Policy may also restrict access to the CMK.
61+
If you are using a restrictive Key Policy, you also need to allow your Managed Flink application to decrypt.
62+
Add the following snippet to the KMS Key Policy, in addition to other permissions:
63+
64+
```json
65+
{
66+
"Sid": "AllowDecrypting",
67+
"Effect": "Allow",
68+
"Principal": {
69+
"Service": "kinesisanalytics.amazonaws.com"
70+
},
71+
"Action": "kms:Decrypt",
72+
"Resource": "*"
73+
}
74+
```
75+
76+
#### Managed Flink Application VPC Networking
77+
78+
To be able to connect to the MSK cluster, the Managed Flink application must have VPC networking configured, and must
79+
be able to reach the MSK cluster. For the sake of this example, the simplest setup is using the same VPC, Subnets, and Security Group
80+
as the MSK cluster.
81+
82+
### Runtime Configuration
83+
84+
When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.
85+
86+
When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file located in the resources folder.
87+
88+
Runtime parameters:
89+
90+
| Group ID | Key | Description |
91+
|------------------|----------------------|--------------------------------------------------------------------------|
92+
| `DataGen` | `records.per.second` | Number of stock price records to generate per second (default: 10) |
93+
| `Output0` | `bootstrap.servers` | Kafka bootstrap servers |
94+
| `Output0` | `topic` | Target Kafka topic (default: "stock-prices") |
95+
| `AuthProperties` | `secret.name` | AWS Secrets Manager secret name containing username/password credentials |
96+
97+
The `bootstrap.servers` should be the one for SASL/SCRAM (port 9096).
98+
99+
### Testing Locally
100+
101+
The application cannot be run locally, unless you provide networking from your machine to an MSK cluster supporting
102+
SASL/SCRAM authentication, for example via VPN.
103+
104+
Fetching the secret from Secrets Manager works from your machine, as long as you have an authenticated AWS CLI profile
105+
which allows fetching the secret, and you let your application use the profile using the IDE AWS Plugin.
106+
107+
108+
### Known Limitations
109+
110+
Credentials can be fetched only once, when the job starts.
111+
Flink does not have any easy way to dynamically update an operator, for example the Kafka Sink, while the job is running.
112+
113+
If you implement any credential rotation, the new credentials will not be used by the application unless you restart the job.

java/FetchSecrets/pom.xml

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xmlns="http://maven.apache.org/POM/4.0.0"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>com.amazonaws</groupId>
8+
<artifactId>fetch-secrets</artifactId>
9+
<version>1.0</version>
10+
<packaging>jar</packaging>
11+
12+
<properties>
13+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
14+
<buildDirectory>${project.basedir}/target</buildDirectory>
15+
<jar.finalName>${project.name}-${project.version}</jar.finalName>
16+
<target.java.version>11</target.java.version>
17+
<maven.compiler.source>${target.java.version}</maven.compiler.source>
18+
<maven.compiler.target>${target.java.version}</maven.compiler.target>
19+
<flink.version>1.20.0</flink.version>
20+
<kafka.connector.version>3.2.0-1.19</kafka.connector.version>
21+
<kda.runtime.version>1.2.0</kda.runtime.version>
22+
<log4j.version>2.17.2</log4j.version>
23+
</properties>
24+
25+
<dependencyManagement>
26+
<dependencies>
27+
<dependency>
28+
<groupId>software.amazon.awssdk</groupId>
29+
<artifactId>bom</artifactId>
30+
<version>2.20.162</version>
31+
<type>pom</type>
32+
<scope>import</scope>
33+
</dependency>
34+
</dependencies>
35+
</dependencyManagement>
36+
37+
<dependencies>
38+
<!-- Flink Core dependencies -->
39+
<dependency>
40+
<groupId>org.apache.flink</groupId>
41+
<artifactId>flink-streaming-java</artifactId>
42+
<version>${flink.version}</version>
43+
<scope>provided</scope>
44+
</dependency>
45+
<dependency>
46+
<groupId>org.apache.flink</groupId>
47+
<artifactId>flink-clients</artifactId>
48+
<version>${flink.version}</version>
49+
<scope>provided</scope>
50+
</dependency>
51+
<dependency>
52+
<groupId>org.apache.flink</groupId>
53+
<artifactId>flink-connector-base</artifactId>
54+
<version>${flink.version}</version>
55+
<scope>provided</scope>
56+
</dependency>
57+
58+
<!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
59+
<dependency>
60+
<groupId>com.amazonaws</groupId>
61+
<artifactId>aws-kinesisanalytics-runtime</artifactId>
62+
<version>${kda.runtime.version}</version>
63+
<scope>provided</scope>
64+
</dependency>
65+
66+
<!-- AWS SDK for Secrets Manager -->
67+
<dependency>
68+
<groupId>software.amazon.awssdk</groupId>
69+
<artifactId>secretsmanager</artifactId>
70+
</dependency>
71+
72+
<!-- DataGen connector -->
73+
<dependency>
74+
<groupId>org.apache.flink</groupId>
75+
<artifactId>flink-connector-datagen</artifactId>
76+
<version>${flink.version}</version>
77+
</dependency>
78+
79+
<!-- Flink Kafka connector -->
80+
<dependency>
81+
<groupId>org.apache.flink</groupId>
82+
<artifactId>flink-connector-kafka</artifactId>
83+
<version>${kafka.connector.version}</version>
84+
</dependency>
85+
86+
<dependency>
87+
<groupId>org.apache.flink</groupId>
88+
<artifactId>flink-json</artifactId>
89+
<version>${flink.version}</version>
90+
<scope>provided</scope>
91+
</dependency>
92+
93+
<!-- Logging framework -->
94+
<dependency>
95+
<groupId>org.apache.logging.log4j</groupId>
96+
<artifactId>log4j-slf4j-impl</artifactId>
97+
<version>${log4j.version}</version>
98+
</dependency>
99+
<dependency>
100+
<groupId>org.apache.logging.log4j</groupId>
101+
<artifactId>log4j-api</artifactId>
102+
<version>${log4j.version}</version>
103+
</dependency>
104+
<dependency>
105+
<groupId>org.apache.logging.log4j</groupId>
106+
<artifactId>log4j-core</artifactId>
107+
<version>${log4j.version}</version>
108+
</dependency>
109+
110+
<!-- JUnit 5 -->
111+
<dependency>
112+
<groupId>org.junit.jupiter</groupId>
113+
<artifactId>junit-jupiter</artifactId>
114+
<version>5.10.0</version>
115+
<scope>test</scope>
116+
</dependency>
117+
118+
</dependencies>
119+
120+
<build>
121+
<directory>${buildDirectory}</directory>
122+
<finalName>${jar.finalName}</finalName>
123+
124+
<plugins>
125+
<!-- Java Compiler -->
126+
<plugin>
127+
<groupId>org.apache.maven.plugins</groupId>
128+
<artifactId>maven-compiler-plugin</artifactId>
129+
<version>3.8.1</version>
130+
<configuration>
131+
<source>${target.java.version}</source>
132+
<target>${target.java.version}</target>
133+
</configuration>
134+
</plugin>
135+
136+
<!-- Shade plugin to build the fat-jar including all required dependencies -->
137+
<plugin>
138+
<groupId>org.apache.maven.plugins</groupId>
139+
<artifactId>maven-shade-plugin</artifactId>
140+
<version>3.2.1</version>
141+
<executions>
142+
<execution>
143+
<phase>package</phase>
144+
<goals>
145+
<goal>shade</goal>
146+
</goals>
147+
<configuration>
148+
<artifactSet>
149+
<excludes>
150+
<exclude>org.apache.flink:force-shading</exclude>
151+
<exclude>com.google.code.findbugs:jsr305</exclude>
152+
<exclude>org.slf4j:*</exclude>
153+
<exclude>log4j:*</exclude>
154+
</excludes>
155+
</artifactSet>
156+
<filters>
157+
<filter>
158+
<artifact>*:*</artifact>
159+
<excludes>
160+
<exclude>META-INF/*.SF</exclude>
161+
<exclude>META-INF/*.DSA</exclude>
162+
<exclude>META-INF/*.RSA</exclude>
163+
</excludes>
164+
</filter>
165+
</filters>
166+
<transformers>
167+
<transformer
168+
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
169+
<transformer
170+
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
171+
<mainClass>com.amazonaws.services.msf.FetchSecretsJob</mainClass>
172+
</transformer>
173+
</transformers>
174+
</configuration>
175+
</execution>
176+
</executions>
177+
</plugin>
178+
</plugins>
179+
</build>
180+
</project>

0 commit comments

Comments
 (0)