diff --git a/modules/accord/.gradle/8.5/checksums/checksums.lock b/modules/accord/.gradle/8.5/checksums/checksums.lock new file mode 100644 index 000000000000..5c4647862ae9 Binary files /dev/null and b/modules/accord/.gradle/8.5/checksums/checksums.lock differ diff --git a/modules/accord/.gradle/8.5/checksums/md5-checksums.bin b/modules/accord/.gradle/8.5/checksums/md5-checksums.bin new file mode 100644 index 000000000000..0b7db9bb653d Binary files /dev/null and b/modules/accord/.gradle/8.5/checksums/md5-checksums.bin differ diff --git a/modules/accord/.gradle/8.5/checksums/sha1-checksums.bin b/modules/accord/.gradle/8.5/checksums/sha1-checksums.bin new file mode 100644 index 000000000000..645faffae070 Binary files /dev/null and b/modules/accord/.gradle/8.5/checksums/sha1-checksums.bin differ diff --git a/modules/accord/.gradle/8.5/checksums/sha256-checksums.bin b/modules/accord/.gradle/8.5/checksums/sha256-checksums.bin new file mode 100644 index 000000000000..d88b19d91dfe Binary files /dev/null and b/modules/accord/.gradle/8.5/checksums/sha256-checksums.bin differ diff --git a/modules/accord/.gradle/8.5/checksums/sha512-checksums.bin b/modules/accord/.gradle/8.5/checksums/sha512-checksums.bin new file mode 100644 index 000000000000..2c9c9b10052e Binary files /dev/null and b/modules/accord/.gradle/8.5/checksums/sha512-checksums.bin differ diff --git a/modules/accord/.gradle/8.5/dependencies-accessors/dependencies-accessors.lock b/modules/accord/.gradle/8.5/dependencies-accessors/dependencies-accessors.lock new file mode 100644 index 000000000000..f99b824b3a55 Binary files /dev/null and b/modules/accord/.gradle/8.5/dependencies-accessors/dependencies-accessors.lock differ diff --git a/modules/accord/.gradle/8.5/dependencies-accessors/gc.properties b/modules/accord/.gradle/8.5/dependencies-accessors/gc.properties new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/modules/accord/.gradle/8.5/executionHistory/executionHistory.bin b/modules/accord/.gradle/8.5/executionHistory/executionHistory.bin new file mode 100644 index 000000000000..d0b02c3f86e7 Binary files /dev/null and b/modules/accord/.gradle/8.5/executionHistory/executionHistory.bin differ diff --git a/modules/accord/.gradle/8.5/executionHistory/executionHistory.lock b/modules/accord/.gradle/8.5/executionHistory/executionHistory.lock new file mode 100644 index 000000000000..e3732ccd28c1 Binary files /dev/null and b/modules/accord/.gradle/8.5/executionHistory/executionHistory.lock differ diff --git a/modules/accord/.gradle/8.5/fileChanges/last-build.bin b/modules/accord/.gradle/8.5/fileChanges/last-build.bin new file mode 100644 index 000000000000..f76dd238ade0 Binary files /dev/null and b/modules/accord/.gradle/8.5/fileChanges/last-build.bin differ diff --git a/modules/accord/.gradle/8.5/fileHashes/fileHashes.bin b/modules/accord/.gradle/8.5/fileHashes/fileHashes.bin new file mode 100644 index 000000000000..34a2df6e27e5 Binary files /dev/null and b/modules/accord/.gradle/8.5/fileHashes/fileHashes.bin differ diff --git a/modules/accord/.gradle/8.5/fileHashes/fileHashes.lock b/modules/accord/.gradle/8.5/fileHashes/fileHashes.lock new file mode 100644 index 000000000000..1f5865dcd0fd Binary files /dev/null and b/modules/accord/.gradle/8.5/fileHashes/fileHashes.lock differ diff --git a/modules/accord/.gradle/8.5/fileHashes/resourceHashesCache.bin b/modules/accord/.gradle/8.5/fileHashes/resourceHashesCache.bin new file mode 100644 index 000000000000..7798f9e965dc Binary files /dev/null and b/modules/accord/.gradle/8.5/fileHashes/resourceHashesCache.bin differ diff --git a/modules/accord/.gradle/8.5/gc.properties b/modules/accord/.gradle/8.5/gc.properties new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/modules/accord/.gradle/buildOutputCleanup/buildOutputCleanup.lock b/modules/accord/.gradle/buildOutputCleanup/buildOutputCleanup.lock new file mode 100644 index 000000000000..7b6979253b8e Binary files /dev/null and b/modules/accord/.gradle/buildOutputCleanup/buildOutputCleanup.lock differ diff --git a/modules/accord/.gradle/buildOutputCleanup/cache.properties b/modules/accord/.gradle/buildOutputCleanup/cache.properties new file mode 100644 index 000000000000..c980bf2a0d69 --- /dev/null +++ b/modules/accord/.gradle/buildOutputCleanup/cache.properties @@ -0,0 +1,2 @@ +#Wed Oct 22 21:00:52 PDT 2025 +gradle.version=8.5 diff --git a/modules/accord/.gradle/buildOutputCleanup/outputFiles.bin b/modules/accord/.gradle/buildOutputCleanup/outputFiles.bin new file mode 100644 index 000000000000..046e64920110 Binary files /dev/null and b/modules/accord/.gradle/buildOutputCleanup/outputFiles.bin differ diff --git a/modules/accord/.gradle/file-system.probe b/modules/accord/.gradle/file-system.probe new file mode 100644 index 000000000000..1175b78e10b8 Binary files /dev/null and b/modules/accord/.gradle/file-system.probe differ diff --git a/modules/accord/.gradle/noVersion/buildLogic.lock b/modules/accord/.gradle/noVersion/buildLogic.lock new file mode 100644 index 000000000000..d4c8496c8ac9 Binary files /dev/null and b/modules/accord/.gradle/noVersion/buildLogic.lock differ diff --git a/modules/accord/.gradle/vcs-1/gc.properties b/modules/accord/.gradle/vcs-1/gc.properties new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/modules/accord/buildSrc/.gradle/8.5/executionHistory/executionHistory.bin b/modules/accord/buildSrc/.gradle/8.5/executionHistory/executionHistory.bin new file mode 100644 index 000000000000..31fec527d3e5 Binary files /dev/null and b/modules/accord/buildSrc/.gradle/8.5/executionHistory/executionHistory.bin differ diff --git a/modules/accord/buildSrc/.gradle/8.5/executionHistory/executionHistory.lock b/modules/accord/buildSrc/.gradle/8.5/executionHistory/executionHistory.lock new file mode 100644 index 000000000000..b2afdcc402cf Binary files /dev/null and b/modules/accord/buildSrc/.gradle/8.5/executionHistory/executionHistory.lock differ diff --git a/modules/accord/buildSrc/.gradle/buildOutputCleanup/buildOutputCleanup.lock b/modules/accord/buildSrc/.gradle/buildOutputCleanup/buildOutputCleanup.lock new file mode 100644 index 000000000000..98bb9f43f74c Binary files /dev/null and b/modules/accord/buildSrc/.gradle/buildOutputCleanup/buildOutputCleanup.lock differ diff --git a/modules/accord/buildSrc/.gradle/buildOutputCleanup/cache.properties b/modules/accord/buildSrc/.gradle/buildOutputCleanup/cache.properties new file mode 100644 index 000000000000..c980bf2a0d69 --- /dev/null +++ b/modules/accord/buildSrc/.gradle/buildOutputCleanup/cache.properties @@ -0,0 +1,2 @@ +#Wed Oct 22 21:00:52 PDT 2025 +gradle.version=8.5 diff --git a/modules/accord/buildSrc/.gradle/buildOutputCleanup/outputFiles.bin b/modules/accord/buildSrc/.gradle/buildOutputCleanup/outputFiles.bin new file mode 100644 index 000000000000..aa1261c8d552 Binary files /dev/null and b/modules/accord/buildSrc/.gradle/buildOutputCleanup/outputFiles.bin differ diff --git a/modules/accord/buildSrc/.gradle/file-system.probe b/modules/accord/buildSrc/.gradle/file-system.probe new file mode 100644 index 000000000000..f5f180f39bb4 Binary files /dev/null and b/modules/accord/buildSrc/.gradle/file-system.probe differ diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index cfb9f1ffc03e..4260fc8f1a13 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -34,6 +34,7 @@ import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.CountDownLatch; import org.slf4j.Logger; @@ -61,6 +62,7 @@ import static java.lang.Math.max; import static java.lang.Math.min; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.net.InternodeConnectionUtils.isSSLError; import static org.apache.cassandra.net.MessagingService.current_version; import static org.apache.cassandra.net.OutboundConnectionInitiator.*; @@ -127,15 +129,15 @@ public class OutboundConnection /** Used in logging statements to lazily build a human-readable number of pending bytes. */ private final Object readablePendingBytes = - new Object() { @Override public String toString() { return prettyPrintMemory(pendingBytes()); } }; + new Object() { @Override public String toString() { return prettyPrintMemory(pendingBytes()); } }; /** Used in logging statements to lazily build a human-readable number of reserve endpoint bytes in use. */ private final Object readableReserveEndpointUsing = - new Object() { @Override public String toString() { return prettyPrintMemory(reserveCapacityInBytes.endpoint.using()); } }; + new Object() { @Override public String toString() { return prettyPrintMemory(reserveCapacityInBytes.endpoint.using()); } }; /** Used in logging statements to lazily build a human-readable number of reserve global bytes in use. */ private final Object readableReserveGlobalUsing = - new Object() { @Override public String toString() { return prettyPrintMemory(reserveCapacityInBytes.global.using()); } }; + new Object() { @Override public String toString() { return prettyPrintMemory(reserveCapacityInBytes.global.using()); } }; private volatile long submittedCount = 0; // updated with cas private volatile long overloadedCount = 0; // updated with cas @@ -314,8 +316,8 @@ void cancel() this.debug = template.debug; this.queue = new OutboundMessageQueue(approxTime, this::onExpired); this.delivery = type == ConnectionType.LARGE_MESSAGES - ? new LargeMessageDelivery(template.socketFactory.synchronousWorkExecutor) - : new EventLoopDelivery(); + ? new LargeMessageDelivery(template.socketFactory.synchronousWorkExecutor) + : new EventLoopDelivery(); setDisconnected(); } @@ -456,14 +458,14 @@ private void releaseCapacity(long count, long bytes) private void onOverloaded(Message message) { overloadedCountUpdater.incrementAndGet(this); - + int canonicalSize = canonicalSize(message); overloadedBytesUpdater.addAndGet(this, canonicalSize); - + noSpamLogger.warn("{} overloaded; dropping {} message (queue: {} local, {} endpoint, {} global)", - this, FBUtilities.prettyPrintMemory(canonicalSize), - readablePendingBytes, readableReserveEndpointUsing, readableReserveGlobalUsing); - + this, FBUtilities.prettyPrintMemory(canonicalSize), + readablePendingBytes, readableReserveEndpointUsing, readableReserveGlobalUsing); + callbacks.onOverloaded(message, template.to); } @@ -474,7 +476,13 @@ private void onOverloaded(Message message) */ private boolean onExpired(Message message) { - noSpamLogger.warn("{} dropping message of type {} whose timeout expired before reaching the network", id(), message.verb()); + if (logger.isTraceEnabled()) + logger.trace("{} dropping message of type {} with payload {} whose timeout ({}ms) expired before reaching the network. {}ms elapsed after expiration. {}ms since creation.", + id(), message.verb(), message.payload, DatabaseDescriptor.getRpcTimeout(MILLISECONDS), + NANOSECONDS.toMillis(Clock.Global.nanoTime() - message.expiresAtNanos()), + message.elapsedSinceCreated(MILLISECONDS)); + else + noSpamLogger.warn("{} dropping message of type {} whose timeout expired before reaching the network", id(), message.verb()); releaseCapacity(1, canonicalSize(message)); expiredCount += 1; expiredBytes += canonicalSize(message); @@ -999,9 +1007,9 @@ boolean doRun(Established established) { out.discard(); if (out.flushed() > 0 || - isCausedBy(t, cause -> isConnectionReset(cause) - || cause instanceof Errors.NativeIoException - || cause instanceof AsyncChannelOutputPlus.FlushException)) + isCausedBy(t, cause -> isConnectionReset(cause) + || cause instanceof Errors.NativeIoException + || cause instanceof AsyncChannelOutputPlus.FlushException)) { // close the channel, and wait for eventLoop to execute disconnectNow(established).awaitUninterruptibly(); @@ -1119,42 +1127,39 @@ void onCompletedHandshake(Result result) // it is expected that close, if successful, has already cancelled us; so we do not need to worry about leaking connections assert !state.isClosed(); - MessagingSuccess success = result.success(); - debug.onConnect(success.messagingVersion, settings); - state.disconnected().maintenance.cancel(false); - - FrameEncoder.PayloadAllocator payloadAllocator = success.allocator; - Channel channel = success.channel; - Established established = new Established(success.messagingVersion, channel, payloadAllocator, settings); - state = established; - channel.pipeline().addLast("handleExceptionalStates", new ChannelInboundHandlerAdapter() { - @Override - public void channelInactive(ChannelHandlerContext ctx) - { - disconnectNow(established); - ctx.fireChannelInactive(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - { - try - { - invalidateChannel(established, cause); + if (result.success() != null) { + MessagingSuccess success = (MessagingSuccess) result.success(); + debug.onConnect(success.messagingVersion, settings); + state.disconnected().maintenance.cancel(false); + + FrameEncoder.PayloadAllocator payloadAllocator = success.allocator; + Channel channel = success.channel; + Established established = new Established(success.messagingVersion, channel, payloadAllocator, settings); + state = established; + channel.pipeline().addLast("handleExceptionalStates", new ChannelInboundHandlerAdapter() { + @Override + public void channelInactive(ChannelHandlerContext ctx) { + disconnectNow(established); + ctx.fireChannelInactive(); } - catch (Throwable t) - { - logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + try { + invalidateChannel(established, cause); + } catch (Throwable t) { + logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t); + } } - } - }); - ++successfulConnections; + }); + ++successfulConnections; - logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}", + logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}", id(true), success.messagingVersion, settings.framing, encryptionConnectionSummary(channel)); + } break; case RETRY: @@ -1171,7 +1176,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) case INCOMPATIBLE: // we cannot communicate with this peer given its messaging version; mark this as any other failure, and continue trying Throwable t = new IOException(String.format("Incompatible peer: %s, messaging version: %s", - settings.to, result.incompatible().maxMessagingVersion)); + settings.to, result.incompatible().maxMessagingVersion)); t.fillInStackTrace(); onFailure(t); break; @@ -1205,7 +1210,7 @@ private void attempt(Promise> result, boolean sslFallba if (knownMessagingVersion != messagingVersion) { logger.trace("Endpoint version changed from {} to {} since connection initialized, updating.", - messagingVersion, knownMessagingVersion); + messagingVersion, knownMessagingVersion); messagingVersion = knownMessagingVersion; } @@ -1221,20 +1226,20 @@ private void attempt(Promise> result, boolean sslFallba // while talking to older nodes in the cluster which are configured to make NON-SSL connections SslFallbackConnectionType[] fallBackSslFallbackConnectionTypes = SslFallbackConnectionType.values(); int index = sslFallbackEnabled && settings.withEncryption() && settings.encryption.getOptional() ? - (int) (connectionAttempts - 1) % fallBackSslFallbackConnectionTypes.length : 0; + (int) (connectionAttempts - 1) % fallBackSslFallbackConnectionTypes.length : 0; if (fallBackSslFallbackConnectionTypes[index] != SslFallbackConnectionType.SERVER_CONFIG) { logger.info("ConnectionId {} is falling back to {} reconnect strategy for retry", id(), fallBackSslFallbackConnectionTypes[index]); } initiateMessaging(eventLoop, type, fallBackSslFallbackConnectionTypes[index], settings, result) - .addListener(future -> { - if (future.isCancelled()) - return; - if (future.isSuccess()) //noinspection unchecked - onCompletedHandshake((Result) future.getNow()); - else - onFailure(future.cause()); - }); + .addListener(future -> { + if (future.isCancelled()) + return; + if (future.isSuccess()) //noinspection unchecked + onCompletedHandshake((Result) future.getNow()); + else + onFailure(future.cause()); + }); } Future> initiate() @@ -1387,10 +1392,10 @@ private Future disconnectNow(Established closeIfIs) if (hasPending()) delivery.execute(); closeIfIs.channel.close() - .addListener(future -> { - if (!future.isSuccess()) - logger.info("Problem closing channel {}", closeIfIs, future.cause()); - }); + .addListener(future -> { + if (!future.isSuccess()) + logger.info("Problem closing channel {}", closeIfIs, future.cause()); + }); } }); } @@ -1477,7 +1482,7 @@ public Future close(boolean flushQueue) { assert state.isEstablished(); state.established().channel.close() - .addListener(new PromiseNotifier<>(closing)); + .addListener(new PromiseNotifier<>(closing)); } } catch (Throwable t) @@ -1604,8 +1609,8 @@ private String id(boolean includeReal) Channel channel = established.channel; OutboundConnectionSettings settings = established.settings; return SocketFactory.channelId(settings.from, (InetSocketAddress) channel.localAddress(), - settings.to, (InetSocketAddress) channel.remoteAddress(), - type, channel.id().asShortText()); + settings.to, (InetSocketAddress) channel.remoteAddress(), + type, channel.id().asShortText()); } private String id() @@ -1731,7 +1736,7 @@ int messagingVersion() { State state = this.state; return state.isEstablished() ? state.established().messagingVersion - : template.endpointToVersion().get(template.to); + : template.endpointToVersion().get(template.to); } @VisibleForTesting @@ -1776,4 +1781,4 @@ Limit unsafeGetEndpointReserveLimits() { return reserveCapacityInBytes.endpoint; } -} +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java index 63b99a45dbcd..69f95aa25fc4 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java @@ -29,6 +29,7 @@ import io.netty.util.concurrent.Future; //checkstyle: permit this import import io.netty.util.concurrent.Promise; //checkstyle: permit this import +//import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.slf4j.Logger; @@ -65,6 +66,9 @@ import static java.util.concurrent.TimeUnit.*; import static org.apache.cassandra.auth.IInternodeAuthenticator.InternodeConnectionDirection.OUTBOUND; import static org.apache.cassandra.auth.IInternodeAuthenticator.InternodeConnectionDirection.OUTBOUND_PRECONNECT; +//import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions.ClientAuth.NOT_REQUIRED; +//import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions.ClientAuth.OPTIONAL; +//import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions.ClientAuth.REQUIRED; import static org.apache.cassandra.net.InternodeConnectionUtils.DISCARD_HANDLER_NAME; import static org.apache.cassandra.net.InternodeConnectionUtils.SSL_FACTORY_CONTEXT_DESCRIPTION; import static org.apache.cassandra.net.InternodeConnectionUtils.SSL_HANDLER_NAME; @@ -119,7 +123,7 @@ public static Future> initiateStreaming(EventLoop event SslFallbackConnectionType sslConnectionType) { return new OutboundConnectionInitiator(STREAMING, sslConnectionType, settings, AsyncPromise.withExecutor(eventLoop)) - .initiate(eventLoop); + .initiate(eventLoop); } /** @@ -133,13 +137,12 @@ static Future> initiateMessaging(EventLoop eventLoop, C OutboundConnectionSettings settings, Promise> result) { return new OutboundConnectionInitiator<>(type, sslConnectionType, settings, result) - .initiate(eventLoop); + .initiate(eventLoop); } private Future> initiate(EventLoop eventLoop) { - if (logger.isTraceEnabled()) - logger.trace("creating outbound bootstrap to {}", settings); + logger.trace("creating outbound bootstrap to {}", settings); if (!settings.authenticator.authenticate(settings.to.getAddress(), settings.to.getPort(), null, OUTBOUND_PRECONNECT)) { @@ -154,20 +157,20 @@ private Future> initiate(EventLoop eventLoop) // and still guarantee that, if on timing out we raced with success, the successfully created channel is handled AtomicBoolean timedout = new AtomicBoolean(); io.netty.util.concurrent.Future bootstrap = createBootstrap(eventLoop) - .connect() - .addListener(future -> { - eventLoop.execute(() -> { - if (!future.isSuccess()) - { - if (future.isCancelled() && !timedout.get()) - resultPromise.cancel(true); - else if (future.isCancelled()) - resultPromise.tryFailure(new IOException("Timeout handshaking with " + settings.connectToId())); - else - resultPromise.tryFailure(future.cause()); - } - }); - }); + .connect() + .addListener(future -> { + eventLoop.execute(() -> { + if (!future.isSuccess()) + { + if (future.isCancelled() && !timedout.get()) + resultPromise.cancel(true); + else if (future.isCancelled()) + resultPromise.tryFailure(new IOException("Timeout handshaking with " + settings.connectToId())); + else + resultPromise.tryFailure(future.cause()); + } + }); + }); ScheduledFuture timeout = eventLoop.schedule(() -> { timedout.set(true); @@ -188,14 +191,14 @@ else if (future.isCancelled()) private Bootstrap createBootstrap(EventLoop eventLoop) { Bootstrap bootstrap = settings.socketFactory - .newClientBootstrap(eventLoop, settings.tcpUserTimeoutInMS) - .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.tcpConnectTimeoutInMS) - .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.TCP_NODELAY, settings.tcpNoDelay) - .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, NoSizeEstimator.instance) - .handler(new Initializer()); + .newClientBootstrap(eventLoop, settings.tcpUserTimeoutInMS) + .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.tcpConnectTimeoutInMS) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.TCP_NODELAY, settings.tcpNoDelay) + .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, NoSizeEstimator.instance) + .handler(new Initializer()); if (settings.socketSendBufferSizeInBytes > 0) bootstrap.option(ChannelOption.SO_SNDBUF, settings.socketSendBufferSizeInBytes); @@ -221,14 +224,15 @@ public void initChannel(SocketChannel channel) throws Exception // order of handlers: ssl -> server-authentication -> logger -> handshakeHandler if ((sslConnectionType == SslFallbackConnectionType.SERVER_CONFIG && settings.withEncryption()) - || sslConnectionType == SslFallbackConnectionType.SSL || sslConnectionType == SslFallbackConnectionType.MTLS) + || sslConnectionType == SslFallbackConnectionType.SSL || sslConnectionType == SslFallbackConnectionType.MTLS) { SslContext sslContext = getSslContext(sslConnectionType); // for some reason channel.remoteAddress() will return null InetAddressAndPort address = settings.to; InetSocketAddress peer = settings.encryption.require_endpoint_verification ? new InetSocketAddress(address.getAddress(), address.getPort()) : null; SslHandler sslHandler = newSslHandler(channel, sslContext, peer); - logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName()); + if (logger.isTraceEnabled()) + logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName()); pipeline.addFirst(SSL_HANDLER_NAME, sslHandler); } pipeline.addLast("server-authentication", new ServerAuthenticationHandler(settings)); @@ -253,6 +257,7 @@ else if (connectionType == SslFallbackConnectionType.SERVER_CONFIG) return SSLFactory.getOrCreateSslContext(settings.encryption, requireClientAuth, ISslContextFactory.SocketType.CLIENT, SSL_FACTORY_CONTEXT_DESCRIPTION); } + } /** @@ -310,7 +315,7 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception logger.trace("starting handshake with peer {}, msg = {}", settings.connectToId(), msg); AsyncChannelPromise.writeAndFlush(ctx, msg.encode(), - future -> { if (!future.isSuccess()) exceptionCaught(ctx, future.cause()); }); + future -> { if (!future.isSuccess()) exceptionCaught(ctx, future.cause()); }); ctx.fireChannelActive(); } @@ -517,15 +522,28 @@ private Result(Outcome outcome) } boolean isSuccess() { return outcome == Outcome.SUCCESS; } - public SuccessType success() { return (SuccessType) this; } + public Success success() { + if (this.outcome == outcome.SUCCESS) + return (Success) this; + return null; + } static MessagingSuccess messagingSuccess(Channel channel, int messagingVersion, FrameEncoder.PayloadAllocator allocator) { return new MessagingSuccess(channel, messagingVersion, allocator); } static StreamingSuccess streamingSuccess(Channel channel, int messagingVersion) { return new StreamingSuccess(channel, messagingVersion); } - public Retry retry() { return (Retry) this; } + public Retry retry() { + if (this.outcome == outcome.RETRY) + return (Retry) this; + return null; + } static Result retry(int withMessagingVersion) { return new Retry<>(withMessagingVersion); } - public Incompatible incompatible() { return (Incompatible) this; } + public Incompatible incompatible() + { + if (this.outcome == outcome.INCOMPATIBLE) + return (Incompatible) this; + return null; + } static Result incompatible(int closestSupportedVersion, int maxMessagingVersion) { return new Incompatible(closestSupportedVersion, maxMessagingVersion); } } -} +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/net/StreamingTest.java b/test/unit/org/apache/cassandra/net/StreamingTest.java new file mode 100644 index 000000000000..1bc35babf16f --- /dev/null +++ b/test/unit/org/apache/cassandra/net/StreamingTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net; + +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import com.google.common.net.InetAddresses; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.Future; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; +//import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.Builder; +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.gms.GossipDigestSyn; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.security.DefaultSslContextFactory; +//import org.apache.cassandra.transport.TlsTestUtils; + +import static org.apache.cassandra.net.OutboundConnectionInitiator.Result; +import static org.apache.cassandra.net.OutboundConnectionInitiator.SslFallbackConnectionType; +import static org.apache.cassandra.net.OutboundConnectionInitiator.initiateStreaming; +import static org.apache.cassandra.net.MessagingService.current_version; +import static org.apache.cassandra.net.MessagingService.minimum_version; +//import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions.ClientAuth.NOT_REQUIRED; +//import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions.ClientAuth.REQUIRED; +//import static org.apache.cassandra.tcm.ClusterMetadata.EMPTY_METADATA_IDENTIFIER; +public class StreamingTest +{ + private static final SocketFactory factory = new SocketFactory(); + static final InetAddressAndPort TO_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 7012); + static final InetAddressAndPort FROM_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 7012); + private volatile Throwable handshakeEx; + @BeforeClass + public static void startup() + { + DatabaseDescriptor.daemonInitialization(); + CommitLog.instance.start(); + } + + @AfterClass + public static void cleanup() throws InterruptedException + { + factory.shutdownNow(); + } + + @Before + public void setup() + { + handshakeEx = null; + } + + private Result streamingConnect(AcceptVersions acceptOutbound, AcceptVersions acceptInbound) throws ExecutionException, InterruptedException + { + InboundSockets inbound = new InboundSockets(new InboundConnectionSettings().withAcceptMessaging(acceptInbound)); + try + { + inbound.open(); + InetAddressAndPort endpoint = inbound.sockets().stream().map(s -> s.settings.bindAddress).findFirst().get(); + EventLoop eventLoop = factory.defaultGroup().next(); + Future> result = initiateStreaming(eventLoop, + SslFallbackConnectionType.SERVER_CONFIG, + new OutboundConnectionSettings(endpoint) + .withAcceptVersions(acceptOutbound) + .withDefaults(ConnectionCategory.STREAMING) + ); + result.awaitUninterruptibly(); + Assert.assertTrue(result.isSuccess()); + + return result.getNow(); + } + finally + { + inbound.close().await(1L, TimeUnit.SECONDS); + } + } + + @Test + public void testIncompatibleVersion() throws InterruptedException, ExecutionException + { + Result nowResult = streamingConnect(new AcceptVersions(current_version + 1, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); + Assert.assertNull(nowResult.success()); + Assert.assertEquals(Result.Outcome.INCOMPATIBLE, nowResult.outcome); + Assert.assertEquals(current_version, nowResult.incompatible().closestSupportedVersion); + Assert.assertEquals(current_version, nowResult.incompatible().maxMessagingVersion); + } + + @Test + public void testCompatibleVersion() throws InterruptedException, ExecutionException + { + Result nowResult = streamingConnect(new AcceptVersions(MessagingService.minimum_version, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); + Assert.assertNotNull(nowResult.success()); + Assert.assertNotNull(nowResult.success().channel); + Assert.assertEquals(Result.Outcome.SUCCESS, nowResult.outcome); + Assert.assertEquals(current_version, nowResult.success().messagingVersion); + } + + private ServerEncryptionOptions getServerEncryptionOptions(SslFallbackConnectionType sslConnectionType, boolean optional) + { + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions().withOptional(optional) + .withKeyStore("test/conf/cassandra_ssl_test.keystore") + .withKeyStorePassword("cassandra") + .withOutboundKeystore("test/conf/cassandra_ssl_test_outbound.keystore") + .withOutboundKeystorePassword("cassandra") + .withTrustStore("test/conf/cassandra_ssl_test.truststore") + .withTrustStorePassword("cassandra") + .withSslContextFactory((new ParameterizedClass(DefaultSslContextFactory.class.getName(), + new HashMap<>()))); + if (sslConnectionType == SslFallbackConnectionType.MTLS) + { + serverEncryptionOptions = serverEncryptionOptions.withInternodeEncryption(ServerEncryptionOptions.InternodeEncryption.all) + .withRequireClientAuth(true); + } + else if (sslConnectionType == SslFallbackConnectionType.SSL) + { + serverEncryptionOptions = serverEncryptionOptions.withInternodeEncryption(ServerEncryptionOptions.InternodeEncryption.all) + .withRequireClientAuth(false); + } + return serverEncryptionOptions; + } + + private OutboundConnection initiateOutbound(InetAddressAndPort endpoint, SslFallbackConnectionType connectionType, boolean optional) throws ClosedChannelException + { + final OutboundConnectionSettings settings = new OutboundConnectionSettings(endpoint) + .withAcceptVersions(new AcceptVersions(minimum_version, current_version)) + .withDefaults(ConnectionCategory.MESSAGING) + .withEncryption(getServerEncryptionOptions(connectionType, optional)) + .withDebugCallbacks(new HandshakeAcknowledgeChecker(t -> handshakeEx = t)) + .withFrom(FROM_ADDR); + OutboundConnections outboundConnections = OutboundConnections.tryRegister(new ConcurrentHashMap<>(), TO_ADDR, settings); + GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", new ArrayList<>(0)); + Message message = Message.out(Verb.GOSSIP_DIGEST_SYN, syn); + OutboundConnection outboundConnection = outboundConnections.connectionFor(message); + outboundConnection.enqueue(message); + return outboundConnection; + } + private static class HandshakeAcknowledgeChecker implements OutboundDebugCallbacks + { + private final AtomicInteger acks = new AtomicInteger(0); + private final Consumer fail; + + private HandshakeAcknowledgeChecker(Consumer fail) + { + this.fail = fail; + } + + @Override + public void onSendSmallFrame(int messageCount, int payloadSizeInBytes) + { + } + + @Override + public void onSentSmallFrame(int messageCount, int payloadSizeInBytes) + { + } + + @Override + public void onFailedSmallFrame(int messageCount, int payloadSizeInBytes) + { + } + + @Override + public void onConnect(int messagingVersion, OutboundConnectionSettings settings) + { + if (acks.incrementAndGet() > 1) + fail.accept(new AssertionError("Handshake was acknowledged more than once")); + } + } +} \ No newline at end of file