Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HopsworksConnectionBase;
import com.logicalclocks.hsfs.SecretStore;
import com.logicalclocks.hsfs.beam.engine.BeamEngine;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.metadata.Credentials;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
Expand Down Expand Up @@ -55,6 +57,7 @@ public HopsworksConnection(String host, int port, String project, Region region,
hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue);
this.projectObj = getProject();
HopsworksClient.getInstance().setProject(this.projectObj);
EngineBase.setInstance(BeamEngine.getInstance());
Credentials credentials = HopsworksClient.getInstance().getCredentials();
HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient();
hopsworksHttpClient.setTrustStorePath(credentials.gettStore());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ public class BeamEngine extends EngineBase {
private static BeamEngine INSTANCE = null;
private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

public static synchronized BeamEngine getInstance() throws FeatureStoreException {
public static synchronized BeamEngine getInstance() {
  // Lazy singleton: the instance is created on first access only.
  // The synchronized modifier makes concurrent first calls safe.
  if (INSTANCE != null) {
    return INSTANCE;
  }
  INSTANCE = new BeamEngine();
  return INSTANCE;
}

private BeamEngine() throws FeatureStoreException {
// Private constructor: BeamEngine is a singleton, obtained via getInstance().
private BeamEngine() {
}

public BeamProducer insertStream(StreamFeatureGroup streamFeatureGroup, Map<String, String> writeOptions)
Expand Down Expand Up @@ -87,8 +87,6 @@ public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<Str

StorageConnector.KafkaConnector storageConnector =
storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
storageConnector.setSslTruststoreLocation(addFile(storageConnector.getSslTruststoreLocation()));
storageConnector.setSslKeystoreLocation(addFile(storageConnector.getSslKeystoreLocation()));

Map<String, String> config = storageConnector.kafkaOptions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HopsworksConnectionBase;
import com.logicalclocks.hsfs.SecretStore;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.flink.engine.FlinkEngine;
import com.logicalclocks.hsfs.metadata.Credentials;
import com.logicalclocks.hsfs.metadata.HopsworksClient;

Expand Down Expand Up @@ -56,6 +58,7 @@ public HopsworksConnection(String host, int port, String project, Region region,
hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue);
this.projectObj = getProject();
HopsworksClient.getInstance().setProject(this.projectObj);
EngineBase.setInstance(FlinkEngine.getInstance());
if (!System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)) {
Credentials credentials = HopsworksClient.getInstance().getCredentials();
HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<Str

StorageConnector.KafkaConnector storageConnector =
storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
storageConnector.setSslTruststoreLocation(addFile(storageConnector.getSslTruststoreLocation()));
storageConnector.setSslKeystoreLocation(addFile(storageConnector.getSslKeystoreLocation()));

Map<String, String> config = storageConnector.kafkaOptions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,36 @@

package com.logicalclocks.hsfs;

import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Strings;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
import com.logicalclocks.hsfs.metadata.Option;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.util.Constants;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.utils.CollectionUtils;

import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@AllArgsConstructor
@NoArgsConstructor
@ToString
Expand Down Expand Up @@ -395,13 +399,13 @@ public static class KafkaConnector extends StorageConnector {
@Getter @Setter
protected SecurityProtocol securityProtocol;

@Getter @Setter
@Getter
protected String sslTruststoreLocation;

@Getter @Setter
protected String sslTruststorePassword;

@Getter @Setter
@Getter
protected String sslKeystoreLocation;

@Getter @Setter
Expand All @@ -413,12 +417,36 @@ public static class KafkaConnector extends StorageConnector {
@Getter @Setter
protected SslEndpointIdentificationAlgorithm sslEndpointIdentificationAlgorithm;

@Getter @Setter
@Getter
protected List<Option> options;

@Getter @Setter
protected Boolean externalKafka;

/**
 * Sets the truststore location, first materializing the file locally through the
 * active engine's addFile mechanism so the stored path is usable at runtime.
 */
public void setSslTruststoreLocation(String sslTruststoreLocation) throws IOException, FeatureStoreException {
  String localTruststorePath = EngineBase.getInstance().addFile(sslTruststoreLocation);
  this.sslTruststoreLocation = localTruststorePath;
}

/**
 * Sets the keystore location, first materializing the file locally through the
 * active engine's addFile mechanism so the stored path is usable at runtime.
 */
public void setSslKeystoreLocation(String sslKeystoreLocation) throws IOException, FeatureStoreException {
  String localKeystorePath = EngineBase.getInstance().addFile(sslKeystoreLocation);
  this.sslKeystoreLocation = localKeystorePath;
}

/**
 * Sets the Kafka connector options. Any keyTab path embedded in a
 * "sasl.jaas.config" option is rewritten to the path returned by the active
 * engine's addFile, so the keytab file is available in the execution environment.
 *
 * @param options connector options; may be null (stored as-is)
 * @throws IOException if distributing a keytab file fails
 * @throws FeatureStoreException if the engine cannot register the file
 */
public void setOptions(List<Option> options) throws IOException, FeatureStoreException {
  if (options != null) {
    // Compile once instead of once per option.
    Pattern keytabPattern = Pattern.compile("keyTab=[\"'](.+?)[\"']");
    for (Option option : options) {
      // Literal-first equals also guards against a null option name.
      if ("sasl.jaas.config".equals(option.getName())) {
        Matcher matcher = keytabPattern.matcher(option.getValue());
        while (matcher.find()) {
          String originalKeytabLocation = matcher.group(1);
          String newKeytabLocation = EngineBase.getInstance().addFile(originalKeytabLocation);
          option.setValue(option.getValue().replace(originalKeytabLocation, newKeytabLocation));
        }
      }
    }
  }
  this.options = options;
}

public Map<String, String> kafkaOptions() throws FeatureStoreException {
HopsworksHttpClient client = HopsworksClient.getInstance().getHopsworksHttpClient();
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@

public abstract class EngineBase {

// Process-wide engine singleton. NOTE(review): set by the HopsworksConnection
// constructors (Beam/Flink/Spark variants register their engine here); consumed
// by code such as KafkaConnector setters that needs engine services without a
// direct reference to a concrete engine.
private static EngineBase instance;

// Registers the concrete engine implementation for this JVM.
public static synchronized void setInstance(EngineBase instance) {
  EngineBase.instance = instance;
}

// Returns the registered engine. May return null if setInstance has not been
// called yet — callers should ensure a connection was established first.
public static synchronized EngineBase getInstance() {
  return instance;
}

protected static final Logger LOGGER = LoggerFactory.getLogger(EngineBase.class);

public StorageConnectorApi storageConnectorApi = new StorageConnectorApi();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2025. Hopsworks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.logicalclocks.hsfs;

import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.metadata.Option;

import java.io.IOException;
import java.util.Collections;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;


/**
 * Tests for KafkaConnector.setOptions keytab handling: addFile must be invoked
 * only for "sasl.jaas.config" options containing a keyTab entry.
 */
class TestStorageConnector {
  @Test
  public void testOptions() throws FeatureStoreException, IOException {
    // Arrange: stub the engine so addFile returns a sentinel path.
    EngineBase engineBase = Mockito.mock(EngineBase.class);
    Mockito.when(engineBase.addFile(Mockito.anyString())).thenReturn("result_from_add_file");
    EngineBase.setInstance(engineBase);

    StorageConnector.KafkaConnector kafkaConnector = new StorageConnector.KafkaConnector();

    // Act: an option unrelated to sasl.jaas.config must not trigger keytab handling.
    kafkaConnector.setOptions(Collections.singletonList(new Option("test", "test")));

    // Assert: no file was registered and the option value is stored untouched.
    Mockito.verify(engineBase, Mockito.times(0)).addFile(Mockito.anyString());
    Assertions.assertEquals("test", kafkaConnector.getOptions().get(0).getValue());
  }

  @Test
  public void testOptionsSASLAuthentication() throws FeatureStoreException, IOException {
    // Arrange: stub the engine so the rewritten keytab path is predictable.
    EngineBase engineBase = Mockito.mock(EngineBase.class);
    Mockito.when(engineBase.addFile(Mockito.anyString())).thenReturn("result_from_add_file");
    EngineBase.setInstance(engineBase);

    StorageConnector.KafkaConnector kafkaConnector = new StorageConnector.KafkaConnector();

    // Act: a sasl.jaas.config option with an embedded keyTab path.
    kafkaConnector.setOptions(Collections.singletonList(new Option("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab=\"/home/laurent/my.keytab\" storeKey=true useTicketCache=false serviceName=\"kafka\" principal=\"laurent@kafka.com\";")));

    // Assert: the keytab path was registered exactly once and replaced in the value.
    Mockito.verify(engineBase, Mockito.times(1)).addFile(Mockito.anyString());
    Assertions.assertEquals("com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab=\"result_from_add_file\" storeKey=true useTicketCache=false serviceName=\"kafka\" principal=\"laurent@kafka.com\";", kafkaConnector.getOptions().get(0).getValue());
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HopsworksConnectionBase;
import com.logicalclocks.hsfs.SecretStore;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.metadata.HopsworksClient;

import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
Expand Down Expand Up @@ -53,6 +54,7 @@ public HopsworksConnection(String host, int port, String project, Region region,
hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue);
this.projectObj = getProject();
HopsworksClient.getInstance().setProject(this.projectObj);
EngineBase.setInstance(SparkEngine.getInstance());
if (!System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)) {
SparkEngine.getInstance().validateSparkConfiguration();
HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.OnDemandOptions;
import com.logicalclocks.hsfs.metadata.OnlineIngestionApi;
import com.logicalclocks.hsfs.metadata.Option;
import com.logicalclocks.hsfs.util.Constants;
import com.logicalclocks.hsfs.spark.ExternalFeatureGroup;
Expand Down Expand Up @@ -125,7 +124,6 @@ public class SparkEngine extends EngineBase {

private final StorageConnectorUtils storageConnectorUtils = new StorageConnectorUtils();
private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();
private OnlineIngestionApi onlineIngestionApi = new OnlineIngestionApi();

private static SparkEngine INSTANCE = null;

Expand Down Expand Up @@ -923,8 +921,6 @@ public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<Str

StorageConnector.KafkaConnector storageConnector =
storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
storageConnector.setSslTruststoreLocation(addFile(storageConnector.getSslTruststoreLocation()));
storageConnector.setSslKeystoreLocation(addFile(storageConnector.getSslKeystoreLocation()));

Map<String, String> config = storageConnector.sparkOptions();

Expand Down
52 changes: 46 additions & 6 deletions python/hsfs/storage_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,19 @@ def __init__(
self._external_kafka = external_kafka
self._pem_files_created = False

# add keytab file
sasl_jaas_config = self._options.get("sasl.jaas.config")
if sasl_jaas_config:
for option in re.findall("keyTab=[\"'](.+?)[\"']", sasl_jaas_config):
original_keytab_location = option
new_keytab_location = engine.get_instance().add_file(
original_keytab_location
)
sasl_jaas_config = sasl_jaas_config.replace(
original_keytab_location, new_keytab_location
)
self._options["sasl.jaas.config"] = sasl_jaas_config

@property
def bootstrap_servers(self) -> Optional[List[str]]:
"""Bootstrap servers string."""
Expand Down Expand Up @@ -1245,11 +1258,17 @@ def confluent_options(self) -> Dict[str, Any]:
config["ssl.key.location"] = client_key_path
elif key == "sasl.jaas.config":
groups = re.search(
"(.+?) .*username=[\"'](.+?)[\"'] .*password=[\"'](.+?)[\"']",
value,
"(.+) (required|requisite|sufficient|optional)(.*)", value
)
mechanism = groups.group(1)
# flag = groups.group(2)
options = groups.group(3)

option_dict = {}
for option in re.findall(r"\s(\w+)=[\"'](.+?)[\"']", options):
option_dict[option[0]] = option[1]

if "sasl.mechanisms" not in config:
mechanism = groups.group(1)
mechanism_value = None
if (
mechanism
Expand All @@ -1266,9 +1285,30 @@ def confluent_options(self) -> Dict[str, Any]:
== "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"
):
mechanism_value = "OAUTHBEARER"
config["sasl.mechanisms"] = mechanism_value
config["sasl.username"] = groups.group(2)
config["sasl.password"] = groups.group(3)
else:
mechanism_value = "GSSAPI"
config["sasl.mechanisms"] = mechanism_value

if mechanism_value == "GSSAPI":
service_name = option_dict.get("serviceName")
if service_name:
config["sasl.kerberos.service.name"] = service_name

principal = option_dict.get("principal")
if principal:
config["sasl.kerberos.principal"] = principal

key_tab = option_dict.get("keyTab")
if key_tab:
config["sasl.kerberos.keytab"] = key_tab
else:
username = option_dict.get("username")
if username:
config["sasl.username"] = username

password = option_dict.get("password")
if password:
config["sasl.password"] = password
elif key == "ssl.endpoint.identification.algorithm":
config[key] = "none" if value == "" else value
elif key == "queued.max.requests":
Expand Down
Loading
Loading