From ac3bbcf7250992f201cee240ce0f4b14749c4513 Mon Sep 17 00:00:00 2001 From: bubriks Date: Mon, 4 Aug 2025 17:59:25 +0300 Subject: [PATCH 1/8] init --- java/flink/pom.xml | 15 +++++++++------ .../hsfs/flink/engine/FlinkEngine.java | 10 +++++----- .../hsfs/flink/engine/PojoToAvroRecord.java | 6 +++--- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/java/flink/pom.xml b/java/flink/pom.xml index 9c10b52b3b..1ae8be063e 100644 --- a/java/flink/pom.xml +++ b/java/flink/pom.xml @@ -12,7 +12,7 @@ hsfs-flink - 1.17.1.0 + 2.0.0 2.13.4.2 1.79 14.0.1 @@ -42,10 +42,6 @@ com.databricks * - - org.scala-lang - * - org.apache.kafka * @@ -66,11 +62,18 @@ + + + org.apache.flink + flink-connector-base + ${flink.version} + + org.apache.flink flink-connector-kafka - ${flink.version} + 4.0.0-2.0 provided diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java index 602dae476a..b1ed85efcc 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java @@ -30,7 +30,6 @@ import com.logicalclocks.hsfs.metadata.HopsworksExternalClient; import com.logicalclocks.hsfs.metadata.HopsworksInternalClient; import com.logicalclocks.hsfs.metadata.StorageConnectorApi; -import com.twitter.chill.Base64; import lombok.Getter; import org.apache.avro.generic.GenericRecord; @@ -53,6 +52,7 @@ import java.security.UnrecoverableKeyException; import java.security.cert.Certificate; import java.security.cert.CertificateEncodingException; +import java.util.Base64; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -87,7 +87,7 @@ public DataStreamSink writeDataStream(StreamFeatureGroup streamFeatureGroup, .setBootstrapServers(properties.getProperty("bootstrap.servers")) .setKafkaProducerConfig(properties) .setRecordSerializer(new KafkaRecordSerializer(streamFeatureGroup)) - .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); Map complexFeatureSchemas = new HashMap<>(); for (String featureName : streamFeatureGroup.getComplexFeatures()) { @@ -187,7 +187,7 @@ private String getKey(KeyStore keyStore, String password) throws KeyStoreException, UnrecoverableKeyException, NoSuchAlgorithmException { String keyAlias = keyStore.aliases().nextElement(); return "-----BEGIN PRIVATE KEY-----\n" - + Base64.encodeBytes(keyStore.getKey(keyAlias, password.toCharArray()).getEncoded()) + + Base64.getEncoder().encodeToString(keyStore.getKey(keyAlias, password.toCharArray()).getEncoded()) + "\n-----END PRIVATE KEY-----"; } @@ -198,7 +198,7 @@ private String getCertificateChain(KeyStore keyStore) throws KeyStoreException, StringBuilder certificateChainBuilder = new StringBuilder(); for (Certificate certificate : certificateChain) { certificateChainBuilder.append("-----BEGIN CERTIFICATE-----\n") - .append(Base64.encodeBytes(certificate.getEncoded())) + .append(Base64.getEncoder().encodeToString(certificate.getEncoded())) .append("\n-----END CERTIFICATE-----\n"); } @@ -208,7 +208,7 @@ private String getCertificateChain(KeyStore keyStore) throws KeyStoreException, private String getRootCA(KeyStore trustStore) throws KeyStoreException, CertificateEncodingException { String rootCaAlias = trustStore.aliases().nextElement(); return "-----BEGIN CERTIFICATE-----\n" - + Base64.encodeBytes(trustStore.getCertificate(rootCaAlias).getEncoded()) + + Base64.getEncoder().encodeToString(trustStore.getCertificate(rootCaAlias).getEncoded()) + "\n-----END CERTIFICATE-----"; } diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java index d2a37c26ec..fa0890821d 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java @@ -28,8 +28,8 @@ import org.apache.avro.reflect.ReflectData; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import java.io.ByteArrayOutputStream; @@ -98,8 +98,8 @@ public GenericRecord map(T input) throws Exception { } @Override - public void open(Configuration configuration) throws Exception { - super.open(configuration); + public void open(OpenContext openContext) throws Exception { + super.open(openContext); this.deserializedSchema = new Schema.Parser().parse(this.schema); this.deserializedEncodedSchema = new Schema.Parser().parse(this.encodedSchema); this.deserializedComplexFeatureSchemas = new HashMap<>(); From 7500077a31964cc2ed9c06e077c9ad22815b3163 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 6 Aug 2025 10:17:49 +0300 Subject: [PATCH 2/8] change flink-connector-base to provided --- java/flink/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/java/flink/pom.xml b/java/flink/pom.xml index 1ae8be063e..1def9976e6 100644 --- a/java/flink/pom.xml +++ b/java/flink/pom.xml @@ -67,6 +67,7 @@ org.apache.flink flink-connector-base ${flink.version} + provided From e072c465cac3796b48cbb8b8dcee3f8ad9dec41a Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 13 Aug 2025 14:52:53 +0300 Subject: [PATCH 3/8] make code version independent --- java/flink/pom.xml | 16 ++----------- .../hsfs/flink/engine/FlinkEngine.java | 20 ++++++++++++---- .../flink/engine/KafkaRecordSerializer.java | 2 +- .../hsfs/flink/engine/PojoToAvroRecord.java | 23 ++++++++++--------- 4 files changed, 30 insertions(+), 31 deletions(-) diff --git a/java/flink/pom.xml b/java/flink/pom.xml index 1def9976e6..8056fd2765 100644 --- a/java/flink/pom.xml +++ b/java/flink/pom.xml @@ -12,7 +12,7 @@ hsfs-flink - 2.0.0 + 1.17.1.0 2.13.4.2 1.79 14.0.1 @@ -42,10 +42,6 @@ com.databricks * - - org.apache.kafka - * - @@ -62,19 +58,11 @@ - - - org.apache.flink - flink-connector-base - ${flink.version} - provided - - org.apache.flink flink-connector-kafka - 4.0.0-2.0 + ${flink.version} provided diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java index 150fe14739..1a99475cf6 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java @@ -33,7 +33,6 @@ import lombok.Getter; import org.apache.avro.generic.GenericRecord; -import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; @@ -46,6 +45,7 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; @@ -56,6 +56,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; public class FlinkEngine extends EngineBase { private static FlinkEngine INSTANCE = null; @@ -79,23 +80,32 @@ private FlinkEngine() { public DataStreamSink writeDataStream(StreamFeatureGroup streamFeatureGroup, DataStream dataStream, Map writeOptions) throws FeatureStoreException, IOException { - DataStream genericDataStream = (DataStream) dataStream; Properties properties = new Properties(); properties.putAll(getKafkaConfig(streamFeatureGroup, writeOptions)); + KafkaRecordSerializer serializer = new KafkaRecordSerializer(streamFeatureGroup); + + // Generate transaction id from the kafka headers (unique for all ingestions) + String transactionalId = serializer.headerMap.entrySet().stream() + .map(e -> new String(e.getValue(), StandardCharsets.UTF_8)) + .collect(Collectors.joining("_")); + + // MUST setTransactionalIdPrefix when DeliveryGuarantee is not AT_LEAST_ONCE and it must be unique per sink KafkaSink sink = KafkaSink.builder() .setBootstrapServers(properties.getProperty("bootstrap.servers")) .setKafkaProducerConfig(properties) - .setRecordSerializer(new KafkaRecordSerializer(streamFeatureGroup)) - .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .setRecordSerializer(serializer) + .setTransactionalIdPrefix(transactionalId) .build(); Map complexFeatureSchemas = new HashMap<>(); for (String featureName : streamFeatureGroup.getComplexFeatures()) { complexFeatureSchemas.put(featureName, streamFeatureGroup.getFeatureAvroSchema(featureName)); } + DataStream genericDataStream = (DataStream) dataStream; DataStream avroRecordDataStream = - genericDataStream.map(new PojoToAvroRecord( + genericDataStream + .map(new PojoToAvroRecord( streamFeatureGroup.getAvroSchema(), streamFeatureGroup.getEncodedAvroSchema(), complexFeatureSchemas)) diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java index 7409ed94e8..270eeb430f 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java @@ -41,7 +41,7 @@ public class KafkaRecordSerializer implements KafkaRecordSerializationSchema primaryKeys; - private final Map headerMap; + final Map headerMap; KafkaRecordSerializer(StreamFeatureGroup streamFeatureGroup) throws FeatureStoreException, IOException { this.topic = streamFeatureGroup.getOnlineTopicName(); diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java index bff24131d8..c3fecd68e4 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java @@ -22,7 +22,6 @@ import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; @@ -36,7 +35,7 @@ public class PojoToAvroRecord extends RichMapFunction imple private final String encodedFeatureGroupSchemaStr; private final Map complexFeatureSchemasStr; - // org.apache.avro.Schema$Field is not serializable. Create in open() and reused later on + // org.apache.avro.Schema$Field is not serializable. Create in once and reused later on private transient Schema featureGroupSchema; private transient Schema encodedFeatureGroupSchema; private transient Map complexFeatureSchemas; @@ -51,14 +50,7 @@ public PojoToAvroRecord(String featureGroupSchema, this.complexFeatureSchemasStr = complexFeatureSchemas; } - @Override - public GenericRecord map(T input) throws Exception { - return PojoToAvroUtils.convertPojoToGenericRecord( - input, featureGroupSchema, encodedFeatureGroupSchema, complexFeatureSchemas); - } - - @Override - public void open(OpenContext openContext) throws Exception { + public void init() throws Exception { this.featureGroupSchema = new Schema.Parser().parse(this.featureGroupSchemaStr); this.encodedFeatureGroupSchema = new Schema.Parser().parse(this.encodedFeatureGroupSchemaStr); this.complexFeatureSchemas = this.complexFeatureSchemasStr @@ -69,9 +61,18 @@ public void open(OpenContext openContext) throws Exception { this.producedType = new GenericRecordAvroTypeInfo(encodedFeatureGroupSchema); } + @Override + public GenericRecord map(T input) throws Exception { + // handle serialization across task managers + if (featureGroupSchema == null || encodedFeatureGroupSchema == null || complexFeatureSchemas == null) { + init(); + } + return PojoToAvroUtils.convertPojoToGenericRecord( + input, featureGroupSchema, encodedFeatureGroupSchema, complexFeatureSchemas); + } + @Override public TypeInformation getProducedType() { return producedType; } - } From e0c5f9f0eea4f89986bc9eb63ac1b96f6c3cbbc6 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 13 Aug 2025 14:54:52 +0300 Subject: [PATCH 4/8] small fix --- .../com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java index c3fecd68e4..bb938072b6 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java @@ -35,7 +35,7 @@ public class PojoToAvroRecord extends RichMapFunction imple private final String encodedFeatureGroupSchemaStr; private final Map complexFeatureSchemasStr; - // org.apache.avro.Schema$Field is not serializable. Create in once and reused later on + // org.apache.avro.Schema$Field is not serializable. Create once and reused later on private transient Schema featureGroupSchema; private transient Schema encodedFeatureGroupSchema; private transient Map complexFeatureSchemas; From 4eeaf1f17beb3edebbf687c154c6d9caf53266cc Mon Sep 17 00:00:00 2001 From: bubriks Date: Mon, 25 Aug 2025 14:27:57 +0300 Subject: [PATCH 5/8] bump avro (did not change anything for spark) --- java/flink/pom.xml | 1 + python/pyproject.toml | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/java/flink/pom.xml b/java/flink/pom.xml index 8056fd2765..41fba05d8a 100644 --- a/java/flink/pom.xml +++ b/java/flink/pom.xml @@ -77,6 +77,7 @@ org.apache.flink flink-avro ${flink.version} + provided org.apache.flink diff --git a/python/pyproject.toml b/python/pyproject.toml index 058b967105..c1aedbc276 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -45,7 +45,7 @@ dependencies = [ "numpy<2", "pyjks", "mock", - "avro==1.11.3", + "avro==1.12.0", "PyMySQL[rsa]", "tzlocal", "fsspec", @@ -62,7 +62,7 @@ dependencies = [ python = [ "pyarrow>=17.0", "confluent-kafka<=2.6.1", - "fastavro>=1.4.11,<=1.11.1", + "fastavro>=1.4.11,<=1.12.0", "tqdm", ] sqlalchemy-1 = [ From 31ace56f4216fe290f42ba91ec74dad93eafe208 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 9 Oct 2025 14:54:26 +0300 Subject: [PATCH 6/8] bump avro --- java/pom.xml | 4 ++-- python/pyproject.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index e20d1cc8f5..cf8d20d2a3 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -39,7 +39,7 @@ 5.9.1 2.22.0 4.3.1 - 1.11.4 + 1.12.0 1.13.1 true @@ -306,7 +306,7 @@ 2.0.7-spark-3.5 spark3.5 2.15.2 - 1.11.4 + 1.12.0 3.4.1 diff --git a/python/pyproject.toml b/python/pyproject.toml index 3ff84523a3..976b27c25f 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -64,7 +64,7 @@ hopsworks-mcp = "hopsworks.mcp.main:main" python = [ "pyarrow>=17.0", "confluent-kafka<=2.11.1", - "fastavro>=1.4.11,<=1.11.1", + "fastavro>=1.4.11,<=1.12.0", "tqdm", "hops-deltalake; python_version >= '3.9' and (sys_platform == 'linux' or (sys_platform == 'darwin' and platform_machine == 'arm64'))", ] From 0b86296650403f57977a6b2670fcb867792015ad Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 9 Oct 2025 14:57:04 +0300 Subject: [PATCH 7/8] keep 1.11.4 avro for jdk 8 --- java/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index cf8d20d2a3..e20d1cc8f5 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -39,7 +39,7 @@ 5.9.1 2.22.0 4.3.1 - 1.12.0 + 1.11.4 1.13.1 true @@ -306,7 +306,7 @@ 2.0.7-spark-3.5 spark3.5 2.15.2 - 1.12.0 + 1.11.4 3.4.1 From 4040673bc696765a8e57098327db30f675053655 Mon Sep 17 00:00:00 2001 From: Ralf Date: Wed, 15 Oct 2025 10:44:51 +0300 Subject: [PATCH 8/8] Make avro version dependent on framework Co-authored-by: Fabio Buso --- java/beam/pom.xml | 4 ++++ java/flink/pom.xml | 4 ++++ java/pom.xml | 1 - java/spark/pom.xml | 4 ++++ 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/java/beam/pom.xml b/java/beam/pom.xml index 82ed8440df..f139af84a1 100644 --- a/java/beam/pom.xml +++ b/java/beam/pom.xml @@ -31,6 +31,10 @@ org.apache.kafka * + + org.apache.avro + avro + diff --git a/java/flink/pom.xml b/java/flink/pom.xml index 41fba05d8a..ed36531e24 100644 --- a/java/flink/pom.xml +++ b/java/flink/pom.xml @@ -42,6 +42,10 @@ com.databricks * + + org.apache.avro + avro + diff --git a/java/pom.xml b/java/pom.xml index e20d1cc8f5..e029e98be8 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -306,7 +306,6 @@ 2.0.7-spark-3.5 spark3.5 2.15.2 - 1.11.4 3.4.1 diff --git a/java/spark/pom.xml b/java/spark/pom.xml index ebe3b1b7d6..63f95d4bec 100644 --- a/java/spark/pom.xml +++ b/java/spark/pom.xml @@ -49,6 +49,10 @@ com.fasterxml.jackson.core jackson-databind + + org.apache.avro + avro +