
Commit 51ccc61

Merge pull request #146 from Madabaru/main
Upgrade to Iceberg 1.9.1
2 parents ecd3380 + 6d3776b commit 51ccc61

File tree

5 files changed: +109 −77 lines changed

5 files changed

+109
-77
lines changed

java/Iceberg/S3TableSink/README.md

Lines changed: 6 additions & 17 deletions
@@ -2,17 +2,15 @@
 
 * Flink version: 1.19.0
 * Flink API: DataStream API
-* Iceberg 1.8.1
+* Iceberg 1.9.1
 * Language: Java (11)
 * Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
   and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)
 
 This example demonstrates how to use the
 [Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with S3 Tables.
 
-For simplicity, the application generates synthetic data, random stock prices, internally.
-Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records
-serialized with AVRO.
+For simplicity, the application generates synthetic data (random stock prices) internally. Data is generated as an AVRO Generic Record, simulating a real source, e.g. a Kafka Source, that receives records serialized with AVRO.
 
 ### Prerequisites

@@ -49,10 +47,10 @@ The application must have IAM permissions to:
 
 ### Runtime configuration
 
-When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties.
+When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from runtime properties. Make sure you pass the mandatory parameter `table.bucket.arn`.
 
 When running locally, the configuration is read from the
-[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.
+[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. Make sure the scope of the dependencies in `pom.xml` is set to `compile` when running locally.
 
 Runtime parameters:
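
For local runs, these parameters are read from the JSON file mentioned above. A minimal sketch of what `flink-application-properties-dev.json` can look like, assuming `table.bucket.arn` and `operation` sit in the `Iceberg` property group and `records.per.sec` in the `DataGen` group (the ARN is a placeholder value):

```json
[
  {
    "PropertyGroupId": "Iceberg",
    "PropertyMap": {
      "table.bucket.arn": "arn:aws:s3tables:us-east-1:123456789012:bucket/example-table-bucket",
      "operation": "append"
    }
  },
  {
    "PropertyGroupId": "DataGen",
    "PropertyMap": {
      "records.per.sec": "10.0"
    }
  }
]
```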
@@ -67,25 +65,17 @@
 | `Iceberg` | `upsert.equality.fields` | `symbol` | Comma separated list of fields used for upsert. It must match partition fields. Required if `operation` = `upsert`. |
 
 ### Checkpoints
-
-Checkpointing must be enabled. Iceberg commits writes on checkpoint.
-
-When running locally, the application enables checkpoints programmatically, every 10 seconds.
-When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
+Checkpointing must be enabled. Iceberg commits writes on checkpoint. When running locally, the application enables checkpoints programmatically, every 10 seconds. When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
 
 
 ### Known limitations
-
 At the moment there are some limitations in the Flink Iceberg integration with S3 Tables:
 * Currently, this example needs to be on Flink v1.19; v1.20 isn't supported with the S3 Table Sink yet.
 * Doesn't support Iceberg Tables with hidden partitioning
 * Doesn't support adding, removing, renaming, or changing columns.
 
 ### Schema and schema evolution
-
-The application must "know" the AVRO schema on start.
-The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry.
-This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront.
+The application must "know" the AVRO schema on start. The schema cannot be dynamically inferred from the incoming records, for example using a schema registry. This is due to a limitation of the Flink Iceberg integration, which requires knowing the table schema upfront.
 
 This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible.
 Schema changes are not propagated to Iceberg.
@@ -98,7 +88,6 @@ It is technically possible to fetch the schema on application start from an external
 schema definition file in an S3 bucket. This is beyond the scope of this example.
 
 ### Running locally, in IntelliJ
-
 You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
 
 See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
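
When running locally this way, checkpointing is enabled programmatically every 10 seconds (see the Checkpoints section above), because Iceberg only commits writes on checkpoint. A minimal sketch of that local-only setup, assuming a simple environment-type check; the exact guard used by the application may differ:

```java
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalCheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Iceberg commits writes only on checkpoint, so checkpointing must always be enabled.
        // Locally there is no MSF application configuration, so the interval is set in code;
        // 10 seconds matches the README. On MSF the service configuration controls this instead.
        if (env instanceof LocalStreamEnvironment) {
            env.enableCheckpointing(10_000L);
        }
    }
}
```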

java/Iceberg/S3TableSink/pom.xml

Lines changed: 95 additions & 55 deletions
@@ -14,12 +14,11 @@
         <target.java.version>11</target.java.version>
         <maven.compiler.source>${target.java.version}</maven.compiler.source>
         <maven.compiler.target>${target.java.version}</maven.compiler.target>
-
         <flink.version>1.19.0</flink.version>
-        <avro.version>1.11.3</avro.version>
+        <avro.version>1.12.0</avro.version>
         <scala.version>2.12</scala.version>
         <hadoop.version>3.4.0</hadoop.version>
-        <iceberg.version>1.6.1</iceberg.version>
+        <iceberg.version>1.9.1</iceberg.version>
         <kda.runtime.version>1.2.0</kda.runtime.version>
         <log4j.version>2.23.1</log4j.version>
         <junit5.version>5.8.1</junit5.version>
@@ -35,46 +34,66 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java</artifactId>
+            <artifactId>flink-table-planner_${scala.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+        <!-- Flink Iceberg uses DropWizard metrics -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-runtime</artifactId>
+            <artifactId>flink-metrics-dropwizard</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java-bridge</artifactId>
+            <artifactId>flink-table-runtime</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
+            <artifactId>flink-connector-datagen</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-metrics-dropwizard</artifactId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro</artifactId>
+            <artifactId>flink-table-common</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
 
-        <!-- Flink Table Dependencies -->
-        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
+        <!-- Flink-Avro Dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_${scala.version}</artifactId>
+            <artifactId>flink-avro</artifactId>
             <version>${flink.version}</version>
-            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
+        <!-- Avro Dependencies -->
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
 
         <!-- MSF Dependencies -->
         <dependency>
@@ -88,24 +107,30 @@
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>s3tables</artifactId>
-            <version>2.29.26</version>
+            <version>2.38.2</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>software.amazon.s3tables</groupId>
             <artifactId>s3-tables-catalog-for-iceberg</artifactId>
-            <version>0.1.3</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-files</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
+            <version>0.1.8</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <!-- Hadoop Dependencies -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
+            <artifactId>hadoop-common</artifactId>
             <version>${hadoop.version}</version>
             <exclusions>
                 <exclusion>
@@ -116,52 +141,73 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-reload4j</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-core</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
+
+        <!-- Testing Dependencies -->
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <version>${hadoop.version}</version>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${junit5.version}</version>
+            <scope>test</scope>
         </dependency>
 
         <!-- Iceberg Dependencies -->
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
             <version>${iceberg.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-flink</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-flink</artifactId>
+            <artifactId>iceberg-aws</artifactId>
             <version>${iceberg.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-aws-bundle</artifactId>
+            <artifactId>iceberg-flink-1.19</artifactId>
             <version>${iceberg.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-aws</artifactId>
+            <artifactId>iceberg-api</artifactId>
             <version>${iceberg.version}</version>
-        </dependency>
-
-        <!-- Testing Dependencies -->
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter</artifactId>
-            <version>${junit5.version}</version>
-            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <!-- Logging Dependencies -->
@@ -179,13 +225,6 @@
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
             <version>${log4j.version}</version>
-            <scope>runtime</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-flink-1.19</artifactId>
-            <version>1.7.0</version>
-            <scope>compile</scope>
         </dependency>
     </dependencies>
 
@@ -206,7 +245,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
-                <version>3.2.1</version>
+                <version>3.5.0</version>
                 <executions>
                     <execution>
                         <phase>package</phase>
@@ -219,7 +258,7 @@
                                 <exclude>org.apache.flink:force-shading</exclude>
                                 <exclude>com.google.code.findbugs:jsr305</exclude>
                                 <exclude>org.slf4j:*</exclude>
-                                <exclude>log4j:*</exclude>
+                                <exclude>org.apache.logging.log4j:*</exclude>
                             </excludes>
                         </artifactSet>
                         <filters>
@@ -229,6 +268,7 @@
                                     <exclude>META-INF/*.SF</exclude>
                                     <exclude>META-INF/*.DSA</exclude>
                                     <exclude>META-INF/*.RSA</exclude>
+                                    <exclude>META-INF/versions/**</exclude>
                                 </excludes>
                             </filter>
                         </filters>

java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java

Lines changed: 5 additions & 3 deletions
@@ -27,6 +27,7 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 
 public class StreamingJob {
@@ -58,7 +59,7 @@ private static DataGeneratorSource<GenericRecord> createDataGenerator(Properties
         double recordsPerSecond = Double.parseDouble(generatorProperties.getProperty("records.per.sec", "10.0"));
         Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0");
 
-        LOG.info("Data generator: {} record/sec", recordsPerSecond);
+        LOG.info("Data generator: {} record/sec", Optional.of(recordsPerSecond));
         return new DataGeneratorSource<>(
                 new AvroGenericStockTradeGeneratorFunction(avroSchema),
                 Long.MAX_VALUE,
@@ -79,7 +80,7 @@ public static void main(String[] args) throws Exception {
         }
 
         Map<String, Properties> applicationProperties = loadApplicationProperties(env);
-        icebergProperties = applicationProperties.get("Iceberg");
+        icebergProperties = applicationProperties.getOrDefault("Iceberg", new Properties());
 
         // Get AVRO Schema from the definition bundled with the application
         // Note that the application must "know" the AVRO schema upfront, i.e. the schema must be either embedded
@@ -98,14 +99,15 @@ public static void main(String[] args) throws Exception {
 
         // Flink Sink Builder
         FlinkSink.Builder icebergSinkBuilder = IcebergSinkBuilder.createBuilder(icebergProperties, genericRecordDataStream, avroSchema);
+
         // Sink to Iceberg Table
         icebergSinkBuilder.append();
 
         env.execute("Flink S3 Table Sink");
     }
 
     private static DataStream<GenericRecord> createDataStream(StreamExecutionEnvironment env, Map<String, Properties> applicationProperties, Schema avroSchema) {
-        Properties dataGeneratorProperties = applicationProperties.get("DataGen");
+        Properties dataGeneratorProperties = applicationProperties.getOrDefault("DataGen", new Properties());
         return env.fromSource(
                 createDataGenerator(dataGeneratorProperties, avroSchema),
                 WatermarkStrategy.noWatermarks(),
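
The hunk above notes that the application must "know" the AVRO schema upfront and reads it from a definition bundled with the application. A minimal sketch of loading such a bundled schema on start; the resource name `price.avsc` and the helper class are hypothetical, not the repository's actual code:

```java
import org.apache.avro.Schema;
import java.io.IOException;
import java.io.InputStream;

public class SchemaLoader {
    // Parses an .avsc definition packaged on the classpath (e.g. under src/main/resources).
    public static Schema loadBundledSchema() throws IOException {
        try (InputStream in = SchemaLoader.class.getResourceAsStream("/price.avsc")) {
            return new Schema.Parser().parse(in);
        }
    }
}
```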

java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java

Lines changed: 2 additions & 1 deletion
@@ -12,6 +12,7 @@
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.*;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
 import org.apache.iceberg.flink.sink.FlinkSink;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
@@ -25,7 +26,7 @@ public class IcebergSinkBuilder {
     private static final String DEFAULT_S3_CATALOG_DB = "default";
     private static final String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg";
     private static final String DEFAULT_ICEBERG_PARTITION_FIELDS = "symbol";
-    private static final String DEFAULT_ICEBERG_OPERATION = "upsert";
+    private static final String DEFAULT_ICEBERG_OPERATION = "append";
     private static final String DEFAULT_ICEBERG_UPSERT_FIELDS = "symbol";
 
     /**
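
For context, the `operation` runtime parameter (now defaulting to `append`) maps onto the Iceberg `FlinkSink` builder roughly as sketched below. This uses the public builder API from `org.apache.iceberg.flink.sink.FlinkSink`; it is not the project's exact `IcebergSinkBuilder` implementation, and the variable names are illustrative:

```java
import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class SinkWiringSketch {
    static void wire(DataStream<RowData> rowDataStream, TableLoader tableLoader,
                     String operation, String upsertEqualityFields) {
        FlinkSink.Builder builder = FlinkSink.forRowData(rowDataStream)
                .tableLoader(tableLoader);
        if ("upsert".equals(operation)) {
            // Upsert requires equality fields, e.g. "symbol" (see upsert.equality.fields in the README)
            builder = builder
                    .upsert(true)
                    .equalityFieldColumns(Arrays.asList(upsertEqualityFields.split(",")));
        }
        builder.append(); // writes are committed on checkpoint
    }
}
```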
