4 changes: 4 additions & 0 deletions   java/beam/pom.xml

@@ -31,6 +31,10 @@
           <groupId>org.apache.kafka</groupId>
           <artifactId>*</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
9 changes: 3 additions & 6 deletions   java/flink/pom.xml

@@ -43,12 +43,8 @@
           <artifactId>*</artifactId>
         </exclusion>
         <exclusion>
-          <groupId>org.scala-lang</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.kafka</groupId>
-          <artifactId>*</artifactId>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
@@ -85,6 +81,7 @@
       <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
       <version>${flink.version}</version>
+      <scope>provided</scope>
       <exclusions>
         <exclusion>
           <groupId>org.apache.flink</groupId>
FlinkEngine.java

@@ -30,11 +30,9 @@
 import com.logicalclocks.hsfs.metadata.HopsworksExternalClient;
 import com.logicalclocks.hsfs.metadata.HopsworksInternalClient;
 import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
-import com.twitter.chill.Base64;
 import lombok.Getter;

 import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
@@ -47,15 +45,18 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.security.UnrecoverableKeyException;
 import java.security.cert.Certificate;
 import java.security.cert.CertificateEncodingException;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;

 public class FlinkEngine extends EngineBase {
   private static FlinkEngine INSTANCE = null;
@@ -79,23 +80,32 @@ private FlinkEngine() {
   public DataStreamSink<?> writeDataStream(StreamFeatureGroup streamFeatureGroup, DataStream<?> dataStream,
                                            Map<String, String> writeOptions) throws FeatureStoreException, IOException {

-    DataStream<Object> genericDataStream = (DataStream<Object>) dataStream;
     Properties properties = new Properties();
     properties.putAll(getKafkaConfig(streamFeatureGroup, writeOptions));

+    KafkaRecordSerializer serializer = new KafkaRecordSerializer(streamFeatureGroup);
+
+    // Generate transaction id from the kafka headers (unique for all ingestions)
+    String transactionalId = serializer.headerMap.entrySet().stream()
+        .map(e -> new String(e.getValue(), StandardCharsets.UTF_8))
+        .collect(Collectors.joining("_"));
+
+    // MUST setTransactionalIdPrefix when DeliveryGuarantee is not AT_LEAST_ONCE and it must be unique per sink
     KafkaSink<GenericRecord> sink = KafkaSink.<GenericRecord>builder()
         .setBootstrapServers(properties.getProperty("bootstrap.servers"))
         .setKafkaProducerConfig(properties)
-        .setRecordSerializer(new KafkaRecordSerializer(streamFeatureGroup))
-        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+        .setRecordSerializer(serializer)
+        .setTransactionalIdPrefix(transactionalId)
         .build();
     Map<String, String> complexFeatureSchemas = new HashMap<>();
     for (String featureName : streamFeatureGroup.getComplexFeatures()) {
       complexFeatureSchemas.put(featureName, streamFeatureGroup.getFeatureAvroSchema(featureName));
     }

+    DataStream<Object> genericDataStream = (DataStream<Object>) dataStream;
     DataStream<GenericRecord> avroRecordDataStream =
-        genericDataStream.map(new PojoToAvroRecord(
+        genericDataStream
+            .map(new PojoToAvroRecord<Object>(
             streamFeatureGroup.getAvroSchema(),
             streamFeatureGroup.getEncodedAvroSchema(),
             complexFeatureSchemas))
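
For context on the transactional id derivation above (an illustrative sketch, not part of the diff): the prefix is simply the UTF-8 decoded header values joined with "_", and the headerMap field is relaxed from private to package-private in KafkaRecordSerializer.java below so that writeDataStream can read it. The key names and values in this sketch are hypothetical; the real entries come from the feature group metadata.

import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class TransactionalIdSketch {
  public static void main(String[] args) {
    // Hypothetical header values; the real ones are set by KafkaRecordSerializer.
    Map<String, byte[]> headerMap = new LinkedHashMap<>();
    headerMap.put("projectId", "119".getBytes(StandardCharsets.UTF_8));
    headerMap.put("featureGroupId", "14".getBytes(StandardCharsets.UTF_8));
    headerMap.put("subjectId", "27".getBytes(StandardCharsets.UTF_8));

    // Same derivation as in writeDataStream: decode each header value and join with "_".
    String transactionalId = headerMap.entrySet().stream()
        .map(e -> new String(e.getValue(), StandardCharsets.UTF_8))
        .collect(Collectors.joining("_"));

    System.out.println(transactionalId); // prints 119_14_27 for these example values
  }
}

A LinkedHashMap is used here only to make the printed order deterministic; the real field is a plain Map.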
@@ -191,7 +201,7 @@ private String getKey(KeyStore keyStore, String password)
       throws KeyStoreException, UnrecoverableKeyException, NoSuchAlgorithmException {
     String keyAlias = keyStore.aliases().nextElement();
     return "-----BEGIN PRIVATE KEY-----\n"
-        + Base64.encodeBytes(keyStore.getKey(keyAlias, password.toCharArray()).getEncoded())
+        + Base64.getEncoder().encodeToString(keyStore.getKey(keyAlias, password.toCharArray()).getEncoded())
         + "\n-----END PRIVATE KEY-----";
   }

@@ -202,7 +212,7 @@ private String getCertificateChain(KeyStore keyStore) throws KeyStoreException,
     StringBuilder certificateChainBuilder = new StringBuilder();
     for (Certificate certificate : certificateChain) {
       certificateChainBuilder.append("-----BEGIN CERTIFICATE-----\n")
-          .append(Base64.encodeBytes(certificate.getEncoded()))
+          .append(Base64.getEncoder().encodeToString(certificate.getEncoded()))
           .append("\n-----END CERTIFICATE-----\n");
     }

@@ -212,7 +222,7 @@ private String getCertificateChain(KeyStore keyStore) throws KeyStoreException,
   private String getRootCA(KeyStore trustStore) throws KeyStoreException, CertificateEncodingException {
     String rootCaAlias = trustStore.aliases().nextElement();
     return "-----BEGIN CERTIFICATE-----\n"
-        + Base64.encodeBytes(trustStore.getCertificate(rootCaAlias).getEncoded())
+        + Base64.getEncoder().encodeToString(trustStore.getCertificate(rootCaAlias).getEncoded())
         + "\n-----END CERTIFICATE-----";
   }
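
One hedged note on the Base64 swap above (an observation, not part of the diff): java.util.Base64.getEncoder() emits the whole payload as a single unwrapped line, which TLS tooling generally accepts between the BEGIN/END markers. If a downstream parser ever insists on classic 64-character PEM lines, the MIME encoder is a drop-in alternative. A small sketch with a hypothetical helper name:

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PemEncodingSketch {
  // Wraps the Base64 payload at 64 characters per line, as classic PEM formatting expects.
  static String toPem(String type, byte[] der) {
    Base64.Encoder mime = Base64.getMimeEncoder(64, "\n".getBytes(StandardCharsets.UTF_8));
    return "-----BEGIN " + type + "-----\n"
        + mime.encodeToString(der)
        + "\n-----END " + type + "-----";
  }

  public static void main(String[] args) {
    byte[] fakeDer = new byte[96]; // placeholder bytes, just to demonstrate the line wrapping
    System.out.println(toPem("CERTIFICATE", fakeDer));
  }
}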
KafkaRecordSerializer.java

@@ -41,7 +41,7 @@ public class KafkaRecordSerializer implements KafkaRecordSerializationSchema<GenericRecord>

   private final String topic;
   private final List<String> primaryKeys;
-  private final Map<String, byte[]> headerMap;
+  final Map<String, byte[]> headerMap;

   public KafkaRecordSerializer(StreamFeatureGroup streamFeatureGroup) throws FeatureStoreException, IOException {
     this.topic = streamFeatureGroup.getOnlineTopicName();
PojoToAvroRecord.java

@@ -23,7 +23,6 @@
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

 import java.util.Map;
@@ -36,7 +35,7 @@ public class PojoToAvroRecord<T> extends RichMapFunction<T, GenericRecord> implements ResultTypeQueryable<GenericRecord>
   private final String encodedFeatureGroupSchemaStr;
   private final Map<String, String> complexFeatureSchemasStr;

-  // org.apache.avro.Schema$Field is not serializable. Create in open() and reused later on
+  // org.apache.avro.Schema$Field is not serializable. Create once and reused later on
   private transient Schema featureGroupSchema;
   private transient Schema encodedFeatureGroupSchema;
   private transient Map<String, Schema> complexFeatureSchemas;
@@ -51,14 +50,7 @@ public PojoToAvroRecord(String featureGroupSchema,
     this.complexFeatureSchemasStr = complexFeatureSchemas;
   }

-  @Override
-  public GenericRecord map(T input) throws Exception {
-    return PojoToAvroUtils.convertPojoToGenericRecord(
-        input, featureGroupSchema, encodedFeatureGroupSchema, complexFeatureSchemas);
-  }
-
-  @Override
-  public void open(Configuration configuration) throws Exception {
+  public void init() throws Exception {
     this.featureGroupSchema = new Schema.Parser().parse(this.featureGroupSchemaStr);
     this.encodedFeatureGroupSchema = new Schema.Parser().parse(this.encodedFeatureGroupSchemaStr);
     this.complexFeatureSchemas = this.complexFeatureSchemasStr
@@ -69,9 +61,18 @@ public void open(Configuration configuration) throws Exception {
     this.producedType = new GenericRecordAvroTypeInfo(encodedFeatureGroupSchema);
   }

+  @Override
+  public GenericRecord map(T input) throws Exception {
+    // handle serialization across task managers
+    if (featureGroupSchema == null || encodedFeatureGroupSchema == null || complexFeatureSchemas == null) {
+      init();
+    }
+    return PojoToAvroUtils.convertPojoToGenericRecord(
+        input, featureGroupSchema, encodedFeatureGroupSchema, complexFeatureSchemas);
+  }

   @Override
   public TypeInformation<GenericRecord> getProducedType() {
     return producedType;
   }

 }
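
The pattern in PojoToAvroRecord — keep the Avro schema as a String field, mark the parsed org.apache.avro.Schema transient, and rebuild it lazily on first use — is a common way to carry non-serializable objects inside a Flink function that gets shipped to task managers. A minimal self-contained sketch of the same idea, using a hypothetical class and field names (not part of this PR):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;

// Only the schema *string* travels with the serialized function;
// the Schema object is rebuilt lazily on each task manager.
public class StringToRecord implements MapFunction<String, GenericRecord> {
  private final String schemaStr;
  private transient Schema schema; // org.apache.avro.Schema is not Serializable

  public StringToRecord(String schemaStr) {
    this.schemaStr = schemaStr;
  }

  @Override
  public GenericRecord map(String value) {
    if (schema == null) {           // first call on this task instance: rebuild the transient field
      schema = new Schema.Parser().parse(schemaStr);
    }
    GenericRecord record = new GenericData.Record(schema);
    record.put("value", value);     // assumes the schema declares a string field named "value"
    return record;
  }
}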
1 change: 0 additions & 1 deletion   java/pom.xml

@@ -306,7 +306,6 @@
         <deequ.version>2.0.7-spark-3.5</deequ.version>
         <artifact.spark.version>spark3.5</artifact.spark.version>
         <fasterxml.jackson.databind.version>2.15.2</fasterxml.jackson.databind.version>
-        <avro.version>1.11.4</avro.version>
         <kafka.version>3.4.1</kafka.version>
       </properties>
     </profile>
4 changes: 4 additions & 0 deletions   java/spark/pom.xml

@@ -49,6 +49,10 @@
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-databind</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
4 changes: 2 additions & 2 deletions   python/pyproject.toml

@@ -45,7 +45,7 @@ dependencies = [
     "numpy<2",
     "pyjks",
     "mock",
-    "avro==1.11.3",
+    "avro==1.12.0",
     "PyMySQL[rsa]",
     "tzlocal",
     "fsspec",
@@ -65,7 +65,7 @@ hopsworks-mcp = "hopsworks.mcp.main:main"
 python = [
     "pyarrow>=17.0",
     "confluent-kafka<=2.11.1",
-    "fastavro>=1.4.11,<=1.11.1",
+    "fastavro>=1.4.11,<=1.12.0",
     "tqdm",
     "hops-deltalake; python_version >= '3.9' and (sys_platform == 'linux' or (sys_platform == 'darwin' and platform_machine == 'arm64'))",
 ]