diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java index 3d419f3ccfc..d237d49c9c7 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java @@ -131,6 +131,12 @@ public class ServiceURI { public static final ServiceURI DEFAULT_LOCAL_STREAM_STORAGE_SERVICE_URI = ServiceURI.create("bk://localhost:4181"); + /** + * Service string for bookies. + */ + public static final String SERVICE_BOOKIE = "bookie"; + public static final int SERVICE_BOOKIE_PORT = 3181; + private static final String SERVICE_SEP = "+"; private static final String SERVICE_DLOG_SEP = "-"; @@ -227,6 +233,8 @@ private static String validateHostName(String serviceName, String hostname) { return hostname; } else if (parts.length == 1 && serviceName.toLowerCase().equals(SERVICE_BK)) { return hostname + ":" + SERVICE_BK_PORT; + } else if (parts.length == 1 && serviceName.toLowerCase().equals(SERVICE_BOOKIE)) { + return hostname + ":" + SERVICE_BOOKIE_PORT; } else { return hostname; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index b7656d3586c..5ebdda7b81d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -503,7 +503,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo // initialize bookie client this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.allocator, this.mainWorkerPool, - scheduler, rootStatsLogger, this.bookieWatcher.getBookieAddressResolver()); + scheduler, rootStatsLogger, bookieAddressResolver); if (conf.getDiskWeightBasedPlacementEnabled()) { LOG.info("Weighted ledger placement enabled"); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ConnectionMode.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ConnectionMode.java new file mode 100644 index 00000000000..709c9dd62fe --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ConnectionMode.java @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client.api; + +/** + * The ConnectionMode defines how a bookkeeper client connects to a bookie. + */ +public enum ConnectionMode { + + /** + * Connect to bookie directly. + */ + DIRECT, + + /** + * Connect to a proxy layer with SNI routing. + */ + SNI_ROUTING + +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java index d3ce780578c..a3bee9592d1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java @@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.ConnectionMode; import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy; import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy; import org.apache.bookkeeper.common.allocator.PoolingPolicy; @@ -76,6 +77,10 @@ public abstract class AbstractConfiguration protected static final String ZK_TIMEOUT = "zkTimeout"; protected static final String ZK_SERVERS = "zkServers"; + // BK Service URI. + protected static final String BK_SERVICE_URI = "bookieServiceUri"; + protected static final String BK_CONNECTION_MODE = "bookieConnectionMode"; + // Ledger Manager protected static final String LEDGER_MANAGER_TYPE = "ledgerManagerType"; protected static final String LEDGER_MANAGER_FACTORY_CLASS = "ledgerManagerFactoryClass"; @@ -238,6 +243,52 @@ public void loadConf(CompositeConfiguration baseConf) { } } + /** + * Get service uri to connect to bookies. + * + *

The service uri should be pointed to the proxy service if {@link ConnectionMode#SNI_ROUTING} is configured. + * + * @return metadata service uri. + */ + public String getBookieServiceUri() { + return getString(BK_SERVICE_URI); + } + + /** + * Set the service uri to connect to bookies. + * + * @param serviceUri the bookie service uri. + * @return the configuration object. + * @see #getBookieServiceUri() + */ + public T setBookieServiceUri(String serviceUri) { + setProperty(BK_SERVICE_URI, serviceUri); + return getThis(); + } + + /** + * Get the {@link ConnectionMode} that the bookkeeper client uses to connect to bookies. + * + * @return the {@link ConnectionMode} that the bookkeeper client used to connect to bookies. + */ + public ConnectionMode getBookieConnectionMode() { + String modeStr = getString(BK_CONNECTION_MODE, ConnectionMode.DIRECT.name()); + return ConnectionMode.valueOf(modeStr); + } + + /** + * Set the {@link ConnectionMode} that the bookkeeper client uses to connect to bookies. + * + * @param connectionMode connection mode that the bookkeeper client uses to connect to bookies. + * @return the configuration object + * @see #getBookieConnectionMode() + * @see #getBookieServiceUri() + */ + public T setBookieConnectionMode(ConnectionMode connectionMode) { + setProperty(BK_CONNECTION_MODE, connectionMode.name()); + return getThis(); + } + /** * Get metadata service uri. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 88239dcdfd8..f8a771cb6f1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -92,7 +92,9 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeperClientStats; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; +import org.apache.bookkeeper.client.api.ConnectionMode; import org.apache.bookkeeper.client.api.WriteFlag; +import org.apache.bookkeeper.common.net.ServiceURI; import org.apache.bookkeeper.common.util.MdcUtils; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; @@ -346,6 +348,9 @@ enum ConnectionState { private final SecurityHandlerFactory shFactory; private volatile boolean isWritable = true; private long lastBookieUnavailableLogTimestamp = 0; + private final ConnectionMode connectionMode; + private final BookieSocketAddress proxyAddress; + private volatile BookieSocketAddress targetBookieAddress; public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup, BookieId addr, BookieAddressResolver bookieAddressResolver) throws SecurityException { @@ -377,7 +382,8 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor EventLoopGroup eventLoopGroup, ByteBufAllocator allocator, BookieId bookieId, - StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory, + StatsLogger parentStatsLogger, + ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry, PerChannelBookieClientPool pcbcPool, SecurityHandlerFactory shFactory, @@ -387,6 +393,19 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor this.bookieId = bookieId; this.bookieAddressResolver = bookieAddressResolver; this.executor = executor; + this.connectionMode = conf.getBookieConnectionMode(); + if (ConnectionMode.SNI_ROUTING == connectionMode) { + ServiceURI serviceURI = ServiceURI.create(conf.getBookieServiceUri()); + String hostName = serviceURI.getServiceHosts()[0]; + try { + this.proxyAddress = new BookieSocketAddress(hostName); + } catch (UnknownHostException e) { + throw new RuntimeException("Invalid bookie service uri : " + serviceURI, e); + } + } else { + this.proxyAddress = null; + } + if (LocalBookiesRegistry.isLocalBookie(bookieId)) { this.eventLoopGroup = new DefaultEventLoopGroup(); } else { @@ -538,9 +557,8 @@ protected ChannelFuture connect() { if (LOG.isDebugEnabled()) { LOG.debug("Connecting to bookie: {}", bookieId); } - BookieSocketAddress addr; try { - addr = bookieAddressResolver.resolve(bookieId); + targetBookieAddress = bookieAddressResolver.resolve(bookieId); } catch (BookieAddressResolver.BookieIdNotResolvedException err) { LOG.error("Cannot connect to {} as endpopint resolution failed", bookieId, err); return processBookieNotResolvedError(startTime, err); @@ -599,9 +617,15 @@ protected void initChannel(Channel ch) throws Exception { } }); - SocketAddress bookieAddr = addr.getSocketAddress(); + SocketAddress bookieAddr; if (eventLoopGroup instanceof DefaultEventLoopGroup) { bookieAddr = new LocalAddress(bookieId.toString()); + } else { + if (ConnectionMode.SNI_ROUTING == connectionMode) { + bookieAddr = proxyAddress.getSocketAddress(); + } else { + bookieAddr = targetBookieAddress.getSocketAddress(); + } } ChannelFuture future = bootstrap.connect(bookieAddr); @@ -1488,7 +1512,20 @@ public String toString() { void initTLSHandshake() { // create TLS handler PerChannelBookieClient parentObj = PerChannelBookieClient.this; - SslHandler handler = parentObj.shFactory.newTLSHandler(); + SslHandler handler; + BookieSocketAddress bookieSocketAddress = targetBookieAddress; + if (ConnectionMode.SNI_ROUTING == connectionMode) { + if (bookieSocketAddress != null) { + handler = parentObj.shFactory.newTLSHandler( + bookieSocketAddress.getHostName(), bookieSocketAddress.getPort()); + } else { + LOG.warn("No target bookie address for BookieId [{}] is resolved" + + " when connecting to proxy [{}] using SNI routing", bookieId, proxyAddress); + handler = parentObj.shFactory.newTLSHandler(); + } + } else { + handler = parentObj.shFactory.newTLSHandler(); + } channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler); handler.handshakeFuture().addListener(new GenericFutureListener>() { @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java index 5b43744fd1e..c266401afd7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java @@ -41,4 +41,6 @@ enum NodeType { void init(NodeType type, AbstractConfiguration conf, ByteBufAllocator allocator) throws SecurityException; SslHandler newTLSHandler(); + + SslHandler newTLSHandler(String sniHostName, int sniHostPort); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java index a9734862429..5b7ebc3fe9a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java @@ -465,7 +465,16 @@ public synchronized void init(NodeType type, AbstractConfiguration conf, ByteBuf @Override public SslHandler newTLSHandler() { SslHandler sslHandler = getSSLContext().newHandler(allocator); + return newTLSHandler(sslHandler); + } + + @Override + public SslHandler newTLSHandler(String sniHostName, int sniHostPort) { + SslHandler sslHandler = getSSLContext().newHandler(allocator, sniHostName, sniHostPort); + return newTLSHandler(sslHandler); + } + private SslHandler newTLSHandler(SslHandler sslHandler) { if (protocols != null && protocols.length != 0) { sslHandler.engine().setEnabledProtocols(protocols); }