Commit 17a4927

Author: Felix John
Commit message: Fix
1 parent 0738e4d commit 17a4927

File tree: 5 files changed, +18 -17 lines


java/Iceberg/S3TableSink/README.md

Lines changed: 3 additions & 6 deletions
@@ -47,9 +47,10 @@ The application must have IAM permissions to:
 
 ### Runtime configuration
 
-When running on Amazon Managed Service for Apache Flink the runtime configuration is read from runtime properties.
+When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from runtime properties. Make sure that you pass the mandatory parameter `table.bucket.arn`.
+
 When running locally, the configuration is read from the
-[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.
+[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. Make sure that the scope of the dependencies in `pom.xml` is set to `compile` (the default) when running locally.
 
 Runtime parameters:
 
@@ -64,19 +65,16 @@ Runtime parameters:
 | `Iceberg` | `upsert.equality.fields` | `symbol` | Comma separated list of fields used for upsert. It must match partition fields. Required if `operation` = `upsert`. |
 
 ### Checkpoints
-
 Checkpointing must be enabled. Iceberg commits writes on checkpoint. When running locally, the application enables checkpoints programmatically, every 10 seconds. When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
 
 
 ### Known limitations
-
 There are currently some limitations to the Flink Iceberg integration with S3 Tables:
 * Currently, this example must use Flink v1.19; v1.20 is not yet supported by the S3 Table Sink.
 * Doesn't support Iceberg tables with hidden partitioning
 * Doesn't support adding, removing, renaming, or changing columns.
 
 ### Schema and schema evolution
-
 The application must "know" the AVRO schema on start. The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry. This is due to a limitation of the Flink Iceberg integration, which requires knowing the table schema upfront.
 
 This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible.
@@ -90,7 +88,6 @@ It is technically possible to fetch the schema on application start from an external source, like a
 schema definition file in an S3 bucket. This is beyond the scope of this example.
 
 ### Running locally, in IntelliJ
-
 You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
 
 See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
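Two details from the README above lend themselves to a short sketch: checkpointing must be enabled because Iceberg commits on checkpoint (locally, programmatically every 10 seconds), and the AVRO schema must be known at startup. The snippet below illustrates both, assuming the schema ships as a bundled `price.avsc` resource; that file name is a guess, not taken from the commit.

```java
import org.apache.avro.Schema;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative sketch, not the example's actual code.
public class LocalSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Iceberg commits writes on checkpoint, so checkpointing must be enabled.
        // Locally there is no application configuration, so enable it in code.
        if (env instanceof LocalStreamEnvironment) {
            env.enableCheckpointing(10_000L); // every 10 seconds, per the README
        }

        // The schema cannot be inferred at runtime; parse it from a bundled definition.
        Schema avroSchema = new Schema.Parser()
                .parse(LocalSetupSketch.class.getClassLoader()
                        .getResourceAsStream("price.avsc")); // hypothetical resource name
    }
}
```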

java/Iceberg/S3TableSink/pom.xml

Lines changed: 11 additions & 7 deletions
@@ -30,12 +30,15 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-runtime-web</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${scala.version}</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
+        <!-- Flink Iceberg uses DropWizard metrics -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-metrics-dropwizard</artifactId>
@@ -45,27 +48,34 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-runtime</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-datagen</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-api-java-bridge</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
+
+        <!-- Flink-Avro Dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-avro</artifactId>
@@ -90,6 +100,7 @@
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-kinesisanalytics-runtime</artifactId>
             <version>${kda.runtime.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <!-- S3 Tables Dependencies -->
@@ -261,13 +272,6 @@
                         </excludes>
                     </filter>
                 </filters>
-                <!-- Shading org.apache.avro, see: https://github.com/apache/iceberg/issues/13481 -->
-                <relocations>
-                    <relocation>
-                        <pattern>org.apache.avro</pattern>
-                        <shadedPattern>shaded.org.apache.avro</shadedPattern>
-                    </relocation>
-                </relocations>
                 <transformers>
                     <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
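The shade configuration no longer relocates `org.apache.avro` (see the Iceberg issue linked in the removed comment). If you want to verify that Avro ships unrelocated in the shaded jar, a throwaway check along these lines works; the class is hypothetical, written only for this check:

```java
// Hypothetical sanity check, run against the shaded jar: had the relocation
// still been active, the printed name would start with "shaded.org.apache.avro".
public class AvroRelocationCheck {
    public static void main(String[] args) {
        System.out.println(org.apache.avro.Schema.class.getName());
        // Expected: org.apache.avro.Schema
    }
}
```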

java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java

Lines changed: 2 additions & 2 deletions
@@ -80,7 +80,7 @@ public static void main(String[] args) throws Exception {
         }
 
         Map<String, Properties> applicationProperties = loadApplicationProperties(env);
-        icebergProperties = applicationProperties.get("Iceberg");
+        icebergProperties = applicationProperties.getOrDefault("Iceberg", new Properties());
 
         // Get AVRO Schema from the definition bundled with the application
         // Note that the application must "know" the AVRO schema upfront, i.e. the schema must be either embedded
@@ -107,7 +107,7 @@ public static void main(String[] args) throws Exception {
     }
 
     private static DataStream<GenericRecord> createDataStream(StreamExecutionEnvironment env, Map<String, Properties> applicationProperties, Schema avroSchema) {
-        Properties dataGeneratorProperties = applicationProperties.get("DataGen");
+        Properties dataGeneratorProperties = applicationProperties.getOrDefault("DataGen", new Properties());
         return env.fromSource(
                 createDataGenerator(dataGeneratorProperties, avroSchema),
                 WatermarkStrategy.noWatermarks(),
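The switch from `get` to `getOrDefault` is a null-safety fix: with `get`, a property group missing from the configuration returns `null`, and the first `getProperty` call on it throws a `NullPointerException`. A standalone illustration (not the application code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class GetOrDefaultSketch {
    public static void main(String[] args) {
        // No "Iceberg" or "DataGen" group configured at all.
        Map<String, Properties> applicationProperties = new HashMap<>();

        // Before this commit: returns null; any later getProperty(...) call
        // on the result throws a NullPointerException.
        Properties before = applicationProperties.get("Iceberg");

        // After this commit: returns an empty Properties object, so the
        // defaults defined in IcebergSinkBuilder apply instead of crashing.
        Properties after = applicationProperties.getOrDefault("Iceberg", new Properties());
        System.out.println(after.getProperty("table.bucket.arn")); // prints null, no NPE
    }
}
```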

java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ public class IcebergSinkBuilder {
     private static final String DEFAULT_S3_CATALOG_DB = "default";
     private static final String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg";
     private static final String DEFAULT_ICEBERG_PARTITION_FIELDS = "symbol";
-    private static final String DEFAULT_ICEBERG_OPERATION = "upsert";
+    private static final String DEFAULT_ICEBERG_OPERATION = "append";
     private static final String DEFAULT_ICEBERG_UPSERT_FIELDS = "symbol";
 
     /**
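Changing the default operation from `upsert` to `append` matters because the operation drives how the Iceberg sink is assembled, and `upsert` additionally requires equality fields. The sketch below shows how such a property typically selects the write mode with Iceberg's standard `FlinkSink` builder; the surrounding method and parameter names are illustrative, not taken from this repository:

```java
import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

// Illustrative sketch of selecting append vs. upsert from a runtime property.
public class OperationSketch {
    static void attachSink(DataStream<RowData> stream, TableLoader tableLoader,
                           String operation, String upsertEqualityFields) {
        FlinkSink.Builder builder = FlinkSink.forRowData(stream).tableLoader(tableLoader);
        if ("upsert".equals(operation)) {
            // Upsert needs equality fields, which must match the partition fields.
            builder = builder.upsert(true)
                    .equalityFieldColumns(Arrays.asList(upsertEqualityFields.split(",")));
        }
        // With the new default ("append"), records are simply appended.
        builder.append();
    }
}
```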

java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
       "PropertyGroupId": "Iceberg",
       "PropertyMap": {
 
-        "table.bucket.arn": "your-table-arn",
+        "table.bucket.arn": "<your-table-arn>",
         "catalog.db": "default",
         "catalog.table": "prices_s3table",
         "partition.fields": "symbol",
