From 7b2a1f3654fddb11b28cdb0d17367ebf2d35244f Mon Sep 17 00:00:00 2001 From: Francisco Date: Tue, 27 May 2025 16:58:33 +0200 Subject: [PATCH] first commit --- java/Iceberg/S3TableSQLJSON/README.md | 85 +++++++ java/Iceberg/S3TableSQLJSON/pom.xml | 238 ++++++++++++++++++ .../src/main/java/S3TableSQLJSONExample.java | 187 ++++++++++++++ .../src/main/java/StockPrice.java | 60 +++++ .../java/StockPriceGeneratorFunction.java | 24 ++ .../flink-application-properties-dev.json | 16 ++ .../src/main/resources/log4j2.properties | 13 + .../src/main/resources/price.avsc | 23 ++ 8 files changed, 646 insertions(+) create mode 100644 java/Iceberg/S3TableSQLJSON/README.md create mode 100644 java/Iceberg/S3TableSQLJSON/pom.xml create mode 100644 java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java create mode 100644 java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java create mode 100644 java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java create mode 100644 java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json create mode 100644 java/Iceberg/S3TableSQLJSON/src/main/resources/log4j2.properties create mode 100644 java/Iceberg/S3TableSQLJSON/src/main/resources/price.avsc diff --git a/java/Iceberg/S3TableSQLJSON/README.md b/java/Iceberg/S3TableSQLJSON/README.md new file mode 100644 index 0000000..89b6ebd --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/README.md @@ -0,0 +1,85 @@ +# Flink Iceberg Sink using SQL API with S3 Tables + +* Flink version: 1.19.0 +* Flink API: SQL API +* Iceberg 1.8.1 +* Language: Java (11) +* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) + and [S3 Tables](https://docs.aws.amazon.com/s3/latest/userguide/s3-tables.html) + +This example demonstrates how to use +[Flink SQL API with Iceberg](https://iceberg.apache.org/docs/latest/flink-writes/) and the Amazon S3 Tables Catalog. 
+ +For simplicity, the application generates synthetic data, random stock prices, internally. +Data is generated as POJO objects, simulating a real source, for example a Kafka Source, that receives records +that can be converted to table format for SQL operations. + +### Prerequisites + +#### Create a Table Bucket +The sample application expects the S3 table bucket to already exist and its ARN to be set in the runtime configuration (`table.bucket.arn`): +```bash +aws s3tables create-table-bucket --name flink-example +{ + "arn": "arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example" + +} +``` + +If you have already created the bucket, you can look up its ARN like this: + +```bash +aws s3tables list-table-buckets +``` + +This will show you the list of table buckets. Select the one you wish to write to and paste its ARN into the config file in this project. + +#### Create a Namespace in the Table Bucket (Database) +The sample application expects the namespace to exist in the table bucket. Its name must match the `catalog.db` runtime property (default `iceberg`): +```bash +aws s3tables create-namespace \ + --table-bucket-arn arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example \ + --namespace iceberg +``` + +#### IAM Permissions + +The application must have IAM permissions to: +* Read from and write to the S3 table bucket + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from Runtime Properties. + +When running locally, the configuration is read from the +[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. + +Runtime parameters: + +| Group ID | Key | Default | Description | +|-----------|--------------------------|------------------|---------------------------------------------------------------------------------------------------------------------| +| `DataGen` | `records.per.sec` | `10.0` | Records per second generated. 
| +| `Iceberg` | `table.bucket.arn` | (mandatory) | ARN of the S3 Tables bucket, for example `arn:aws:s3tables:region:account:bucket/my-bucket`. | +| `Iceberg` | `catalog.db` | `iceberg` | Name of the S3 Tables Catalog database. | +| `Iceberg` | `catalog.table` | `prices_s3tables` | Name of the S3 Tables Catalog table. | + +### Checkpoints + +Checkpointing must be enabled. Iceberg commits writes on checkpoint. + +When running locally, the application enables checkpoints programmatically, every 30 seconds. +When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. + +### Known limitations + +The Flink Iceberg integration currently has the following limitations: +* It does not support Iceberg tables with hidden partitioning. +* It does not support adding, removing, renaming or changing columns. + +### Running locally, in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. + +Make sure to configure the appropriate AWS credentials and region when running locally, and ensure the provided S3 Tables bucket ARN is valid and accessible. + +See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details. 
\ No newline at end of file diff --git a/java/Iceberg/S3TableSQLJSON/pom.xml b/java/Iceberg/S3TableSQLJSON/pom.xml new file mode 100644 index 0000000..511ba19 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/pom.xml @@ -0,0 +1,238 @@ + + + 4.0.0 + + com.amazonaws + s3-table-sql-flink + 1.0 + jar + + + UTF-8 + 11 + ${target.java.version} + ${target.java.version} + + 1.19.0 + 1.11.3 + 2.12 + 3.4.0 + 1.6.1 + 1.2.0 + 2.23.1 + 5.8.1 + + + + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + + + org.apache.flink + flink-table-common + ${flink.version} + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + + + org.apache.flink + flink-avro + ${flink.version} + + + + + org.apache.flink + flink-table-planner_${scala.version} + ${flink.version} + provided + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + software.amazon.awssdk + s3tables + 2.31.50 + + + software.amazon.s3tables + s3-tables-catalog-for-iceberg + 0.1.6 + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + org.slf4j + slf4j-reload4j + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws-bundle + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink-1.19 + ${iceberg.version} + + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + + 
org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + S3TableSQLJSONExample + + + + + + + + + diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java b/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java new file mode 100644 index 0000000..d3f8faf --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java @@ -0,0 +1,187 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: MIT-0 + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this + * software and associated documentation files (the "Software"), to deal in the Software + * without restriction, including without limitation the rights to use, copy, modify, + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.*; +import org.apache.flink.util.Preconditions; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkCatalog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class S3TableSQLJSONExample { + // Constants + private static final String CATALOG_NAME = "s3"; + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + private static final Logger LOG = LoggerFactory.getLogger(S3TableSQLJSONExample.class); + + // Configuration properties, populated by setupS3TableProperties() + private static String tableBucketArn; + private static String s3TableDatabase; + private static String s3Table; + + public static void main(String[] args) throws Exception { + // 1. 
Initialize environments - using standard environment instead of WebUI for production consistency + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + // 2. Load properties and configure environment + Map<String, Properties> applicationProperties = loadApplicationProperties(env); + Properties icebergProperties = applicationProperties.get("Iceberg"); + + // Configure local development settings if needed + if (isLocal(env)) { + env.enableCheckpointing(30000); + env.setParallelism(2); + } + + // 3. Setup configuration properties with validation + setupS3TableProperties(icebergProperties); + Catalog s3Catalog = createS3Catalog(tableEnv); + + tableEnv.registerCatalog(CATALOG_NAME,s3Catalog); + + // 4. Create data generator source + Properties dataGenProperties = applicationProperties.get("DataGen"); + DataStream<StockPrice> stockPriceDataStream = env.fromSource( + createDataGenerator(dataGenProperties), + WatermarkStrategy.noWatermarks(), + "DataGen"); + + // 5. Convert DataStream to Table and create view + Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream); + tableEnv.createTemporaryView("stockPriceTable", stockPriceTable); + + String sinkTableName = CATALOG_NAME + "." + s3TableDatabase + "." + s3Table; + + // 6. Define and create table with schema matching AVRO schema from DataStream example + String createTableStatement = "CREATE TABLE IF NOT EXISTS " + sinkTableName + " (" + + "`timestamp` STRING, " + + "symbol STRING," + + "price FLOAT," + + "volumes INT" + + ") PARTITIONED BY (symbol)"; + + LOG.info("Creating table with statement: {}", createTableStatement); + tableEnv.executeSql(createTableStatement); + + // 7. 
Execute SQL operations - Insert data from stock price stream + String insertQuery = "INSERT INTO " + sinkTableName + + " SELECT `timestamp`, symbol, price, volumes FROM stockPriceTable"; + LOG.info("Executing insert statement: {}", insertQuery); + TableResult insertResult = tableEnv.executeSql(insertQuery); + + LOG.info("Application started successfully. Inserting data into S3 table: {}", sinkTableName); + // executeSql() only submits the INSERT job; when running locally, wait on it so main() does not return while the job is still running + if (isLocal(env)) { insertResult.await(); } + } + + private static void setupS3TableProperties(Properties icebergProperties) { + tableBucketArn = icebergProperties.getProperty("table.bucket.arn"); + s3TableDatabase = icebergProperties.getProperty("catalog.db", "iceberg"); + s3Table = icebergProperties.getProperty("catalog.table", "prices_s3tables"); + + Preconditions.checkNotNull(tableBucketArn, "You must supply a table bucket ARN."); + Preconditions.checkNotNull(s3TableDatabase, "You must supply a database name"); + Preconditions.checkNotNull(s3Table, "You must supply a table name"); + + // Validate ARN format + validateArn(tableBucketArn); + + LOG.info("S3 Tables configuration: bucket={}, database={}, table={}", + tableBucketArn, s3TableDatabase, s3Table); + } + + private static DataGeneratorSource<StockPrice> createDataGenerator(Properties dataGeneratorProperties) { + double recordsPerSecond = Double.parseDouble(dataGeneratorProperties.getProperty("records.per.sec", "10.0")); + Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); + + LOG.info("Data generator: {} record/sec", recordsPerSecond); + return new DataGeneratorSource<>(new StockPriceGeneratorFunction(), + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(recordsPerSecond), + TypeInformation.of(StockPrice.class)); + } + + /** + * Builds the Iceberg catalog properties (S3FileIO, the S3 Tables catalog implementation, and the table bucket ARN as warehouse), + * then uses them to create the Flink catalog whose default database is the configured catalog.db + */ + private static Catalog createS3Catalog(StreamTableEnvironment tableEnv) { + + Map<String, String> catalogProperties = new 
HashMap<>(); + catalogProperties.put("type", "iceberg"); + catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + catalogProperties.put("warehouse", tableBucketArn); + catalogProperties.put("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog"); + // Load the S3 Tables catalog through Iceberg's custom catalog loader + CatalogLoader s3TablesCatalogLoader = CatalogLoader.custom( + CATALOG_NAME, + catalogProperties, + new org.apache.hadoop.conf.Configuration(), + "software.amazon.s3tables.iceberg.S3TablesCatalog"); + + + FlinkCatalog flinkCatalog = new FlinkCatalog(CATALOG_NAME,s3TableDatabase, Namespace.empty(),s3TablesCatalogLoader,true,1000); + return flinkCatalog; + } + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime + * or from a local resource, when the environment is local + */ + private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(S3TableSQLJSONExample.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + public static void validateArn(String arn) { + String arnPattern = "^arn:aws[a-zA-Z-]*:[a-zA-Z0-9-]+:[a-zA-Z0-9-]*:[0-9]{12}:[a-zA-Z0-9-_/:.]+$"; + Preconditions.checkArgument(arn != null && arn.matches(arnPattern), + "Invalid ARN format: %s. 
ARN must match pattern: arn:partition:service:region:account-id:resource", arn); + } +} diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java new file mode 100644 index 0000000..c386c2a --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java @@ -0,0 +1,60 @@ +import java.time.Instant; + +public class StockPrice { + private String timestamp; + private String symbol; + private Float price; + private Integer volumes; + + public StockPrice() { + } + + public StockPrice(String timestamp, String symbol, Float price, Integer volumes) { + this.timestamp = timestamp; + this.symbol = symbol; + this.price = price; + this.volumes = volumes; + } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public String getSymbol() { + return symbol; + } + + public void setSymbol(String symbol) { + this.symbol = symbol; + } + + public Float getPrice() { + return price; + } + + public void setPrice(Float price) { + this.price = price; + } + + public Integer getVolumes() { + return volumes; + } + + public void setVolumes(Integer volumes) { + this.volumes = volumes; + } + + @Override + public String toString() { + return "StockPrice{" + + "timestamp='" + timestamp + '\'' + + ", symbol='" + symbol + '\'' + + ", price=" + price + + ", volumes=" + volumes + + '}'; + } +} diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java new file mode 100644 index 0000000..38e236b --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java @@ -0,0 +1,24 @@ +import org.apache.commons.lang3.RandomUtils; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import java.time.Instant; + +/** + * Function used by DataGen source to generate random records as 
StockPrice POJOs. + * + * The generator mimics the behavior of AvroGenericStockTradeGeneratorFunction + * from the IcebergDataStreamSink example. + */ +public class StockPriceGeneratorFunction implements GeneratorFunction<Long, StockPrice> { + + private static final String[] SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"}; + + @Override + public StockPrice map(Long sequence) throws Exception { + String symbol = SYMBOLS[RandomUtils.nextInt(0, SYMBOLS.length)]; + float price = RandomUtils.nextFloat(0, 10); + int volumes = RandomUtils.nextInt(0, 1000000); + String timestamp = Instant.now().toString(); + + return new StockPrice(timestamp, symbol, price, volumes); + } +} diff --git a/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..bce4273 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,16 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.sec": 10.0 + } + }, + { + "PropertyGroupId": "Iceberg", + "PropertyMap": { + "table.bucket.arn": "arn:aws:s3tables:us-east-1:111122223333:bucket/iceberg", + "catalog.db": "iceberg", + "catalog.table": "prices_s3table" + } + } +] \ No newline at end of file diff --git a/java/Iceberg/S3TableSQLJSON/src/main/resources/log4j2.properties b/java/Iceberg/S3TableSQLJSON/src/main/resources/log4j2.properties new file mode 100644 index 0000000..a6cccce --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/resources/log4j2.properties @@ -0,0 +1,13 @@ +# Log4j2 configuration +status = warn +name = PropertiesConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = ConsoleAppender +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +# Root logger configuration +rootLogger.level = INFO 
+rootLogger.appenderRef.console.ref = ConsoleAppender \ No newline at end of file diff --git a/java/Iceberg/S3TableSQLJSON/src/main/resources/price.avsc b/java/Iceberg/S3TableSQLJSON/src/main/resources/price.avsc new file mode 100644 index 0000000..6303e0d --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/resources/price.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "name": "Price", + "namespace": "com.amazonaws.services.msf.avro", + "fields": [ + { + "name": "timestamp", + "type": "string" + }, + { + "name": "symbol", + "type": "string" + }, + { + "name": "price", + "type": "float" + }, + { + "name": "volumes", + "type": "int" + } + ] +} \ No newline at end of file