diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/HopsworksConnection.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/HopsworksConnection.java index 8b19103f57..4cf340ae48 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/HopsworksConnection.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/HopsworksConnection.java @@ -20,6 +20,8 @@ import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.HopsworksConnectionBase; import com.logicalclocks.hsfs.SecretStore; +import com.logicalclocks.hsfs.beam.engine.BeamEngine; +import com.logicalclocks.hsfs.engine.EngineBase; import com.logicalclocks.hsfs.metadata.Credentials; import com.logicalclocks.hsfs.metadata.HopsworksClient; import com.logicalclocks.hsfs.metadata.HopsworksHttpClient; @@ -55,6 +57,7 @@ public HopsworksConnection(String host, int port, String project, Region region, hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue); this.projectObj = getProject(); HopsworksClient.getInstance().setProject(this.projectObj); + EngineBase.setInstance(BeamEngine.getInstance()); Credentials credentials = HopsworksClient.getInstance().getCredentials(); HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient(); hopsworksHttpClient.setTrustStorePath(credentials.gettStore()); diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamEngine.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamEngine.java index 13ff573a12..f8d710384e 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamEngine.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamEngine.java @@ -37,14 +37,14 @@ public class BeamEngine extends EngineBase { private static BeamEngine INSTANCE = null; private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils(); - public static synchronized BeamEngine getInstance() throws FeatureStoreException { + public static synchronized BeamEngine getInstance() { if (INSTANCE == null) { INSTANCE = new BeamEngine(); } return INSTANCE; } - private BeamEngine() throws FeatureStoreException { + private BeamEngine() { } public BeamProducer insertStream(StreamFeatureGroup streamFeatureGroup, Map writeOptions) @@ -87,8 +87,6 @@ public Map getKafkaConfig(FeatureGroupBase featureGroup, Map config = storageConnector.kafkaOptions(); diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java index 68195c77f8..d981ed57f5 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java @@ -20,6 +20,8 @@ import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.HopsworksConnectionBase; import com.logicalclocks.hsfs.SecretStore; +import com.logicalclocks.hsfs.engine.EngineBase; +import com.logicalclocks.hsfs.flink.engine.FlinkEngine; import com.logicalclocks.hsfs.metadata.Credentials; import com.logicalclocks.hsfs.metadata.HopsworksClient; @@ -56,6 +58,7 @@ public HopsworksConnection(String host, int port, String project, Region region, hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue); this.projectObj = getProject(); HopsworksClient.getInstance().setProject(this.projectObj); + EngineBase.setInstance(FlinkEngine.getInstance()); if (!System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)) { Credentials credentials = HopsworksClient.getInstance().getCredentials(); HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient(); diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java index b8e43908ed..e748f567fc 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java @@ -128,8 +128,6 @@ public Map getKafkaConfig(FeatureGroupBase featureGroup, Map config = storageConnector.kafkaOptions(); diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java index 3f6a64c627..b5c20cb3b5 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java @@ -17,14 +17,27 @@ package com.logicalclocks.hsfs; +import java.io.IOException; +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Strings; +import com.logicalclocks.hsfs.engine.EngineBase; +import com.logicalclocks.hsfs.metadata.HopsworksClient; import com.logicalclocks.hsfs.metadata.HopsworksHttpClient; import com.logicalclocks.hsfs.metadata.Option; import com.logicalclocks.hsfs.metadata.StorageConnectorApi; -import com.logicalclocks.hsfs.metadata.HopsworksClient; import com.logicalclocks.hsfs.util.Constants; import lombok.AllArgsConstructor; @@ -32,17 +45,8 @@ import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import software.amazon.awssdk.utils.CollectionUtils; -import java.io.IOException; -import java.time.Instant; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - @AllArgsConstructor @NoArgsConstructor @ToString @@ -395,13 +399,13 @@ public static class KafkaConnector extends StorageConnector { @Getter @Setter protected SecurityProtocol securityProtocol; - @Getter @Setter + @Getter protected String sslTruststoreLocation; @Getter @Setter protected String sslTruststorePassword; - @Getter @Setter + @Getter protected String sslKeystoreLocation; @Getter @Setter @@ -413,12 +417,36 @@ public static class KafkaConnector extends StorageConnector { @Getter @Setter protected SslEndpointIdentificationAlgorithm sslEndpointIdentificationAlgorithm; - @Getter @Setter + @Getter protected List