Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ public class ServiceURI {
public static final ServiceURI DEFAULT_LOCAL_STREAM_STORAGE_SERVICE_URI =
ServiceURI.create("bk://localhost:4181");

/**
* Service string for bookies.
*/
public static final String SERVICE_BOOKIE = "bookie";
public static final int SERVICE_BOOKIE_PORT = 3181;

private static final String SERVICE_SEP = "+";
private static final String SERVICE_DLOG_SEP = "-";

Expand Down Expand Up @@ -227,6 +233,8 @@ private static String validateHostName(String serviceName, String hostname) {
return hostname;
} else if (parts.length == 1 && serviceName.toLowerCase().equals(SERVICE_BK)) {
return hostname + ":" + SERVICE_BK_PORT;
} else if (parts.length == 1 && serviceName.toLowerCase().equals(SERVICE_BOOKIE)) {
return hostname + ":" + SERVICE_BOOKIE_PORT;
} else {
return hostname;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo

// initialize bookie client
this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.allocator, this.mainWorkerPool,
scheduler, rootStatsLogger, this.bookieWatcher.getBookieAddressResolver());
scheduler, rootStatsLogger, bookieAddressResolver);

if (conf.getDiskWeightBasedPlacementEnabled()) {
LOG.info("Weighted ledger placement enabled");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client.api;

/**
* The <tt>ConnectionMode</tt> defines how a bookkeeper client connects to a bookie.
*/
public enum ConnectionMode {

/**
* Connect to bookie directly.
*/
DIRECT,

/**
* Connect to a proxy layer with SNI routing.
*/
SNI_ROUTING

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.client.api.ConnectionMode;
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
Expand Down Expand Up @@ -76,6 +77,10 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
protected static final String ZK_TIMEOUT = "zkTimeout";
protected static final String ZK_SERVERS = "zkServers";

// BK Service URI.
protected static final String BK_SERVICE_URI = "bookieServiceUri";
protected static final String BK_CONNECTION_MODE = "bookieConnectionMode";

// Ledger Manager
protected static final String LEDGER_MANAGER_TYPE = "ledgerManagerType";
protected static final String LEDGER_MANAGER_FACTORY_CLASS = "ledgerManagerFactoryClass";
Expand Down Expand Up @@ -238,6 +243,52 @@ public void loadConf(CompositeConfiguration baseConf) {
}
}

/**
* Get service uri to connect to bookies.
*
* <p>The service uri should be pointed to the proxy service if {@link ConnectionMode#SNI_ROUTING} is configured.
*
* @return metadata service uri.
*/
public String getBookieServiceUri() {
return getString(BK_SERVICE_URI);
}

/**
* Set the service uri to connect to bookies.
*
* @param serviceUri the bookie service uri.
* @return the configuration object.
* @see #getBookieServiceUri()
*/
public T setBookieServiceUri(String serviceUri) {
setProperty(BK_SERVICE_URI, serviceUri);
return getThis();
}

/**
* Get the {@link ConnectionMode} that the bookkeeper client uses to connect to bookies.
*
* @return the {@link ConnectionMode} that the bookkeeper client used to connect to bookies.
*/
public ConnectionMode getBookieConnectionMode() {
String modeStr = getString(BK_CONNECTION_MODE, ConnectionMode.DIRECT.name());
return ConnectionMode.valueOf(modeStr);
}

/**
* Set the {@link ConnectionMode} that the bookkeeper client uses to connect to bookies.
*
* @param connectionMode connection mode that the bookkeeper client uses to connect to bookies.
* @return the configuration object
* @see #getBookieConnectionMode()
* @see #getBookieServiceUri()
*/
public T setBookieConnectionMode(ConnectionMode connectionMode) {
setProperty(BK_CONNECTION_MODE, connectionMode.name());
return getThis();
}

/**
* Get metadata service uri.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.api.ConnectionMode;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.common.util.MdcUtils;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
Expand Down Expand Up @@ -346,6 +348,9 @@ enum ConnectionState {
private final SecurityHandlerFactory shFactory;
private volatile boolean isWritable = true;
private long lastBookieUnavailableLogTimestamp = 0;
private final ConnectionMode connectionMode;
private final BookieSocketAddress proxyAddress;
private volatile BookieSocketAddress targetBookieAddress;

public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
BookieId addr, BookieAddressResolver bookieAddressResolver) throws SecurityException {
Expand Down Expand Up @@ -377,7 +382,8 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor
EventLoopGroup eventLoopGroup,
ByteBufAllocator allocator,
BookieId bookieId,
StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
StatsLogger parentStatsLogger,
ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool,
SecurityHandlerFactory shFactory,
Expand All @@ -387,6 +393,19 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor
this.bookieId = bookieId;
this.bookieAddressResolver = bookieAddressResolver;
this.executor = executor;
this.connectionMode = conf.getBookieConnectionMode();
if (ConnectionMode.SNI_ROUTING == connectionMode) {
ServiceURI serviceURI = ServiceURI.create(conf.getBookieServiceUri());
String hostName = serviceURI.getServiceHosts()[0];
try {
this.proxyAddress = new BookieSocketAddress(hostName);
} catch (UnknownHostException e) {
throw new RuntimeException("Invalid bookie service uri : " + serviceURI, e);
}
} else {
this.proxyAddress = null;
}

if (LocalBookiesRegistry.isLocalBookie(bookieId)) {
this.eventLoopGroup = new DefaultEventLoopGroup();
} else {
Expand Down Expand Up @@ -538,9 +557,8 @@ protected ChannelFuture connect() {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to bookie: {}", bookieId);
}
BookieSocketAddress addr;
try {
addr = bookieAddressResolver.resolve(bookieId);
targetBookieAddress = bookieAddressResolver.resolve(bookieId);
} catch (BookieAddressResolver.BookieIdNotResolvedException err) {
LOG.error("Cannot connect to {} as endpopint resolution failed", bookieId, err);
return processBookieNotResolvedError(startTime, err);
Expand Down Expand Up @@ -599,9 +617,15 @@ protected void initChannel(Channel ch) throws Exception {
}
});

SocketAddress bookieAddr = addr.getSocketAddress();
SocketAddress bookieAddr;
if (eventLoopGroup instanceof DefaultEventLoopGroup) {
bookieAddr = new LocalAddress(bookieId.toString());
} else {
if (ConnectionMode.SNI_ROUTING == connectionMode) {
bookieAddr = proxyAddress.getSocketAddress();
} else {
bookieAddr = targetBookieAddress.getSocketAddress();
}
}

ChannelFuture future = bootstrap.connect(bookieAddr);
Expand Down Expand Up @@ -1488,7 +1512,20 @@ public String toString() {
void initTLSHandshake() {
// create TLS handler
PerChannelBookieClient parentObj = PerChannelBookieClient.this;
SslHandler handler = parentObj.shFactory.newTLSHandler();
SslHandler handler;
BookieSocketAddress bookieSocketAddress = targetBookieAddress;
if (ConnectionMode.SNI_ROUTING == connectionMode) {
if (bookieSocketAddress != null) {
handler = parentObj.shFactory.newTLSHandler(
bookieSocketAddress.getHostName(), bookieSocketAddress.getPort());
} else {
LOG.warn("No target bookie address for BookieId [{}] is resolved"
+ " when connecting to proxy [{}] using SNI routing", bookieId, proxyAddress);
handler = parentObj.shFactory.newTLSHandler();
}
} else {
handler = parentObj.shFactory.newTLSHandler();
}
channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ enum NodeType {
void init(NodeType type, AbstractConfiguration conf, ByteBufAllocator allocator) throws SecurityException;

SslHandler newTLSHandler();

SslHandler newTLSHandler(String sniHostName, int sniHostPort);
}
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,16 @@ public synchronized void init(NodeType type, AbstractConfiguration conf, ByteBuf
@Override
public SslHandler newTLSHandler() {
SslHandler sslHandler = getSSLContext().newHandler(allocator);
return newTLSHandler(sslHandler);
}

@Override
public SslHandler newTLSHandler(String sniHostName, int sniHostPort) {
SslHandler sslHandler = getSSLContext().newHandler(allocator, sniHostName, sniHostPort);
return newTLSHandler(sslHandler);
}

private SslHandler newTLSHandler(SslHandler sslHandler) {
if (protocols != null && protocols.length != 0) {
sslHandler.engine().setEnabledProtocols(protocols);
}
Expand Down