Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
protected static final String ALLOW_SHADED_LEDGER_MANAGER_FACTORY_CLASS = "allowShadedLedgerManagerFactoryClass";
protected static final String SHADED_LEDGER_MANAGER_FACTORY_CLASS_PREFIX = "shadedLedgerManagerFactoryClassPrefix";
protected static final String METADATA_SERVICE_URI = "metadataServiceUri";
protected static final String METADATA_SERVICE_CONFIG = "metadataServiceConfig";
protected static final String ZK_LEDGERS_ROOT_PATH = "zkLedgersRootPath";
protected static final String ZK_REQUEST_RATE_LIMIT = "zkRequestRateLimit";
protected static final String AVAILABLE_NODE = "available";
Expand Down Expand Up @@ -295,6 +296,15 @@ public T setMetadataServiceUri(String serviceUri) {
return getThis();
}

public String getMetadataServiceConfig() {
return getString(METADATA_SERVICE_CONFIG);
}

public T setMetadataServiceConfig(String config) {
setProperty(METADATA_SERVICE_CONFIG, config);
return getThis();
}

/**
* Get zookeeper servers to connect.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.bookkeeper.metadata.etcd;

import io.netty.handler.ssl.SslProvider;
import org.apache.commons.configuration.CompositeConfiguration;

class EtcdConfig extends CompositeConfiguration {
private static final String USE_TLS = "useTls";
private static final String TLS_PROVIDER = "tlsProvider";
private static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath";
private static final String TLS_KEY_FILE_PATH = "tlsKeyFilePath";
private static final String TLS_CERTIFICATE_FILE_PATH = "tlsCertificateFilePath";
private static final String AUTHORITY = "authority";

public boolean isUseTls() {
return getBoolean(USE_TLS, false);
}

public SslProvider getTlsProvider() {
return SslProvider.valueOf(getString(TLS_PROVIDER));
}

public String getTlsTrustCertsFilePath() {
return getString(TLS_TRUST_CERTS_FILE_PATH);
}

public String getTlsKeyFilePath() {
return getString(TLS_KEY_FILE_PATH);
}

public String getTlsCertificateFilePath() {
return getString(TLS_CERTIFICATE_FILE_PATH);
}

public String getAuthority() {
return getString(AUTHORITY);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

import com.google.common.collect.Lists;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
import io.grpc.netty.GrpcSslContexts;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.conf.AbstractConfiguration;
Expand All @@ -31,6 +35,8 @@
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;

/**
* This is a mixin class for supporting etcd based metadata drivers.
Expand Down Expand Up @@ -77,18 +83,47 @@ protected void initialize(AbstractConfiguration<?> conf, StatsLogger statsLogger
ServiceURI serviceURI = ServiceURI.create(metadataServiceUriStr);
this.keyPrefix = serviceURI.getServicePath();

EtcdConfig config = new EtcdConfig();
if (StringUtils.isNotEmpty(conf.getMetadataServiceConfig())) {
try {
PropertiesConfiguration propsConf = new PropertiesConfiguration(conf.getMetadataServiceConfig());
config.addConfiguration(propsConf);
} catch (ConfigurationException e) {
throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
}
}

List<String> etcdEndpoints = Lists.newArrayList(serviceURI.getServiceHosts())
.stream()
.map(host -> String.format("http://%s", host))
.map(host -> config.isUseTls() ? String.format("https://%s", host) : String.format("http://%s", host))
.collect(Collectors.toList());

log.info("Initializing etcd metadata driver : etcd endpoints = {}, key scope = {}",
etcdEndpoints, keyPrefix);

synchronized (this) {
this.client = Client.builder()
.endpoints(etcdEndpoints.toArray(new String[etcdEndpoints.size()]))
.build();
ClientBuilder builder = Client.builder()
.endpoints(etcdEndpoints.toArray(new String[etcdEndpoints.size()]));
if (config.isUseTls()) {
File trustCertsFile = new File(config.getTlsTrustCertsFilePath());
File keyFile = new File(config.getTlsKeyFilePath());
File certFile = new File(config.getTlsCertificateFilePath());
try {
builder.sslContext(GrpcSslContexts.forClient()
.trustManager(trustCertsFile)
.sslProvider(config.getTlsProvider())
.keyManager(certFile, keyFile)
.build());
} catch (SSLException e) {
throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
}
}

if (StringUtils.isNotEmpty(config.getAuthority())) {
builder.authority(config.getAuthority());
}

this.client = builder.build();
}

this.layoutManager = new EtcdLayoutManager(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.bookkeeper.metadata.etcd.integration;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.metadata.etcd.testing.EtcdContainer;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;

@Slf4j
public class TlsSmokeTest extends SmokeTest {

@BeforeClass
public static void setupCluster() throws Exception {
etcdContainer = new EtcdContainer(RandomStringUtils.randomAlphabetic(8), true);
etcdContainer.start();
log.info("Successfully started etcd1 at {}", etcdContainer.getClientEndpoint());
setupCluster(NUM_BOOKIES);
}

@AfterClass
public static void teardownCluster() throws Exception {
if (null != etcdContainer) {
etcdContainer.stop();
etcdContainer = null;
log.info("Successfully stopped etcd.");
}
}

@Override
public void setUp() throws Exception {
conf = new ClientConfiguration()
.setMetadataServiceUri(etcdContainer.getExternalServiceUri())
.setMetadataServiceConfig(getMetadataServiceConfig());
bk = BookKeeper.newBuilder(conf).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,13 @@ protected static void setupCluster(int numBookies) throws Exception {

ServiceURI uri = ServiceURI.create(etcdContainer.getExternalServiceUri());

String metadataServiceConfig = getMetadataServiceConfig();
baseClientConf = new ClientConfiguration()
.setMetadataServiceUri(uri.getUri().toString());
.setMetadataServiceUri(uri.getUri().toString())
.setMetadataServiceConfig(metadataServiceConfig);
baseServerConf = TestBKConfiguration.newServerConfiguration()
.setMetadataServiceUri(uri.getUri().toString());
.setMetadataServiceUri(uri.getUri().toString())
.setMetadataServiceConfig(metadataServiceConfig);
// format the cluster
assertTrue(BookKeeperAdmin.format(baseServerConf, false, true));
// start bookies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,22 @@
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.LogContainerCmd;
import com.github.dockerjava.api.model.Frame;
import io.grpc.netty.GrpcSslContexts;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.ContainerLaunchException;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.LogUtils;
import org.testcontainers.utility.MountableFile;

/**
* Etcd test container.
Expand All @@ -52,10 +58,13 @@ public void onNext(Frame frame) {
public static final int CLIENT_PORT = 2379;

private final String clusterName;
@Getter
private final boolean secure;

public EtcdContainer(String clusterName) {
public EtcdContainer(String clusterName, boolean secure) {
super("quay.io/coreos/etcd:v3.3");
this.clusterName = clusterName;
this.secure = secure;
}

public String getExternalServiceUri() {
Expand All @@ -70,25 +79,46 @@ public String getInternalServiceUri() {
protected void configure() {
super.configure();

String[] command = new String[] {
"/usr/local/bin/etcd",
"--name", NAME + "0",
"--initial-advertise-peer-urls", "http://" + NAME + ":2380",
"--listen-peer-urls", "http://0.0.0.0:2380",
"--advertise-client-urls", "http://" + NAME + ":2379",
"--listen-client-urls", "http://0.0.0.0:2379",
"--initial-cluster", NAME + "0=http://" + NAME + ":2380"
};
if (secure) {
withCommand(
"/usr/local/bin/etcd",
"--name", NAME + "0",
"--initial-advertise-peer-urls", "http://" + NAME + ":2380",
"--listen-peer-urls", "http://0.0.0.0:2380",
"--advertise-client-urls", "https://" + NAME + ":2379",
"--listen-client-urls", "https://0.0.0.0:2379",
"--initial-cluster", NAME + "0=http://" + NAME + ":2380",
"--client-cert-auth",
"--trusted-ca-file", "/ca.pem",
"--cert-file", "/server.pem",
"--key-file", "/server-key.pem"
);
} else {
withCommand(
"/usr/local/bin/etcd",
"--name", NAME + "0",
"--initial-advertise-peer-urls", "http://" + NAME + ":2380",
"--listen-peer-urls", "http://0.0.0.0:2380",
"--advertise-client-urls", "http://" + NAME + ":2379",
"--listen-client-urls", "http://0.0.0.0:2379",
"--initial-cluster", NAME + "0=http://" + NAME + ":2380"
);
}

this.withNetworkAliases(NAME)
.withExposedPorts(CLIENT_PORT)
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(clusterName + "-" + NAME);
})
.withCommand(command)
.withNetworkAliases(NAME)
.waitingFor(waitStrategy());
if (secure) {
this.withCopyFileToContainer(MountableFile.forClasspathResource("ssl/cert/ca.pem"), "/ca.pem")
.withCopyFileToContainer(MountableFile.forClasspathResource("ssl/cert/server.pem"), "/server.pem")
.withCopyFileToContainer(MountableFile.forClasspathResource("ssl/cert/server-key.pem"),
"/server-key.pem");
}
tailContainerLog();
}

Expand All @@ -113,7 +143,11 @@ public int getEtcdClientPort() {
}

public String getClientEndpoint() {
return String.format("http://%s:%d", getHost(), getEtcdClientPort());
if (secure) {
return String.format("https://%s:%d", getHost(), getEtcdClientPort());
} else {
return String.format("http://%s:%d", getHost(), getEtcdClientPort());
}
}

private WaitStrategy waitStrategy() {
Expand All @@ -139,5 +173,23 @@ protected void waitUntilReady() {
};
}

public SslContext getSslContext() throws SSLException {
if (!secure) {
return null;
}
return GrpcSslContexts.forClient()
.sslProvider(SslProvider.OPENSSL)
.trustManager(EtcdContainer.class.getClassLoader().getResourceAsStream("ssl/cert/ca.pem"))
.keyManager(
EtcdContainer.class.getClassLoader().getResourceAsStream("ssl/cert/client.pem"),
EtcdContainer.class.getClassLoader().getResourceAsStream("ssl/cert/client-key-pk8.pem")
).build();
}

public String getAuthority() {
if (!secure) {
return null;
}
return "etcd-ssl";
}
}
Loading