From 3820236b5c4c730b97d27bc34ee63d954eb8b5f0 Mon Sep 17 00:00:00 2001 From: vivekkoya <13vivekkoya@gmail.com> Date: Mon, 27 Oct 2025 19:07:37 -0700 Subject: [PATCH 1/5] Added scaffolding for StreamingTest --- .../cassandra/net/OutboundConnection.java | 56 +++++++++---------- .../net/OutboundConnectionInitiator.java | 18 +++++- 2 files changed, 41 insertions(+), 33 deletions(-) diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index 821521bfb932..28e2e3bf7a12 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -1118,43 +1118,39 @@ void onCompletedHandshake(Result result) case SUCCESS: // it is expected that close, if successful, has already cancelled us; so we do not need to worry about leaking connections assert !state.isClosed(); - - MessagingSuccess success = result.success(); - debug.onConnect(success.messagingVersion, settings); - state.disconnected().maintenance.cancel(false); - - FrameEncoder.PayloadAllocator payloadAllocator = success.allocator; - Channel channel = success.channel; - Established established = new Established(messagingVersion, channel, payloadAllocator, settings); - state = established; - channel.pipeline().addLast("handleExceptionalStates", new ChannelInboundHandlerAdapter() { - @Override - public void channelInactive(ChannelHandlerContext ctx) - { - disconnectNow(established); - ctx.fireChannelInactive(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - { - try - { - invalidateChannel(established, cause); + if(result.success() instanceof MessagingSuccess) { + MessagingSuccess success = (MessagingSuccess) result.success(); + debug.onConnect(success.messagingVersion, settings); + state.disconnected().maintenance.cancel(false); + + FrameEncoder.PayloadAllocator payloadAllocator = success.allocator; + Channel channel = success.channel; + Established established = new Established(messagingVersion, channel, payloadAllocator, settings); + state = established; + channel.pipeline().addLast("handleExceptionalStates", new ChannelInboundHandlerAdapter() { + @Override + public void channelInactive(ChannelHandlerContext ctx) { + disconnectNow(established); + ctx.fireChannelInactive(); } - catch (Throwable t) - { - logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + try { + invalidateChannel(established, cause); + } catch (Throwable t) { + logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t); + } } - } - }); - ++successfulConnections; + }); + ++successfulConnections; - logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}", + logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}", id(true), success.messagingVersion, settings.framing, encryptionConnectionSummary(channel)); + } break; case RETRY: diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java index 5de2a080d957..35b6d29c76fe 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java @@ -479,14 +479,26 @@ private Result(Outcome outcome) } boolean isSuccess() { return outcome == Outcome.SUCCESS; } - public SuccessType success() { return (SuccessType) this; } + public Success success() { + if (this.outcome == Outcome.SUCCESS) + return (Success) this; + return null; + } static MessagingSuccess messagingSuccess(Channel channel, int messagingVersion, FrameEncoder.PayloadAllocator allocator) { return new MessagingSuccess(channel, messagingVersion, allocator); } static StreamingSuccess streamingSuccess(Channel channel, int messagingVersion) { return new StreamingSuccess(channel, messagingVersion); } - public Retry retry() { return (Retry) this; } + public Retry retry() { + if (this.outcome == Outcome.RETRY) + return (Retry) this; + return null; + } static Result retry(int withMessagingVersion) { return new Retry<>(withMessagingVersion); } - public Incompatible incompatible() { return (Incompatible) this; } + public Incompatible incompatible() { + if(this.outcome == Outcome.INCOMPATIBLE) + return (Incompatible) this; + return null; + } static Result incompatible(int closestSupportedVersion, int maxMessagingVersion) { return new Incompatible(closestSupportedVersion, maxMessagingVersion); } } From 9f3cb8e63e95f53e96907a26d7efcec593d67226 Mon Sep 17 00:00:00 2001 From: vivekkoya <13vivekkoya@gmail.com> Date: Mon, 27 Oct 2025 19:08:12 -0700 Subject: [PATCH 2/5] Added scaffolding for StreamingTest --- .../apache/cassandra/net/StreamingTest.java | 242 ++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 test/unit/org/apache/cassandra/net/StreamingTest.java diff --git a/test/unit/org/apache/cassandra/net/StreamingTest.java b/test/unit/org/apache/cassandra/net/StreamingTest.java new file mode 100644 index 000000000000..2d6280d3cb95 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/StreamingTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net; + +import java.nio.channels.ClosedChannelException; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.Future; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.OutboundConnectionInitiator.Result.MessagingSuccess; +import org.apache.cassandra.net.OutboundConnectionInitiator.Result.StreamingSuccess; + +import static org.apache.cassandra.net.OutboundConnectionInitiator.initiateStreaming; +import static org.apache.cassandra.net.MessagingService.VERSION_30; +import static org.apache.cassandra.net.MessagingService.VERSION_3014; +import static org.apache.cassandra.net.MessagingService.current_version; +import static org.apache.cassandra.net.MessagingService.minimum_version; +import static org.apache.cassandra.net.ConnectionType.SMALL_MESSAGES; +import static org.apache.cassandra.net.OutboundConnectionInitiator.*; + +// TODO: test failure due to exception, timeout, etc +public class StreamingTest +{ + private static final SocketFactory factory = new SocketFactory(); + + @BeforeClass + public static void startup() + { + DatabaseDescriptor.daemonInitialization(); + CommitLog.instance.start(); + } + + @AfterClass + public static void cleanup() throws InterruptedException + { + factory.shutdownNow(); + } + + private Result handshake(int req, int outMin, int outMax) throws ExecutionException, InterruptedException + { + return handshake(req, new AcceptVersions(outMin, outMax), null); + } + private Result handshake(int req, int outMin, int outMax, int inMin, int inMax) throws ExecutionException, InterruptedException + { + return handshake(req, new AcceptVersions(outMin, outMax), new AcceptVersions(inMin, inMax)); + } + private Result handshake(int req, AcceptVersions acceptOutbound, AcceptVersions acceptInbound) throws ExecutionException, InterruptedException + { + InboundSockets inbound = new InboundSockets(new InboundConnectionSettings().withAcceptMessaging(acceptInbound)); + try + { + inbound.open(); + InetAddressAndPort endpoint = inbound.sockets().stream().map(s -> s.settings.bindAddress).findFirst().get(); + EventLoop eventLoop = factory.defaultGroup().next(); + Future> future = + initiateMessaging(eventLoop, + SMALL_MESSAGES, + new OutboundConnectionSettings(endpoint) + .withAcceptVersions(acceptOutbound) + .withDefaults(ConnectionCategory.MESSAGING), + req, AsyncPromise.withExecutor(eventLoop)); + return future.get(); + } + finally + { + inbound.close().await(1L, TimeUnit.SECONDS); + } + } + private Result streamingConnect(int req, AcceptVersions acceptOutbound, AcceptVersions acceptInbound) throws ExecutionException, InterruptedException + { + InboundSockets inbound = new InboundSockets(new InboundConnectionSettings().withAcceptMessaging(acceptInbound)); + try + { + inbound.open(); + InetAddressAndPort endpoint = inbound.sockets().stream().map(s -> s.settings.bindAddress).findFirst().get(); + EventLoop eventLoop = factory.defaultGroup().next(); + Future> future = + initiateStreaming(eventLoop, + new OutboundConnectionSettings(endpoint) + .withAcceptVersions(acceptOutbound) + .withDefaults(ConnectionCategory.STREAMING), req); + return future.get(); + } + finally + { + inbound.close().await(1L, TimeUnit.SECONDS); + } + } + @Test + public void testBothCurrentVersion() throws InterruptedException, ExecutionException + { + Result result = handshake(current_version, minimum_version, current_version); + Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); + result.success().channel.close(); + } + + @Test + public void testSendCompatibleOldVersion() throws InterruptedException, ExecutionException + { + Result result = handshake(current_version, current_version, current_version + 1, current_version +1, current_version + 2); + Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); + Assert.assertEquals(current_version + 1, result.success().messagingVersion); + result.success().channel.close(); + } + + @Test + public void testSendCompatibleFutureVersion() throws InterruptedException, ExecutionException + { + Result result = handshake(current_version + 1, current_version - 1, current_version + 1); + Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); + Assert.assertEquals(current_version, result.success().messagingVersion); + result.success().channel.close(); + } + + @Test + public void testSendIncompatibleFutureVersion() throws InterruptedException, ExecutionException + { + Result result = handshake(current_version + 1, current_version + 1, current_version + 1); + Assert.assertEquals(Result.Outcome.INCOMPATIBLE, result.outcome); + Assert.assertEquals(current_version, result.incompatible().closestSupportedVersion); + Assert.assertEquals(current_version, result.incompatible().maxMessagingVersion); + } + + @Test + public void testSendIncompatibleOldVersion() throws InterruptedException, ExecutionException + { + Result result = handshake(current_version + 1, current_version + 1, current_version + 1, current_version + 2, current_version + 3); + Assert.assertEquals(Result.Outcome.INCOMPATIBLE, result.outcome); + Assert.assertEquals(current_version + 2, result.incompatible().closestSupportedVersion); + Assert.assertEquals(current_version + 3, result.incompatible().maxMessagingVersion); + } + + @Test + public void testSendCompatibleMaxVersionPre40() throws InterruptedException, ExecutionException + { + Result result = handshake(VERSION_3014, VERSION_30, VERSION_3014, VERSION_30, VERSION_3014); + Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); + Assert.assertEquals(VERSION_3014, result.success().messagingVersion); + result.success().channel.close(); + } + + @Test + public void testSendCompatibleFutureVersionPre40() throws InterruptedException, ExecutionException + { + Result result = handshake(VERSION_3014, VERSION_30, VERSION_3014, VERSION_30, VERSION_30); + Assert.assertEquals(Result.Outcome.RETRY, result.outcome); + Assert.assertEquals(VERSION_30, result.retry().withMessagingVersion); + } + + @Test + public void testSendIncompatibleFutureVersionPre40() throws InterruptedException, ExecutionException + { + Result result = handshake(VERSION_3014, VERSION_3014, VERSION_3014, VERSION_30, VERSION_30); + Assert.assertEquals(Result.Outcome.INCOMPATIBLE, result.outcome); + Assert.assertEquals(-1, result.incompatible().closestSupportedVersion); + Assert.assertEquals(VERSION_30, result.incompatible().maxMessagingVersion); + } + + @Test + public void testSendCompatibleOldVersionPre40() throws InterruptedException + { + try + { + handshake(VERSION_30, VERSION_30, VERSION_3014, VERSION_3014, VERSION_3014); + Assert.fail("Should have thrown"); + } + catch (ExecutionException e) + { + Assert.assertTrue(e.getCause() instanceof ClosedChannelException); + } + } + + @Test + public void testSendIncompatibleOldVersionPre40() throws InterruptedException + { + try + { + handshake(VERSION_30, VERSION_30, VERSION_30, VERSION_3014, VERSION_3014); + Assert.fail("Should have thrown"); + } + catch (ExecutionException e) + { + Assert.assertTrue(e.getCause() instanceof ClosedChannelException); + } + } + + @Test + public void testSendCompatibleOldVersion40() throws InterruptedException, ExecutionException + { + Result result = handshake(VERSION_30, VERSION_30, VERSION_30, VERSION_30, current_version); + Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); + Assert.assertEquals(VERSION_30, result.success().messagingVersion); + } + + @Test + public void testSendIncompatibleOldVersion40() throws InterruptedException + { + try + { + Assert.fail(Objects.toString(handshake(VERSION_30, VERSION_30, VERSION_30, current_version, current_version))); + } + catch (ExecutionException e) + { + Assert.assertTrue(e.getCause() instanceof ClosedChannelException); + } + } + + @Test // fairly contrived case, but since we introduced logic for testing we need to be careful it doesn't make us worse + public void testSendToFuturePost40BelievedToBePre40() throws InterruptedException, ExecutionException + { + Result result = handshake(VERSION_30, VERSION_30, current_version, VERSION_30, current_version + 1); + Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); + Assert.assertEquals(VERSION_30, result.success().messagingVersion); + } +} From 704d983f3244dba14b36e09d37a7e59c7528cb7b Mon Sep 17 00:00:00 2001 From: vivekkoya <13vivekkoya@gmail.com> Date: Wed, 29 Oct 2025 20:21:39 -0700 Subject: [PATCH 3/5] Triage test failure --- .../apache/cassandra/net/StreamingTest.java | 219 ++++++------------ 1 file changed, 73 insertions(+), 146 deletions(-) diff --git a/test/unit/org/apache/cassandra/net/StreamingTest.java b/test/unit/org/apache/cassandra/net/StreamingTest.java index 2d6280d3cb95..082c4ff0c05b 100644 --- a/test/unit/org/apache/cassandra/net/StreamingTest.java +++ b/test/unit/org/apache/cassandra/net/StreamingTest.java @@ -19,13 +19,17 @@ package org.apache.cassandra.net; import java.nio.channels.ClosedChannelException; -import java.util.Objects; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; -import org.apache.cassandra.utils.concurrent.AsyncPromise; +import com.google.common.net.InetAddresses; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -33,23 +37,20 @@ import io.netty.util.concurrent.Future; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.gms.GossipDigestSyn; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.OutboundConnectionInitiator.Result.MessagingSuccess; -import org.apache.cassandra.net.OutboundConnectionInitiator.Result.StreamingSuccess; -import static org.apache.cassandra.net.OutboundConnectionInitiator.initiateStreaming; -import static org.apache.cassandra.net.MessagingService.VERSION_30; -import static org.apache.cassandra.net.MessagingService.VERSION_3014; import static org.apache.cassandra.net.MessagingService.current_version; import static org.apache.cassandra.net.MessagingService.minimum_version; -import static org.apache.cassandra.net.ConnectionType.SMALL_MESSAGES; -import static org.apache.cassandra.net.OutboundConnectionInitiator.*; - -// TODO: test failure due to exception, timeout, etc +//import static org.apache.cassandra.net.MessagingService.VERSION_40; +import static org.apache.cassandra.net.OutboundConnectionInitiator.initiateStreaming; +import static org.apache.cassandra.net.OutboundConnectionInitiator.Result; public class StreamingTest { private static final SocketFactory factory = new SocketFactory(); - + static final InetAddressAndPort TO_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 7012); + static final InetAddressAndPort FROM_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 7012); + private volatile Throwable handshakeEx; @BeforeClass public static void startup() { @@ -63,180 +64,106 @@ public static void cleanup() throws InterruptedException factory.shutdownNow(); } - private Result handshake(int req, int outMin, int outMax) throws ExecutionException, InterruptedException + @Before + public void setup() { - return handshake(req, new AcceptVersions(outMin, outMax), null); + handshakeEx = null; } - private Result handshake(int req, int outMin, int outMax, int inMin, int inMax) throws ExecutionException, InterruptedException - { - return handshake(req, new AcceptVersions(outMin, outMax), new AcceptVersions(inMin, inMax)); - } - private Result handshake(int req, AcceptVersions acceptOutbound, AcceptVersions acceptInbound) throws ExecutionException, InterruptedException - { - InboundSockets inbound = new InboundSockets(new InboundConnectionSettings().withAcceptMessaging(acceptInbound)); - try - { - inbound.open(); - InetAddressAndPort endpoint = inbound.sockets().stream().map(s -> s.settings.bindAddress).findFirst().get(); - EventLoop eventLoop = factory.defaultGroup().next(); - Future> future = - initiateMessaging(eventLoop, - SMALL_MESSAGES, - new OutboundConnectionSettings(endpoint) - .withAcceptVersions(acceptOutbound) - .withDefaults(ConnectionCategory.MESSAGING), - req, AsyncPromise.withExecutor(eventLoop)); - return future.get(); - } - finally - { - inbound.close().await(1L, TimeUnit.SECONDS); - } - } - private Result streamingConnect(int req, AcceptVersions acceptOutbound, AcceptVersions acceptInbound) throws ExecutionException, InterruptedException + + private Result streamingConnect(AcceptVersions acceptOutbound, AcceptVersions acceptInbound) throws ExecutionException, InterruptedException { InboundSockets inbound = new InboundSockets(new InboundConnectionSettings().withAcceptMessaging(acceptInbound)); try { inbound.open(); + EventLoop eventLoop = MessagingService.instance().socketFactory.outboundStreamingGroup().next(); + InetAddressAndPort endpoint = inbound.sockets().stream().map(s -> s.settings.bindAddress).findFirst().get(); - EventLoop eventLoop = factory.defaultGroup().next(); - Future> future = - initiateStreaming(eventLoop, - new OutboundConnectionSettings(endpoint) - .withAcceptVersions(acceptOutbound) - .withDefaults(ConnectionCategory.STREAMING), req); - return future.get(); + OutboundConnectionSettings settings = new OutboundConnectionSettings(endpoint).withAcceptVersions(acceptOutbound) + .withDefaults(ConnectionCategory.STREAMING); + Future> result = initiateStreaming(eventLoop, + settings, acceptOutbound.min); + result.awaitUninterruptibly(); + Assert.assertTrue(result.isSuccess()); + + return result.getNow(); } finally { inbound.close().await(1L, TimeUnit.SECONDS); } } - @Test - public void testBothCurrentVersion() throws InterruptedException, ExecutionException - { - Result result = handshake(current_version, minimum_version, current_version); - Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); - result.success().channel.close(); - } @Test - public void testSendCompatibleOldVersion() throws InterruptedException, ExecutionException + public void testIncompatibleVersion() throws InterruptedException, ExecutionException { - Result result = handshake(current_version, current_version, current_version + 1, current_version +1, current_version + 2); - Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); - Assert.assertEquals(current_version + 1, result.success().messagingVersion); - result.success().channel.close(); + Result nowResult = streamingConnect(new AcceptVersions(current_version + 1, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); + Assert.assertNull(nowResult.success()); + System.out.println(nowResult.outcome); + Assert.assertEquals(Result.Outcome.INCOMPATIBLE, nowResult.outcome); + Assert.assertEquals(current_version, nowResult.incompatible().closestSupportedVersion); + Assert.assertEquals(current_version, nowResult.incompatible().maxMessagingVersion); } @Test - public void testSendCompatibleFutureVersion() throws InterruptedException, ExecutionException + public void testCompatibleVersion() throws InterruptedException, ExecutionException { - Result result = handshake(current_version + 1, current_version - 1, current_version + 1); - Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); - Assert.assertEquals(current_version, result.success().messagingVersion); - result.success().channel.close(); + Result nowResult = streamingConnect(new AcceptVersions(MessagingService.minimum_version, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); + //new AcceptVersions(VERSION_40, VERSION_40), new AcceptVersions(VERSION_40, VERSION_40)); + Assert.assertNotNull(nowResult.success()); + System.out.println(nowResult.outcome); + Assert.assertNotNull(nowResult.success().channel); + Assert.assertEquals(Result.Outcome.SUCCESS, nowResult.outcome); + Assert.assertEquals(current_version, nowResult.success().messagingVersion); } - @Test - public void testSendIncompatibleFutureVersion() throws InterruptedException, ExecutionException - { - Result result = handshake(current_version + 1, current_version + 1, current_version + 1); - Assert.assertEquals(Result.Outcome.INCOMPATIBLE, result.outcome); - Assert.assertEquals(current_version, result.incompatible().closestSupportedVersion); - Assert.assertEquals(current_version, result.incompatible().maxMessagingVersion); - } - @Test - public void testSendIncompatibleOldVersion() throws InterruptedException, ExecutionException - { - Result result = handshake(current_version + 1, current_version + 1, current_version + 1, current_version + 2, current_version + 3); - Assert.assertEquals(Result.Outcome.INCOMPATIBLE, result.outcome); - Assert.assertEquals(current_version + 2, result.incompatible().closestSupportedVersion); - Assert.assertEquals(current_version + 3, result.incompatible().maxMessagingVersion); - } - @Test - public void testSendCompatibleMaxVersionPre40() throws InterruptedException, ExecutionException - { - Result result = handshake(VERSION_3014, VERSION_30, VERSION_3014, VERSION_30, VERSION_3014); - Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); - Assert.assertEquals(VERSION_3014, result.success().messagingVersion); - result.success().channel.close(); - } - @Test - public void testSendCompatibleFutureVersionPre40() throws InterruptedException, ExecutionException + private OutboundConnection initiateOutbound(InetAddressAndPort endpoint, boolean optional) throws ClosedChannelException { - Result result = handshake(VERSION_3014, VERSION_30, VERSION_3014, VERSION_30, VERSION_30); - Assert.assertEquals(Result.Outcome.RETRY, result.outcome); - Assert.assertEquals(VERSION_30, result.retry().withMessagingVersion); + final OutboundConnectionSettings settings = new OutboundConnectionSettings(endpoint) + .withAcceptVersions(new AcceptVersions(minimum_version, current_version)) + .withDefaults(ConnectionCategory.MESSAGING) + .withDebugCallbacks(new HandshakeAcknowledgeChecker(t -> handshakeEx = t)) + .withFrom(FROM_ADDR); + OutboundConnections outboundConnections = OutboundConnections.tryRegister(new ConcurrentHashMap<>(), TO_ADDR, settings); + GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", new ArrayList<>(0)); + Message message = Message.out(Verb.GOSSIP_DIGEST_SYN, syn); + OutboundConnection outboundConnection = outboundConnections.connectionFor(message); + outboundConnection.enqueue(message); + return outboundConnection; } - - @Test - public void testSendIncompatibleFutureVersionPre40() throws InterruptedException, ExecutionException + private static class HandshakeAcknowledgeChecker implements OutboundDebugCallbacks { - Result result = handshake(VERSION_3014, VERSION_3014, VERSION_3014, VERSION_30, VERSION_30); - Assert.assertEquals(Result.Outcome.INCOMPATIBLE, result.outcome); - Assert.assertEquals(-1, result.incompatible().closestSupportedVersion); - Assert.assertEquals(VERSION_30, result.incompatible().maxMessagingVersion); - } + private final AtomicInteger acks = new AtomicInteger(0); + private final Consumer fail; - @Test - public void testSendCompatibleOldVersionPre40() throws InterruptedException - { - try - { - handshake(VERSION_30, VERSION_30, VERSION_3014, VERSION_3014, VERSION_3014); - Assert.fail("Should have thrown"); - } - catch (ExecutionException e) + private HandshakeAcknowledgeChecker(Consumer fail) { - Assert.assertTrue(e.getCause() instanceof ClosedChannelException); + this.fail = fail; } - } - @Test - public void testSendIncompatibleOldVersionPre40() throws InterruptedException - { - try + @Override + public void onSendSmallFrame(int messageCount, int payloadSizeInBytes) { - handshake(VERSION_30, VERSION_30, VERSION_30, VERSION_3014, VERSION_3014); - Assert.fail("Should have thrown"); } - catch (ExecutionException e) + + @Override + public void onSentSmallFrame(int messageCount, int payloadSizeInBytes) { - Assert.assertTrue(e.getCause() instanceof ClosedChannelException); } - } - - @Test - public void testSendCompatibleOldVersion40() throws InterruptedException, ExecutionException - { - Result result = handshake(VERSION_30, VERSION_30, VERSION_30, VERSION_30, current_version); - Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); - Assert.assertEquals(VERSION_30, result.success().messagingVersion); - } - @Test - public void testSendIncompatibleOldVersion40() throws InterruptedException - { - try + @Override + public void onFailedSmallFrame(int messageCount, int payloadSizeInBytes) { - Assert.fail(Objects.toString(handshake(VERSION_30, VERSION_30, VERSION_30, current_version, current_version))); } - catch (ExecutionException e) + + @Override + public void onConnect(int messagingVersion, OutboundConnectionSettings settings) { - Assert.assertTrue(e.getCause() instanceof ClosedChannelException); + if (acks.incrementAndGet() > 1) + fail.accept(new AssertionError("Handshake was acknowledged more than once")); } } - - @Test // fairly contrived case, but since we introduced logic for testing we need to be careful it doesn't make us worse - public void testSendToFuturePost40BelievedToBePre40() throws InterruptedException, ExecutionException - { - Result result = handshake(VERSION_30, VERSION_30, current_version, VERSION_30, current_version + 1); - Assert.assertEquals(Result.Outcome.SUCCESS, result.outcome); - Assert.assertEquals(VERSION_30, result.success().messagingVersion); - } -} +} \ No newline at end of file From 51290f89aec401fcb3572d2fa516430eaa9d72f9 Mon Sep 17 00:00:00 2001 From: vivekkoya <13vivekkoya@gmail.com> Date: Sun, 2 Nov 2025 23:13:40 -0800 Subject: [PATCH 4/5] Add StreamingTest for 4.1 --- test/unit/org/apache/cassandra/net/StreamingTest.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/test/unit/org/apache/cassandra/net/StreamingTest.java b/test/unit/org/apache/cassandra/net/StreamingTest.java index 082c4ff0c05b..e4a377a1dfdc 100644 --- a/test/unit/org/apache/cassandra/net/StreamingTest.java +++ b/test/unit/org/apache/cassandra/net/StreamingTest.java @@ -42,7 +42,6 @@ import static org.apache.cassandra.net.MessagingService.current_version; import static org.apache.cassandra.net.MessagingService.minimum_version; -//import static org.apache.cassandra.net.MessagingService.VERSION_40; import static org.apache.cassandra.net.OutboundConnectionInitiator.initiateStreaming; import static org.apache.cassandra.net.OutboundConnectionInitiator.Result; public class StreamingTest @@ -99,7 +98,6 @@ public void testIncompatibleVersion() throws InterruptedException, ExecutionExce { Result nowResult = streamingConnect(new AcceptVersions(current_version + 1, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); Assert.assertNull(nowResult.success()); - System.out.println(nowResult.outcome); Assert.assertEquals(Result.Outcome.INCOMPATIBLE, nowResult.outcome); Assert.assertEquals(current_version, nowResult.incompatible().closestSupportedVersion); Assert.assertEquals(current_version, nowResult.incompatible().maxMessagingVersion); @@ -108,10 +106,9 @@ public void testIncompatibleVersion() throws InterruptedException, ExecutionExce @Test public void testCompatibleVersion() throws InterruptedException, ExecutionException { - Result nowResult = streamingConnect(new AcceptVersions(MessagingService.minimum_version, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); - //new AcceptVersions(VERSION_40, VERSION_40), new AcceptVersions(VERSION_40, VERSION_40)); + Result nowResult = streamingConnect(new AcceptVersions(current_version, current_version + 1), new AcceptVersions(current_version, current_version + 1)); +// Result nowResult = streamingConnect( new AcceptVersions(VERSION_40, VERSION_40 + 1), new AcceptVersions(VERSION_40, VERSION_40 + 1)); Assert.assertNotNull(nowResult.success()); - System.out.println(nowResult.outcome); Assert.assertNotNull(nowResult.success().channel); Assert.assertEquals(Result.Outcome.SUCCESS, nowResult.outcome); Assert.assertEquals(current_version, nowResult.success().messagingVersion); From 8eae9d30da6afb90fbdb3c795d8b54270660c5d3 Mon Sep 17 00:00:00 2001 From: vivekkoya <13vivekkoya@gmail.com> Date: Sun, 2 Nov 2025 23:31:44 -0800 Subject: [PATCH 5/5] Remove instanceof for subtype of Success (MessagingSuccess) --- src/java/org/apache/cassandra/net/OutboundConnection.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index 28e2e3bf7a12..429c3e440ca8 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -52,6 +52,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.net.OutboundConnectionInitiator.Result.MessagingSuccess; +import org.apache.cassandra.net.OutboundConnectionInitiator.Result.Success; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -1118,7 +1119,7 @@ void onCompletedHandshake(Result result) case SUCCESS: // it is expected that close, if successful, has already cancelled us; so we do not need to worry about leaking connections assert !state.isClosed(); - if(result.success() instanceof MessagingSuccess) { + if(result.success() != null) { MessagingSuccess success = (MessagingSuccess) result.success(); debug.onConnect(success.messagingVersion, settings); state.disconnected().maintenance.cancel(false);