diff --git a/java/beam/pom.xml b/java/beam/pom.xml index 82ed8440d..f139af84a 100644 --- a/java/beam/pom.xml +++ b/java/beam/pom.xml @@ -31,6 +31,10 @@ org.apache.kafka * + + org.apache.avro + avro + diff --git a/java/flink/pom.xml b/java/flink/pom.xml index 9c10b52b3..ed36531e2 100644 --- a/java/flink/pom.xml +++ b/java/flink/pom.xml @@ -43,12 +43,8 @@ * - org.scala-lang - * - - - org.apache.kafka - * + org.apache.avro + avro @@ -85,6 +81,7 @@ org.apache.flink flink-avro ${flink.version} + provided org.apache.flink diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java index 46b5fd4ac..e28973585 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java @@ -30,11 +30,9 @@ import com.logicalclocks.hsfs.metadata.HopsworksExternalClient; import com.logicalclocks.hsfs.metadata.HopsworksInternalClient; import com.logicalclocks.hsfs.metadata.StorageConnectorApi; -import com.twitter.chill.Base64; import lombok.Getter; import org.apache.avro.generic.GenericRecord; -import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; @@ -47,15 +45,18 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; import java.security.cert.Certificate; import java.security.cert.CertificateEncodingException; +import java.util.Base64; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; public class FlinkEngine extends EngineBase { private static FlinkEngine INSTANCE = null; @@ -79,23 +80,32 @@ private FlinkEngine() { public DataStreamSink writeDataStream(StreamFeatureGroup streamFeatureGroup, DataStream dataStream, Map writeOptions) throws FeatureStoreException, IOException { - DataStream genericDataStream = (DataStream) dataStream; Properties properties = new Properties(); properties.putAll(getKafkaConfig(streamFeatureGroup, writeOptions)); + KafkaRecordSerializer serializer = new KafkaRecordSerializer(streamFeatureGroup); + + // Generate transaction id from the kafka headers (unique for all ingestions) + String transactionalId = serializer.headerMap.entrySet().stream() + .map(e -> new String(e.getValue(), StandardCharsets.UTF_8)) + .collect(Collectors.joining("_")); + + // MUST setTransactionalIdPrefix when DeliveryGuarantee is not AT_LEAST_ONCE and it must be unique per sink KafkaSink sink = KafkaSink.builder() .setBootstrapServers(properties.getProperty("bootstrap.servers")) .setKafkaProducerConfig(properties) - .setRecordSerializer(new KafkaRecordSerializer(streamFeatureGroup)) - .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .setRecordSerializer(serializer) + .setTransactionalIdPrefix(transactionalId) .build(); Map complexFeatureSchemas = new HashMap<>(); for (String featureName : streamFeatureGroup.getComplexFeatures()) { complexFeatureSchemas.put(featureName, streamFeatureGroup.getFeatureAvroSchema(featureName)); } + DataStream genericDataStream = (DataStream) dataStream; DataStream avroRecordDataStream = - genericDataStream.map(new PojoToAvroRecord( + genericDataStream + .map(new PojoToAvroRecord( streamFeatureGroup.getAvroSchema(), streamFeatureGroup.getEncodedAvroSchema(), complexFeatureSchemas)) @@ -191,7 +201,7 @@ private String getKey(KeyStore keyStore, String password) throws KeyStoreException, UnrecoverableKeyException, NoSuchAlgorithmException { String keyAlias = keyStore.aliases().nextElement(); return "-----BEGIN PRIVATE KEY-----\n" - + Base64.encodeBytes(keyStore.getKey(keyAlias, password.toCharArray()).getEncoded()) + + Base64.getEncoder().encodeToString(keyStore.getKey(keyAlias, password.toCharArray()).getEncoded()) + "\n-----END PRIVATE KEY-----"; } @@ -202,7 +212,7 @@ private String getCertificateChain(KeyStore keyStore) throws KeyStoreException, StringBuilder certificateChainBuilder = new StringBuilder(); for (Certificate certificate : certificateChain) { certificateChainBuilder.append("-----BEGIN CERTIFICATE-----\n") - .append(Base64.encodeBytes(certificate.getEncoded())) + .append(Base64.getEncoder().encodeToString(certificate.getEncoded())) .append("\n-----END CERTIFICATE-----\n"); } @@ -212,7 +222,7 @@ private String getCertificateChain(KeyStore keyStore) throws KeyStoreException, private String getRootCA(KeyStore trustStore) throws KeyStoreException, CertificateEncodingException { String rootCaAlias = trustStore.aliases().nextElement(); return "-----BEGIN CERTIFICATE-----\n" - + Base64.encodeBytes(trustStore.getCertificate(rootCaAlias).getEncoded()) + + Base64.getEncoder().encodeToString(trustStore.getCertificate(rootCaAlias).getEncoded()) + "\n-----END CERTIFICATE-----"; } diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java index b793e9a40..9f28ac306 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java @@ -41,7 +41,7 @@ public class KafkaRecordSerializer implements KafkaRecordSerializationSchema primaryKeys; - private final Map headerMap; + final Map headerMap; public KafkaRecordSerializer(StreamFeatureGroup streamFeatureGroup) throws FeatureStoreException, IOException { this.topic = streamFeatureGroup.getOnlineTopicName(); diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java index cd5fb6d59..bb938072b 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/PojoToAvroRecord.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import java.util.Map; @@ -36,7 +35,7 @@ public class PojoToAvroRecord extends RichMapFunction imple private final String encodedFeatureGroupSchemaStr; private final Map complexFeatureSchemasStr; - // org.apache.avro.Schema$Field is not serializable. Create in open() and reused later on + // org.apache.avro.Schema$Field is not serializable. Create once and reused later on private transient Schema featureGroupSchema; private transient Schema encodedFeatureGroupSchema; private transient Map complexFeatureSchemas; @@ -51,14 +50,7 @@ public PojoToAvroRecord(String featureGroupSchema, this.complexFeatureSchemasStr = complexFeatureSchemas; } - @Override - public GenericRecord map(T input) throws Exception { - return PojoToAvroUtils.convertPojoToGenericRecord( - input, featureGroupSchema, encodedFeatureGroupSchema, complexFeatureSchemas); - } - - @Override - public void open(Configuration configuration) throws Exception { + public void init() throws Exception { this.featureGroupSchema = new Schema.Parser().parse(this.featureGroupSchemaStr); this.encodedFeatureGroupSchema = new Schema.Parser().parse(this.encodedFeatureGroupSchemaStr); this.complexFeatureSchemas = this.complexFeatureSchemasStr @@ -69,9 +61,18 @@ public void open(Configuration configuration) throws Exception { this.producedType = new GenericRecordAvroTypeInfo(encodedFeatureGroupSchema); } + @Override + public GenericRecord map(T input) throws Exception { + // handle serialization across task managers + if (featureGroupSchema == null || encodedFeatureGroupSchema == null || complexFeatureSchemas == null) { + init(); + } + return PojoToAvroUtils.convertPojoToGenericRecord( + input, featureGroupSchema, encodedFeatureGroupSchema, complexFeatureSchemas); + } + @Override public TypeInformation getProducedType() { return producedType; } - } diff --git a/java/pom.xml b/java/pom.xml index e20d1cc8f..e029e98be 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -306,7 +306,6 @@ 2.0.7-spark-3.5 spark3.5 2.15.2 - 1.11.4 3.4.1 diff --git a/java/spark/pom.xml b/java/spark/pom.xml index ebe3b1b7d..63f95d4be 100644 --- a/java/spark/pom.xml +++ b/java/spark/pom.xml @@ -49,6 +49,10 @@ com.fasterxml.jackson.core jackson-databind + + org.apache.avro + avro + diff --git a/python/pyproject.toml b/python/pyproject.toml index 28704d8be..cbe28ba8a 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -45,7 +45,7 @@ dependencies = [ "numpy<2", "pyjks", "mock", - "avro==1.11.3", + "avro==1.12.0", "PyMySQL[rsa]", "tzlocal", "fsspec", @@ -65,7 +65,7 @@ hopsworks-mcp = "hopsworks.mcp.main:main" python = [ "pyarrow>=17.0", "confluent-kafka<=2.11.1", - "fastavro>=1.4.11,<=1.11.1", + "fastavro>=1.4.11,<=1.12.0", "tqdm", "hops-deltalake; python_version >= '3.9' and (sys_platform == 'linux' or (sys_platform == 'darwin' and platform_machine == 'arm64'))", ]