diff --git a/java/Iceberg/S3TableSink/README.md b/java/Iceberg/S3TableSink/README.md
index 07ef1e3..480e318 100644
--- a/java/Iceberg/S3TableSink/README.md
+++ b/java/Iceberg/S3TableSink/README.md
@@ -2,7 +2,7 @@
* Flink version: 1.19.0
* Flink API: DataStream API
-* Iceberg 1.8.1
+* Iceberg 1.9.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)
@@ -10,9 +10,7 @@
This example demonstrates how to use
[Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with S3 Tables.
-For simplicity, the application generates synthetic data, random stock prices, internally.
-Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records
-serialized with AVRO.
+For simplicity, the application generates synthetic data (random stock prices) internally. Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka source, that receives records serialized with AVRO.
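+
+A minimal sketch of the kind of record the generator produces; the `Trade` record name and the `symbol` and `price` fields are illustrative, not the exact schema bundled with this example:
+
+```java
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+public class TradeRecordSketch {
+    public static void main(String[] args) {
+        // Parse a trivial schema; the real application bundles its own schema definition.
+        Schema schema = new Schema.Parser().parse(
+                "{\"type\":\"record\",\"name\":\"Trade\",\"fields\":["
+                        + "{\"name\":\"symbol\",\"type\":\"string\"},"
+                        + "{\"name\":\"price\",\"type\":\"double\"}]}");
+
+        // Build one GenericRecord, as the data generator does for every synthetic trade.
+        GenericRecord record = new GenericData.Record(schema);
+        record.put("symbol", "AMZN");
+        record.put("price", 123.45);
+        System.out.println(record);
+    }
+}
+```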
### Prerequisites
@@ -49,10 +47,10 @@ The application must have IAM permissions to:
### Runtime configuration
-When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties.
+When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from runtime properties. Make sure you pass the mandatory parameter `table.bucket.arn`.
When running locally, the configuration is read from the
-[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.
+[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. Make sure that the scope of the dependencies in `pom.xml` is set to `compile` when running locally.
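+
+For reference, the local file follows the Managed Service for Apache Flink runtime properties layout; the ARN below is a placeholder:
+
+```json
+[
+  {
+    "PropertyGroupId": "Iceberg",
+    "PropertyMap": {
+      "table.bucket.arn": "arn:aws:s3tables:<region>:<account-id>:bucket/<bucket-name>",
+      "catalog.db": "default",
+      "catalog.table": "prices_s3table",
+      "partition.fields": "symbol"
+    }
+  }
+]
+```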
Runtime parameters:
@@ -67,25 +65,17 @@ Runtime parameters:
| `Iceberg` | `upsert.equality.fields` | `symbol` | Comma separated list of fields used for upsert. It must match partition fields. Required if `operation` = `upsert`. |
### Checkpoints
-
-Checkpointing must be enabled. Iceberg commits writes on checkpoint.
-
-When running locally, the application enables checkpoints programmatically, every 10 seconds.
-When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
+Checkpointing must be enabled. Iceberg commits writes on checkpoint. When running locally, the application enables checkpoints programmatically, every 10 seconds. When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
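+
+A minimal sketch of enabling checkpoints programmatically for local runs only (the actual application may detect local execution differently):
+
+```java
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class CheckpointSetupSketch {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // Checkpoint every 10 seconds when running locally; when deployed,
+        // checkpointing is controlled by the application configuration.
+        if (env instanceof LocalStreamEnvironment) {
+            env.enableCheckpointing(10_000);
+        }
+        // ...define the pipeline and call env.execute()...
+    }
+}
+```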
### Known limitations
-
At the moment there are limitations concerning the Flink Iceberg integration with S3 Tables:
* Currently, this example needs to use Flink v1.19; v1.20 isn't supported with the S3 Table Sink yet.
* Doesn't support Iceberg Table with hidden partitioning
* Doesn't support adding columns, removing columns, renaming columns or changing columns.
### Schema and schema evolution
-
-The application must "know" the AVRO schema on start.
-The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry.
-This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront.
+The application must "know" the AVRO schema on start. The schema cannot be dynamically inferred from the incoming records, for example using a schema registry. This is due to a limitation of the Flink Iceberg integration, which requires knowing the table schema upfront.
This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible.
Schema changes are not propagated to Iceberg.
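+
+A minimal sketch of loading a bundled schema definition at startup; the resource name `price.avsc` is illustrative, not necessarily the file used by this example:
+
+```java
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.avro.Schema;
+
+public class SchemaLoaderSketch {
+    // Reads the AVRO schema packaged with the application; the schema is fixed at build time.
+    static Schema loadBundledSchema() throws IOException {
+        try (InputStream in = SchemaLoaderSketch.class.getClassLoader()
+                .getResourceAsStream("price.avsc")) {
+            return new Schema.Parser().parse(in);
+        }
+    }
+}
+```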
@@ -98,7 +88,6 @@ It is technically possible to fetch the schema on application start from an exte
schema definition file in an S3 bucket. This is beyond the scope of this example.
### Running locally, in IntelliJ
-
You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
diff --git a/java/Iceberg/S3TableSink/pom.xml b/java/Iceberg/S3TableSink/pom.xml
index f43c091..6da10e1 100644
--- a/java/Iceberg/S3TableSink/pom.xml
+++ b/java/Iceberg/S3TableSink/pom.xml
@@ -14,12 +14,11 @@
11
${target.java.version}
${target.java.version}
-
1.19.0
- 1.11.3
+ 1.12.0
2.12
3.4.0
- 1.6.1
+ 1.9.1
1.2.0
2.23.1
5.8.1
@@ -35,46 +34,66 @@
org.apache.flink
- flink-streaming-java
+ flink-table-planner_${scala.version}
${flink.version}
provided
+
org.apache.flink
- flink-table-runtime
+ flink-metrics-dropwizard
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-streaming-java
${flink.version}
provided
org.apache.flink
- flink-table-api-java-bridge
+ flink-table-runtime
${flink.version}
+ provided
org.apache.flink
- flink-table-common
+ flink-connector-datagen
${flink.version}
+ provided
org.apache.flink
- flink-metrics-dropwizard
+ flink-table-api-java-bridge
${flink.version}
+ provided
org.apache.flink
- flink-avro
+ flink-table-common
${flink.version}
+ provided
-
-
+
org.apache.flink
- flink-table-planner_${scala.version}
+ flink-avro
${flink.version}
- provided
+
+
+ org.apache.avro
+ avro
+
+
+
+
+ org.apache.avro
+ avro
+ ${avro.version}
+
@@ -88,24 +107,30 @@
software.amazon.awssdk
s3tables
- 2.29.26
+ 2.38.2
+
+
+ ch.qos.logback
+ logback-classic
+
+
software.amazon.s3tables
s3-tables-catalog-for-iceberg
- 0.1.3
-
-
- org.apache.flink
- flink-connector-files
- ${flink.version}
- provided
+ 0.1.8
+
+
+ ch.qos.logback
+ logback-classic
+
+
org.apache.hadoop
- hadoop-client
+ hadoop-common
${hadoop.version}
@@ -116,17 +141,27 @@
org.slf4j
slf4j-reload4j
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ ch.qos.logback
+ logback-classic
+
+
+ ch.qos.logback
+ logback-core
+
+
+
- org.apache.hadoop
- hadoop-common
- ${hadoop.version}
-
-
- org.apache.hadoop
- hadoop-mapreduce-client-core
- ${hadoop.version}
+ org.junit.jupiter
+ junit-jupiter
+ ${junit5.version}
+ test
@@ -134,34 +169,45 @@
org.apache.iceberg
iceberg-core
${iceberg.version}
+
+
+ org.apache.avro
+ avro
+
+
org.apache.iceberg
- iceberg-flink
- ${iceberg.version}
-
-
- org.apache.iceberg
- iceberg-flink
+ iceberg-aws
${iceberg.version}
+
+
+ org.apache.avro
+ avro
+
+
org.apache.iceberg
- iceberg-aws-bundle
+ iceberg-flink-1.19
${iceberg.version}
+
+
+ org.apache.avro
+ avro
+
+
org.apache.iceberg
- iceberg-aws
+ iceberg-api
${iceberg.version}
-
-
-
-
- org.junit.jupiter
- junit-jupiter
- ${junit5.version}
- test
+
+
+ org.apache.avro
+ avro
+
+
@@ -179,13 +225,6 @@
org.apache.logging.log4j
log4j-core
${log4j.version}
- runtime
-
-
- org.apache.iceberg
- iceberg-flink-1.19
- 1.7.0
- compile
@@ -206,7 +245,7 @@
org.apache.maven.plugins
maven-shade-plugin
- 3.2.1
+ 3.5.0
package
@@ -219,7 +258,7 @@
org.apache.flink:force-shading
com.google.code.findbugs:jsr305
org.slf4j:*
- log4j:*
+ org.apache.logging.log4j:*
@@ -229,6 +268,7 @@
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
+ META-INF/versions/**
diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java
index ff7a960..de40d29 100644
--- a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java
+++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java
@@ -27,6 +27,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
public class StreamingJob {
@@ -58,7 +59,7 @@ private static DataGeneratorSource createDataGenerator(Properties
double recordsPerSecond = Double.parseDouble(generatorProperties.getProperty("records.per.sec", "10.0"));
Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0");
- LOG.info("Data generator: {} record/sec", recordsPerSecond);
+ LOG.info("Data generator: {} record/sec", Optional.of(recordsPerSecond));
return new DataGeneratorSource<>(
new AvroGenericStockTradeGeneratorFunction(avroSchema),
Long.MAX_VALUE,
@@ -79,7 +80,7 @@ public static void main(String[] args) throws Exception {
}
Map<String, Properties> applicationProperties = loadApplicationProperties(env);
- icebergProperties = applicationProperties.get("Iceberg");
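+ // Fall back to an empty Properties object when the "Iceberg" property group is not defined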
+ icebergProperties = applicationProperties.getOrDefault("Iceberg", new Properties());
// Get AVRO Schema from the definition bundled with the application
// Note that the application must "know" the AVRO schema upfront, i.e. the schema must be either embedded
@@ -98,6 +99,7 @@ public static void main(String[] args) throws Exception {
// Flink Sink Builder
FlinkSink.Builder icebergSinkBuilder = IcebergSinkBuilder.createBuilder(icebergProperties, genericRecordDataStream, avroSchema);
+
// Sink to Iceberg Table
icebergSinkBuilder.append();
@@ -105,7 +107,7 @@ public static void main(String[] args) throws Exception {
}
private static DataStream<GenericRecord> createDataStream(StreamExecutionEnvironment env, Map<String, Properties> applicationProperties, Schema avroSchema) {
- Properties dataGeneratorProperties = applicationProperties.get("DataGen");
+ Properties dataGeneratorProperties = applicationProperties.getOrDefault("DataGen", new Properties());
return env.fromSource(
createDataGenerator(dataGeneratorProperties, avroSchema),
WatermarkStrategy.noWatermarks(),
diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java
index 923cab4..04d2bf7 100644
--- a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java
+++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java
@@ -12,6 +12,7 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.*;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
@@ -25,7 +26,7 @@ public class IcebergSinkBuilder {
private static final String DEFAULT_S3_CATALOG_DB = "default";
private static final String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg";
private static final String DEFAULT_ICEBERG_PARTITION_FIELDS = "symbol";
- private static final String DEFAULT_ICEBERG_OPERATION = "upsert";
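+ // Default to append; set the `operation` runtime property to `upsert` to enable upserts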
+ private static final String DEFAULT_ICEBERG_OPERATION = "append";
private static final String DEFAULT_ICEBERG_UPSERT_FIELDS = "symbol";
/**
diff --git a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json
index b941e5a..7021ac1 100644
--- a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json
+++ b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json
@@ -9,7 +9,7 @@
"PropertyGroupId": "Iceberg",
"PropertyMap": {
- "table.bucket.arn": "<>",
+ "table.bucket.arn": "",
"catalog.db": "default",
"catalog.table": "prices_s3table",
"partition.fields": "symbol",