Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ public enum Property {
"Properties in this category affect the behavior of the manager server.", "2.1.0"),
MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT,
"The port used for handling client connections on the manager.", "1.3.5"),
MANAGER_ASYNC_CLIENTPORT("manager.port.async.client", "8999", PropertyType.PORT,
"The port used for handling client connections on the manager for async services.", "4.0.0"),
MANAGER_TABLET_BALANCER("manager.tablet.balancer",
"org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME,
"The balancer class that accumulo will use to make tablet assignment and "
Expand Down Expand Up @@ -1165,7 +1167,11 @@ public enum Property {
@Experimental
COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL(
"compaction.coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION,
"The interval at which to check the tservers for external compactions.", "2.1.0");
"The interval at which to check the tservers for external compactions.", "2.1.0"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed some of the existing props were unused when looking at this PR and opened #4932

COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME(
"compaction.coordinator.wait.time.job.request.max", "2m", PropertyType.TIMEDURATION,
"The maximum amount of time the coordinator will wait for a requested job from the job queue.",
"4.0.0");

private final String key;
private final String defaultValue;
Expand Down
49 changes: 49 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.accumulo.core.rpc;

import static org.apache.accumulo.core.rpc.clients.ThriftClientTypes.COORDINATOR;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;

import java.io.IOException;
Expand All @@ -26,14 +27,18 @@
import java.nio.channels.ClosedByInterruptException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TSaslClientTransport;
Expand All @@ -44,6 +49,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;
import com.google.common.net.HostAndPort;

/**
Expand Down Expand Up @@ -113,6 +119,21 @@ public static <T extends TServiceClient> T getClient(ThriftClientTypes<T> type,
HostAndPort address, ClientContext context) throws TTransportException {
TTransport transport = context.getTransportPool().getTransport(type, address,
context.getClientTimeoutInMillis(), context, true);

// TODO - This is temporary until we support Async multiplexing
// THRIFT-2427 is tracking this issue and the plan is to reopen a new PR
// to add support in the next version. Once we support multiplexing we can
// remove this special case.
// Note: you could have a sync server that doesn't use SSL or SASL by using
// ThriftServerType.THREADPOOL on the server but the client wouldn't know that
// For now this should be good enough as we will be able to remove this anyways
// when we can multiplex async. So this currently will not work if using that mode.
boolean isSync = context.getConfiguration().getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)
|| context.getConfiguration().getBoolean(Property.INSTANCE_RPC_SSL_ENABLED);
if (type == COORDINATOR && !isSync) {
return type.getClientFactory().getClient(protocolFactory.getProtocol(transport));
}

return createClient(type, transport);
}

Expand Down Expand Up @@ -382,4 +403,32 @@ public static void checkIOExceptionCause(IOException e) {
throw new UncheckedIOException(e);
}
}

public static <T> AsyncMethodCallback<T> asyncMethodFuture(CompletableFuture<T> future) {
return new AsyncMethodCallback<T>() {
@Override
public void onComplete(T response) {
future.complete(response);
}

@Override
public void onError(Exception exception) {
future.completeExceptionally(exception);
}
};
}

public static <T> T getFutureThriftResult(CompletableFuture<T> future) throws TException {
try {
return future.get();
} catch (Exception e) {
if (e instanceof ExecutionException) {
Throwable rootCause = Throwables.getRootCause(e);
if (rootCause instanceof TException) {
throw (TException) rootCause;
}
}
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.net.Socket;
import java.nio.channels.SelectionKey;

import org.apache.thrift.TAsyncProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.transport.TNonblockingServerTransport;
Expand Down Expand Up @@ -99,11 +102,11 @@ public CustomSelectAcceptThread(TNonblockingServerTransport serverTransport)
protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
final SelectionKey selectionKey, final AbstractSelectThread selectThread)
throws TTransportException {
if (processorFactory_.isAsyncProcessor()) {
throw new IllegalStateException("This implementation does not support AsyncProcessors");
}

return new CustomFrameBuffer(trans, selectionKey, selectThread);
TProcessor processor = processorFactory_.getProcessor(null);
return processor instanceof TAsyncProcessor
? new CustomAsyncFrameBuffer(trans, selectionKey, selectThread)
: new CustomFrameBuffer(trans, selectionKey, selectThread);
}
}

Expand All @@ -118,7 +121,7 @@ public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
AbstractSelectThread selectThread) throws TTransportException {
super(trans, selectionKey, selectThread);
// Store the clientAddress in the buffer so it can be referenced for logging during read/write
this.clientAddress = getClientAddress();
this.clientAddress = getClientAddress(trans_, "CustomFrameBuffer");
}

@Override
Expand Down Expand Up @@ -148,33 +151,78 @@ public boolean write() {
}
return result;
}
}

/**
* Custom wrapper around
* {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} to extract the
* client's network location before accepting the request.
*/
private class CustomAsyncFrameBuffer extends AsyncFrameBuffer {
private final String clientAddress;

public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
AbstractSelectThread selectThread) throws TTransportException {
super(trans, selectionKey, selectThread);
// Store the clientAddress in the buffer so it can be referenced for logging during read/write
this.clientAddress = getClientAddress(trans_, "CustomAsyncFrameBuffer");
}

@Override
public void invoke() {
// On invoke() set the clientAddress on the ThreadLocal so that it can be accessed elsewhere
// in the same thread that called invoke() on the buffer
TServerUtils.clientAddress.set(clientAddress);
super.invoke();
}

@Override
public boolean read() {
boolean result = super.read();
if (!result) {
log.trace("CustomAsyncFrameBuffer.read returned false when reading data from client: {}",
clientAddress);
}
return result;
}

/*
* Helper method used to capture the client address inside the CustomFrameBuffer constructor so
* that it can be referenced inside the read/write methods for logging purposes. It previously
* was only set on the ThreadLocal in the invoke() method but that does not work because A) the
* method isn't called until after reading is finished so the value will be null inside of
* read() and B) The other problem is that invoke() is called on a different thread than
* read()/write() so even if the order was correct it would not be available.
*
* Since a new FrameBuffer is created for each request we can use it to capture the client
* address earlier in the constructor and not wait for invoke(). A FrameBuffer is used to read
* data and write a response back to the client and as part of creation of the buffer the
* TNonblockingSocket is stored as a final variable and won't change so we can safely capture
* the clientAddress in the constructor and use it for logging during read/write and then use
* the value inside of invoke() to set the ThreadLocal so the client address will still be
* available on the thread that called invoke().
*/
private String getClientAddress() {
String clientAddress = null;
if (trans_ instanceof TNonblockingSocket) {
TNonblockingSocket tsock = (TNonblockingSocket) trans_;
Socket sock = tsock.getSocketChannel().socket();
clientAddress = sock.getInetAddress().getHostAddress() + ":" + sock.getPort();
log.trace("CustomFrameBuffer captured client address: {}", clientAddress);
@Override
public boolean write() {
boolean result = super.write();
if (!result) {
log.trace("CustomAsyncFrameBuffer.write returned false when writing data to client: {}",
clientAddress);
}
return clientAddress;
return result;
}
}

/*
* Helper method used to capture the client address inside the CustomFrameBuffer and
* CustomAsyncFrameBuffer constructors so that it can be referenced inside the read/write methods
* for logging purposes. It previously was only set on the ThreadLocal in the invoke() method but
* that does not work because A) the method isn't called until after reading is finished so the
* value will be null inside of read() and B) The other problem is that invoke() is called on a
* different thread than read()/write() so even if the order was correct it would not be
* available.
*
* Since a new FrameBuffer is created for each request we can use it to capture the client address
* earlier in the constructor and not wait for invoke(). A FrameBuffer is used to read data and
* write a response back to the client and as part of creation of the buffer the
* TNonblockingSocket is stored as a final variable and won't change so we can safely capture the
* clientAddress in the constructor and use it for logging during read/write and then use the
* value inside of invoke() to set the ThreadLocal so the client address will still be available
* on the thread that called invoke().
*/
private static String getClientAddress(TNonblockingTransport transport, String name) {
String clientAddress = null;
if (transport instanceof TNonblockingSocket) {
TNonblockingSocket tsock = (TNonblockingSocket) transport;
Socket sock = tsock.getSocketChannel().socket();
clientAddress = sock.getInetAddress().getHostAddress() + ":" + sock.getPort();
log.trace("{} captured client address: {}", name, clientAddress);
}
return clientAddress;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.rpc.TimedProcessor.AsyncTimedProcessor;
import org.apache.accumulo.server.rpc.TimedProcessor.SyncTimedProcessor;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TAsyncProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocolFactory;
Expand Down Expand Up @@ -170,7 +173,7 @@ public static ServerAddress startServer(ServerContext context, String hostname,
// create the TimedProcessor outside the port search loop so we don't try to
// register the same
// metrics mbean more than once
TimedProcessor timedProcessor = new TimedProcessor(processor, context.getMetricsInfo());
TimedProcessor timedProcessor = newTimedProcessor(processor, context.getMetricsInfo());

HostAndPort[] addresses = getHostAndPorts(hostname, portHint);
try {
Expand Down Expand Up @@ -574,7 +577,7 @@ public static ServerAddress startTServer(final AccumuloConfiguration conf,
}

try {
return startTServer(serverType, new TimedProcessor(processor, metricsInfo), serverName,
return startTServer(serverType, newTimedProcessor(processor, metricsInfo), serverName,
threadName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize,
sslParams, saslParams, serverSocketTimeout, backlog, portSearch, addresses);
} catch (TTransportException e) {
Expand Down Expand Up @@ -702,4 +705,10 @@ private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProc

return new UGIAssumingProcessor(processor);
}

private static TimedProcessor newTimedProcessor(TProcessor processor, MetricsInfo metricsInfo) {
return processor instanceof TAsyncProcessor
? new AsyncTimedProcessor((TAsyncProcessor) processor, metricsInfo)
: new SyncTimedProcessor(processor, metricsInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
*/
package org.apache.accumulo.server.rpc;

import java.util.Optional;

import org.apache.accumulo.core.clientImpl.thrift.ClientService;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.AsyncProcessor;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.Iface;
import org.apache.accumulo.core.compaction.thrift.CompactorService;
import org.apache.accumulo.core.gc.thrift.GCMonitorService;
import org.apache.accumulo.core.manager.thrift.FateService;
Expand All @@ -32,7 +36,6 @@
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.thrift.TBaseProcessor;
import org.apache.thrift.TMultiplexedProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TServiceClient;
Expand All @@ -47,8 +50,8 @@ public ThriftProcessorTypes(ThriftClientTypes<C> type) {
}

@VisibleForTesting
public <I,H extends I,P extends TBaseProcessor<?>> TProcessor getTProcessor(
Class<P> processorClass, Class<I> interfaceClass, H serviceHandler, ServerContext context) {
public <I,H extends I,P extends TProcessor> P getTProcessor(Class<P> processorClass,
Class<I> interfaceClass, H serviceHandler, ServerContext context) {
I rpcProxy = TraceUtil.wrapService(serviceHandler);
if (context.getThriftServerType() == ThriftServerType.SASL) {
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -114,20 +117,32 @@ public static TMultiplexedProcessor getGcTProcessor(GCMonitorService.Iface servi
}

public static TMultiplexedProcessor getManagerTProcessor(FateService.Iface fateServiceHandler,
CompactionCoordinatorService.Iface coordinatorServiceHandler,
ManagerClientService.Iface managerServiceHandler, ServerContext context) {
ManagerClientService.Iface managerServiceHandler,
Optional<CompactionCoordinatorService.Iface> coordinatorServiceHandler,
ServerContext context) {
TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
muxProcessor.registerProcessor(FATE.getServiceName(), FATE.getTProcessor(
FateService.Processor.class, FateService.Iface.class, fateServiceHandler, context));
muxProcessor.registerProcessor(COORDINATOR.getServiceName(),
COORDINATOR.getTProcessor(CompactionCoordinatorService.Processor.class,
CompactionCoordinatorService.Iface.class, coordinatorServiceHandler, context));
coordinatorServiceHandler
.ifPresent(csh -> muxProcessor.registerProcessor(COORDINATOR.getServiceName(),
COORDINATOR.getTProcessor(CompactionCoordinatorService.Processor.class, Iface.class,
csh, context)));
muxProcessor.registerProcessor(MANAGER.getServiceName(),
MANAGER.getTProcessor(ManagerClientService.Processor.class,
ManagerClientService.Iface.class, managerServiceHandler, context));
return muxProcessor;
}

public static AsyncProcessor<?> getManagerTAsyncProcessor(
CompactionCoordinatorService.AsyncIface coordinatorServiceHandler, ServerContext context) {
// TODO - Right now this is temporarily returning the single AsyncProcessor
// Once Thrift supports Async multiplexing then we can switch to using
// TMultiplexedAsyncProcessor and use multiplexing like we do for sync processors
// THRIFT-2427 is tracking this issue
return COORDINATOR.getTProcessor(CompactionCoordinatorService.AsyncProcessor.class,
CompactionCoordinatorService.AsyncIface.class, coordinatorServiceHandler, context);
}

public static TMultiplexedProcessor getScanServerTProcessor(ClientServiceHandler clientHandler,
TabletScanClientService.Iface tserverHandler, ServerContext context) {
TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,14 @@ public String toString() {
public static ThriftServerType getDefault() {
return CUSTOM_HS_HA;
}

public static boolean supportsAsync(ThriftServerType thriftServerType) {
switch (thriftServerType) {
case CUSTOM_HS_HA:
case THREADED_SELECTOR:
return true;
default:
return false;
}
}
}
Loading