Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@
<artifactId>curator-framework</artifactId>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
12 changes: 11 additions & 1 deletion common/src/main/java/org/apache/omid/zk/ZKUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,16 +32,25 @@ public class ZKUtils {

private static final Logger LOG = LoggerFactory.getLogger(ZKUtils.class);

public static CuratorFramework initZKClient(String zkCluster, String namespace, int zkConnectionTimeoutInSec)
public static CuratorFramework initZKClient(String zkCluster, String namespace, int zkConnectionTimeoutInSec, String zkLoginContextName)
throws IOException {

LOG.info("Creating Zookeeper Client connecting to {}", zkCluster);

ZKClientConfig zkConfig = new ZKClientConfig();
if (zkLoginContextName != null) {
// TODO should we check if this exists ?
// Or just error out with an unsuccessful connection, as we do now ?
LOG.info("Using Login Context {} for Zookeeper", zkCluster);
zkConfig.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY, zkLoginContextName);
}

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.namespace(namespace)
.connectString(zkCluster)
.retryPolicy(retryPolicy)
.zkClientConfig(zkConfig)
.build();

zkClient.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

import javax.security.auth.login.Configuration;

import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
Expand All @@ -32,6 +34,7 @@
import org.apache.omid.committable.hbase.HBaseCommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
import org.apache.omid.tools.hbase.HBaseLogin;
import org.apache.omid.tools.hbase.OmidKerberosTicketCacheConfiguration;
import org.apache.omid.tso.client.CellId;
import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
import org.apache.omid.tso.client.TSOClient;
Expand Down Expand Up @@ -73,6 +76,11 @@ public static TransactionManager newInstance(HBaseOmidClientConfiguration config
throws IOException, InterruptedException {
//Logging in to Secure HBase if required
HBaseLogin.loginIfNeeded(configuration);
//Installing Jaas Configuration if magic app name is configured
if (OmidKerberosTicketCacheConfiguration.APP_NAME.equals(configuration.getOmidClientConfiguration().getZkLoginContextName())) {
LOG.info("Installing OmidKerberosTicketCacheConfiguration");
Configuration.setConfiguration(new OmidKerberosTicketCacheConfiguration());
}
return builder(configuration).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class TestHALeaseManagementModule extends AbstractModule {

@Override
protected void configure() {
install(new ZKModule(zkCluster, zkNamespace));
install(new ZKModule(zkCluster, zkNamespace, null));
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.tools.hbase;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

import org.apache.hadoop.hbase.security.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Programmatic Jaas configuration object, which adds a new app configuration called
* _omid_kerberos_ticket_cache, and falls back to the Configuration in effect when this was called.
* To be used for connecting to Zookeeper, to reuse the kerberos ticket cache set by Hadoop/HBase
* Based on Hadoop's JaasConfiguration
*/
public class OmidKerberosTicketCacheConfiguration extends Configuration {

private static final Logger LOG =
LoggerFactory.getLogger(OmidKerberosTicketCacheConfiguration.class);

private final javax.security.auth.login.Configuration baseConfig =
javax.security.auth.login.Configuration.getConfiguration();

private final AppConfigurationEntry[] entry;

public static String APP_NAME = "_omid_kerberos_ticket_cache";

public OmidKerberosTicketCacheConfiguration() {
super();
Map<String, String> options = new HashMap<>();
options.put("useTicketCache", "true");
options.putAll(getPrincipalIfNeeded());
entry =
new AppConfigurationEntry[] { new AppConfigurationEntry(getKrb5LoginModuleName(),
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options) };
}

@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
return (APP_NAME.equals(name)) ? entry
: ((baseConfig != null) ? baseConfig.getAppConfigurationEntry(name) : null);
}

private String getKrb5LoginModuleName() {
String krb5LoginModuleName;
if (System.getProperty("java.vendor").contains("IBM")) {
krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule";
} else {
krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
}
return krb5LoginModuleName;
}

private Map<String, String> getPrincipalIfNeeded() {
HashMap<String, String> principalOptions = new HashMap<>();
if (System.getProperty("java.vendor").contains("IBM")) {
// The IBM Kerberos implementation does not pick up the cache entry without the
// principal
String principalName;
try {
principalName = User.getCurrent().getName();
if (principalName == null || principalName.isEmpty()) {
LOG.warn("Got empty principal name on IMB JVM");
} else {
principalOptions.put("principal", principalName);
}
} catch (IOException e) {
LOG.warn("Could not determine principal on IBM JVM.", e);
}

}
return principalOptions;
}

}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@
<guava.version>32.1.3-jre</guava.version>
<!-- 2.12+ shades guava -->
<curator.version>5.6.0</curator.version>
<zookeeper.version>3.8.4</zookeeper.version>
<snakeyaml.version>2.2</snakeyaml.version>
<beanutils.version>1.9.4</beanutils.version>
<commons-io.version>2.18.0</commons-io.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ public class DefaultZKTimestampStorageModule extends AbstractModule {

private String zkCluster = "localhost:2181";
private String namespace = "omid";
private String zkLoginContextName;

@Override
public void configure() {
install(new ZKModule(zkCluster, namespace));
install(new ZKModule(zkCluster, namespace, zkLoginContextName));
install(new ZKTimestampStorageModule());
}

Expand All @@ -54,4 +55,12 @@ public void setNamespace(String namespace) {
this.namespace = namespace;
}

public String getZkLoginContextName() {
return zkLoginContextName;
}

public void setZkLoginContextName(String zkLoginContextName) {
this.zkLoginContextName = zkLoginContextName;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@
import javax.inject.Singleton;
import java.io.IOException;

//TODO:IK: move to common?
// TODO:IK: move to common?
// The problem is that common doesn't depend on Guice, and we want to keep that that way.
// Ideally, this would be in a new module, something like omid-server-common
public class ZKModule extends AbstractModule {

private final String zkCluster;
private final String namespace;
private final String zkLoginContextName;

public ZKModule(String zkCluster, String namespace) {
public ZKModule(String zkCluster, String namespace, String zkLoginContextName) {
this.zkCluster = zkCluster;
this.namespace = namespace;
this.zkLoginContextName = zkLoginContextName;
}

@Override
Expand All @@ -43,7 +47,7 @@ public void configure() {
@Provides
@Singleton
CuratorFramework provideInitializedZookeeperClient() throws IOException {
return ZKUtils.initZKClient(zkCluster, namespace, 10);
return ZKUtils.initZKClient(zkCluster, namespace, 10, zkLoginContextName);
}

// ----------------------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public enum ConflictDetectionLevel {CELL, ROW}
private String zkCurrentTsoPath;
private String zkNamespace;
private int zkConnectionTimeoutInSecs;
private String zkLoginContextName;

// Communication protocol related params

Expand Down Expand Up @@ -202,6 +203,16 @@ public void setZkNamespace(String zkNamespace) {
this.zkNamespace = zkNamespace;
}

public String getZkLoginContextName() {
return zkLoginContextName;
}

@Inject(optional = true)
@Named("omid.ha.zkLoginContextName")
public void setZkLoginContextName(String zkLoginContextName) {
this.zkLoginContextName = zkLoginContextName;
}

public PostCommitMode getPostCommitMode() {
return postCommitMode;
}
Expand Down Expand Up @@ -353,6 +364,4 @@ public void setTsConfigProtocols(String tlsConfigProtocols) {
this.tlsConfigProtocols = tlsConfigProtocols;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ private TSOClient(OmidClientConfiguration omidConf) throws IOException {
case HA:
zkClient = ZKUtils.initZKClient(omidConf.getConnectionString(),
omidConf.getZkNamespace(),
omidConf.getZkConnectionTimeoutInSecs());
omidConf.getZkConnectionTimeoutInSecs(),
omidConf.getZkLoginContextName());
zkCurrentTsoPath = omidConf.getZkCurrentTsoPath();
configureCurrentTSOServerZNodeCache(zkCurrentTsoPath);
String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class HALeaseManagementModule extends AbstractModule {
private String currentTsoPath = "/current-tso";
private String zkCluster = "localhost:2181";
private String zkNamespace = "omid";
private String zkLoginContextName;

// ----------------------------------------------------------------------------------------------------------------
// WARNING: Do not remove empty constructor, needed by snake_yaml!
Expand All @@ -51,20 +52,20 @@ public HALeaseManagementModule() {

@VisibleForTesting
public HALeaseManagementModule(long leasePeriodInMs, String tsoLeasePath, String currentTsoPath,
String zkCluster, String zkNamespace) {
String zkCluster, String zkNamespace, String zkLoginContextName) {

this.leasePeriodInMs = leasePeriodInMs;
this.tsoLeasePath = tsoLeasePath;
this.currentTsoPath = currentTsoPath;
this.zkCluster = zkCluster;
this.zkNamespace = zkNamespace;
this.zkLoginContextName = zkLoginContextName;

}

@Override
protected void configure() {

install(new ZKModule(zkCluster, zkNamespace));
install(new ZKModule(zkCluster, zkNamespace, zkLoginContextName));

}

Expand Down Expand Up @@ -133,4 +134,12 @@ public void setZkNamespace(String zkNamespace) {
this.zkNamespace = zkNamespace;
}

public String getZkLoginContextName() {
return zkLoginContextName;
}

public void setZkLoginContextName(String zkLoginContextName) {
this.zkLoginContextName = zkLoginContextName;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ public void testSuccessfulConnectionToTSOThroughZK() throws Exception {
TSOServerConfig config = new TSOServerConfig();
config.setConflictMapSize(1000);
config.setPort(tsoPortForTest);
config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid", null));
injector = Guice.createInjector(new TSOMockModule(config));
LOG.info("Starting TSO");
tsoServer = injector.getInstance(TSOServer.class);
Expand Down Expand Up @@ -629,7 +629,7 @@ public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing()
TSOServerConfig config = new TSOServerConfig();
config.setConflictMapSize(1000);
config.setPort(tsoPortForTest);
config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid", null));
injector = Guice.createInjector(new TSOMockModule(config));
LOG.info("Starting Initial TSO");
tsoServer = injector.getInstance(TSOServer.class);
Expand Down