Skip to content

Commit 0738e4d

Browse files
author
Felix John
committed
Upgrade to Iceberg 1.9.1
1 parent ecd3380 commit 0738e4d

File tree

5 files changed

+104
-73
lines changed

5 files changed

+104
-73
lines changed

java/Iceberg/S3TableSink/README.md

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,15 @@
22

33
* Flink version: 1.19.0
44
* Flink API: DataStream API
5-
* Iceberg 1.8.1
5+
* Iceberg 1.9.1
66
* Language: Java (11)
77
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
88
and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)
99

1010
This example demonstrates how to use
1111
[Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with S3 Tables.
1212

13-
For simplicity, the application generates synthetic data, random stock prices, internally.
14-
Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records
15-
serialized with AVRO.
13+
For simplicity, the application generates synthetic data, random stock prices, internally. Data is generated as AVRO Generic Record, simulating a real source, e.g. a Kafka Source, that receives records serialized with AVRO.
1614

1715
### Prerequisites
1816

@@ -49,8 +47,7 @@ The application must have IAM permissions to:
4947

5048
### Runtime configuration
5149

52-
When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties.
53-
50+
When running on Amazon Managed Service for Apache Flink the runtime configuration is read from runtime properties.
5451
When running locally, the configuration is read from the
5552
[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.
5653

@@ -68,10 +65,7 @@ Runtime parameters:
6865

6966
### Checkpoints
7067

71-
Checkpointing must be enabled. Iceberg commits writes on checkpoint.
72-
73-
When running locally, the application enables checkpoints programmatically, every 10 seconds.
74-
When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
68+
Checkpointing must be enabled. Iceberg commits writes on checkpoint. When running locally, the application enables checkpoints programmatically, every 10 seconds. When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
7569

7670

7771
### Known limitations
@@ -83,9 +77,7 @@ At the moment there are current limitations concerning Flink Iceberg integration
8377

8478
### Schema and schema evolution
8579

86-
The application must "know" the AVRO schema on start.
87-
The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry.
88-
This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront.
80+
The application must "know" the AVRO schema on start. The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry. This is due to a limitation of the Flink Iceberg integration, which requires knowing the table schema upfront.
8981

9082
This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible.
9183
Schema changes are not propagated to Iceberg.

java/Iceberg/S3TableSink/pom.xml

Lines changed: 94 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,11 @@
1414
<target.java.version>11</target.java.version>
1515
<maven.compiler.source>${target.java.version}</maven.compiler.source>
1616
<maven.compiler.target>${target.java.version}</maven.compiler.target>
17-
1817
<flink.version>1.19.0</flink.version>
19-
<avro.version>1.11.3</avro.version>
18+
<avro.version>1.12.0</avro.version>
2019
<scala.version>2.12</scala.version>
2120
<hadoop.version>3.4.0</hadoop.version>
22-
<iceberg.version>1.6.1</iceberg.version>
21+
<iceberg.version>1.9.1</iceberg.version>
2322
<kda.runtime.version>1.2.0</kda.runtime.version>
2423
<log4j.version>2.23.1</log4j.version>
2524
<junit5.version>5.8.1</junit5.version>
@@ -31,81 +30,96 @@
3130
<groupId>org.apache.flink</groupId>
3231
<artifactId>flink-runtime-web</artifactId>
3332
<version>${flink.version}</version>
34-
<scope>provided</scope>
33+
</dependency>
34+
<dependency>
35+
<groupId>org.apache.flink</groupId>
36+
<artifactId>flink-table-planner_${scala.version}</artifactId>
37+
<version>${flink.version}</version>
38+
</dependency>
39+
<dependency>
40+
<groupId>org.apache.flink</groupId>
41+
<artifactId>flink-metrics-dropwizard</artifactId>
42+
<version>${flink.version}</version>
3543
</dependency>
3644
<dependency>
3745
<groupId>org.apache.flink</groupId>
3846
<artifactId>flink-streaming-java</artifactId>
3947
<version>${flink.version}</version>
40-
<scope>provided</scope>
4148
</dependency>
4249
<dependency>
4350
<groupId>org.apache.flink</groupId>
4451
<artifactId>flink-table-runtime</artifactId>
4552
<version>${flink.version}</version>
46-
<scope>provided</scope>
4753
</dependency>
4854
<dependency>
4955
<groupId>org.apache.flink</groupId>
50-
<artifactId>flink-table-api-java-bridge</artifactId>
56+
<artifactId>flink-connector-datagen</artifactId>
5157
<version>${flink.version}</version>
5258
</dependency>
5359
<dependency>
5460
<groupId>org.apache.flink</groupId>
55-
<artifactId>flink-table-common</artifactId>
61+
<artifactId>flink-table-api-java-bridge</artifactId>
5662
<version>${flink.version}</version>
5763
</dependency>
5864
<dependency>
5965
<groupId>org.apache.flink</groupId>
60-
<artifactId>flink-metrics-dropwizard</artifactId>
66+
<artifactId>flink-table-common</artifactId>
6167
<version>${flink.version}</version>
6268
</dependency>
6369
<dependency>
6470
<groupId>org.apache.flink</groupId>
6571
<artifactId>flink-avro</artifactId>
6672
<version>${flink.version}</version>
73+
<exclusions>
74+
<exclusion>
75+
<groupId>org.apache.avro</groupId>
76+
<artifactId>avro</artifactId>
77+
</exclusion>
78+
</exclusions>
6779
</dependency>
6880

69-
<!-- Flink Table Dependencies -->
70-
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
81+
<!-- Avro Dependencies -->
7182
<dependency>
72-
<groupId>org.apache.flink</groupId>
73-
<artifactId>flink-table-planner_${scala.version}</artifactId>
74-
<version>${flink.version}</version>
75-
<scope>provided</scope>
83+
<groupId>org.apache.avro</groupId>
84+
<artifactId>avro</artifactId>
85+
<version>${avro.version}</version>
7686
</dependency>
7787

78-
7988
<!-- MSF Dependencies -->
8089
<dependency>
8190
<groupId>com.amazonaws</groupId>
8291
<artifactId>aws-kinesisanalytics-runtime</artifactId>
8392
<version>${kda.runtime.version}</version>
84-
<scope>provided</scope>
8593
</dependency>
8694

8795
<!-- S3 Tables Dependencies -->
8896
<dependency>
8997
<groupId>software.amazon.awssdk</groupId>
9098
<artifactId>s3tables</artifactId>
9199
<version>2.29.26</version>
100+
<exclusions>
101+
<exclusion>
102+
<groupId>ch.qos.logback</groupId>
103+
<artifactId>logback-classic</artifactId>
104+
</exclusion>
105+
</exclusions>
92106
</dependency>
93107
<dependency>
94108
<groupId>software.amazon.s3tables</groupId>
95109
<artifactId>s3-tables-catalog-for-iceberg</artifactId>
96-
<version>0.1.3</version>
97-
</dependency>
98-
<dependency>
99-
<groupId>org.apache.flink</groupId>
100-
<artifactId>flink-connector-files</artifactId>
101-
<version>${flink.version}</version>
102-
<scope>provided</scope>
110+
<version>0.1.5</version>
111+
<exclusions>
112+
<exclusion>
113+
<groupId>ch.qos.logback</groupId>
114+
<artifactId>logback-classic</artifactId>
115+
</exclusion>
116+
</exclusions>
103117
</dependency>
104118

105119
<!-- Hadoop Dependencies -->
106120
<dependency>
107121
<groupId>org.apache.hadoop</groupId>
108-
<artifactId>hadoop-client</artifactId>
122+
<artifactId>hadoop-common</artifactId>
109123
<version>${hadoop.version}</version>
110124
<exclusions>
111125
<exclusion>
@@ -116,52 +130,73 @@
116130
<groupId>org.slf4j</groupId>
117131
<artifactId>slf4j-reload4j</artifactId>
118132
</exclusion>
133+
<exclusion>
134+
<groupId>org.slf4j</groupId>
135+
<artifactId>slf4j-log4j12</artifactId>
136+
</exclusion>
137+
<exclusion>
138+
<groupId>ch.qos.logback</groupId>
139+
<artifactId>logback-classic</artifactId>
140+
</exclusion>
141+
<exclusion>
142+
<groupId>ch.qos.logback</groupId>
143+
<artifactId>logback-core</artifactId>
144+
</exclusion>
119145
</exclusions>
120146
</dependency>
147+
148+
<!-- Testing Dependencies -->
121149
<dependency>
122-
<groupId>org.apache.hadoop</groupId>
123-
<artifactId>hadoop-common</artifactId>
124-
<version>${hadoop.version}</version>
125-
</dependency>
126-
<dependency>
127-
<groupId>org.apache.hadoop</groupId>
128-
<artifactId>hadoop-mapreduce-client-core</artifactId>
129-
<version>${hadoop.version}</version>
150+
<groupId>org.junit.jupiter</groupId>
151+
<artifactId>junit-jupiter</artifactId>
152+
<version>${junit5.version}</version>
153+
<scope>test</scope>
130154
</dependency>
131155

132156
<!-- Iceberg Dependencies -->
133157
<dependency>
134158
<groupId>org.apache.iceberg</groupId>
135159
<artifactId>iceberg-core</artifactId>
136160
<version>${iceberg.version}</version>
161+
<exclusions>
162+
<exclusion>
163+
<groupId>org.apache.avro</groupId>
164+
<artifactId>avro</artifactId>
165+
</exclusion>
166+
</exclusions>
137167
</dependency>
138168
<dependency>
139169
<groupId>org.apache.iceberg</groupId>
140-
<artifactId>iceberg-flink</artifactId>
141-
<version>${iceberg.version}</version>
142-
</dependency>
143-
<dependency>
144-
<groupId>org.apache.iceberg</groupId>
145-
<artifactId>iceberg-flink</artifactId>
170+
<artifactId>iceberg-aws</artifactId>
146171
<version>${iceberg.version}</version>
172+
<exclusions>
173+
<exclusion>
174+
<groupId>org.apache.avro</groupId>
175+
<artifactId>avro</artifactId>
176+
</exclusion>
177+
</exclusions>
147178
</dependency>
148179
<dependency>
149180
<groupId>org.apache.iceberg</groupId>
150-
<artifactId>iceberg-aws-bundle</artifactId>
181+
<artifactId>iceberg-flink-1.19</artifactId>
151182
<version>${iceberg.version}</version>
183+
<exclusions>
184+
<exclusion>
185+
<groupId>org.apache.avro</groupId>
186+
<artifactId>avro</artifactId>
187+
</exclusion>
188+
</exclusions>
152189
</dependency>
153190
<dependency>
154191
<groupId>org.apache.iceberg</groupId>
155-
<artifactId>iceberg-aws</artifactId>
192+
<artifactId>iceberg-api</artifactId>
156193
<version>${iceberg.version}</version>
157-
</dependency>
158-
159-
<!-- Testing Dependencies -->
160-
<dependency>
161-
<groupId>org.junit.jupiter</groupId>
162-
<artifactId>junit-jupiter</artifactId>
163-
<version>${junit5.version}</version>
164-
<scope>test</scope>
194+
<exclusions>
195+
<exclusion>
196+
<groupId>org.apache.avro</groupId>
197+
<artifactId>avro</artifactId>
198+
</exclusion>
199+
</exclusions>
165200
</dependency>
166201

167202
<!-- Logging Dependencies -->
@@ -179,13 +214,6 @@
179214
<groupId>org.apache.logging.log4j</groupId>
180215
<artifactId>log4j-core</artifactId>
181216
<version>${log4j.version}</version>
182-
<scope>runtime</scope>
183-
</dependency>
184-
<dependency>
185-
<groupId>org.apache.iceberg</groupId>
186-
<artifactId>iceberg-flink-1.19</artifactId>
187-
<version>1.7.0</version>
188-
<scope>compile</scope>
189217
</dependency>
190218
</dependencies>
191219

@@ -206,7 +234,7 @@
206234
<plugin>
207235
<groupId>org.apache.maven.plugins</groupId>
208236
<artifactId>maven-shade-plugin</artifactId>
209-
<version>3.2.1</version>
237+
<version>3.5.0</version>
210238
<executions>
211239
<execution>
212240
<phase>package</phase>
@@ -219,7 +247,7 @@
219247
<exclude>org.apache.flink:force-shading</exclude>
220248
<exclude>com.google.code.findbugs:jsr305</exclude>
221249
<exclude>org.slf4j:*</exclude>
222-
<exclude>log4j:*</exclude>
250+
<exclude>org.apache.logging.log4j:*</exclude>
223251
</excludes>
224252
</artifactSet>
225253
<filters>
@@ -229,9 +257,17 @@
229257
<exclude>META-INF/*.SF</exclude>
230258
<exclude>META-INF/*.DSA</exclude>
231259
<exclude>META-INF/*.RSA</exclude>
260+
<exclude>META-INF/versions/**</exclude>
232261
</excludes>
233262
</filter>
234263
</filters>
264+
<!-- Shading org.apache.avro, see: https://github.com/apache/iceberg/issues/13481 -->
265+
<relocations>
266+
<relocation>
267+
<pattern>org.apache.avro</pattern>
268+
<shadedPattern>shaded.org.apache.avro</shadedPattern>
269+
</relocation>
270+
</relocations>
235271
<transformers>
236272
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
237273
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">

java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.io.IOException;
2828
import java.util.Map;
2929
import java.util.Objects;
30+
import java.util.Optional;
3031
import java.util.Properties;
3132

3233
public class StreamingJob {
@@ -58,7 +59,7 @@ private static DataGeneratorSource<GenericRecord> createDataGenerator(Properties
5859
double recordsPerSecond = Double.parseDouble(generatorProperties.getProperty("records.per.sec", "10.0"));
5960
Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0");
6061

61-
LOG.info("Data generator: {} record/sec", recordsPerSecond);
62+
LOG.info("Data generator: {} record/sec", Optional.of(recordsPerSecond));
6263
return new DataGeneratorSource<>(
6364
new AvroGenericStockTradeGeneratorFunction(avroSchema),
6465
Long.MAX_VALUE,
@@ -98,6 +99,7 @@ public static void main(String[] args) throws Exception {
9899

99100
// Flink Sink Builder
100101
FlinkSink.Builder icebergSinkBuilder = IcebergSinkBuilder.createBuilder(icebergProperties, genericRecordDataStream, avroSchema);
102+
101103
// Sink to Iceberg Table
102104
icebergSinkBuilder.append();
103105

java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.iceberg.catalog.Catalog;
1313
import org.apache.iceberg.catalog.TableIdentifier;
1414
import org.apache.iceberg.flink.*;
15+
import org.apache.iceberg.io.FileIO;
1516
import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
1617
import org.apache.iceberg.flink.sink.FlinkSink;
1718
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;

java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
"PropertyGroupId": "Iceberg",
1010
"PropertyMap": {
1111

12-
"table.bucket.arn": "<<ARN>>",
12+
"table.bucket.arn": "your-table-arn",
1313
"catalog.db": "default",
1414
"catalog.table": "prices_s3table",
1515
"partition.fields": "symbol",

0 commit comments

Comments
 (0)