Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added modules/accord/.gradle/8.5/checksums/checksums.lock
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Empty file.
Binary file not shown.
2 changes: 2 additions & 0 deletions modules/accord/.gradle/buildOutputCleanup/cache.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#Wed Oct 22 21:00:52 PDT 2025
gradle.version=8.5
Binary file not shown.
Binary file added modules/accord/.gradle/file-system.probe
Binary file not shown.
Binary file added modules/accord/.gradle/noVersion/buildLogic.lock
Binary file not shown.
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#Wed Oct 22 21:00:52 PDT 2025
gradle.version=8.5
Binary file not shown.
Binary file added modules/accord/buildSrc/.gradle/file-system.probe
Binary file not shown.
131 changes: 68 additions & 63 deletions src/java/org/apache/cassandra/net/OutboundConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import org.slf4j.Logger;
Expand Down Expand Up @@ -61,6 +62,7 @@
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.net.InternodeConnectionUtils.isSSLError;
import static org.apache.cassandra.net.MessagingService.current_version;
import static org.apache.cassandra.net.OutboundConnectionInitiator.*;
Expand Down Expand Up @@ -127,15 +129,15 @@ public class OutboundConnection

/** Used in logging statements to lazily build a human-readable number of pending bytes. */
private final Object readablePendingBytes =
new Object() { @Override public String toString() { return prettyPrintMemory(pendingBytes()); } };
new Object() { @Override public String toString() { return prettyPrintMemory(pendingBytes()); } };

/** Used in logging statements to lazily build a human-readable number of reserve endpoint bytes in use. */
private final Object readableReserveEndpointUsing =
new Object() { @Override public String toString() { return prettyPrintMemory(reserveCapacityInBytes.endpoint.using()); } };
new Object() { @Override public String toString() { return prettyPrintMemory(reserveCapacityInBytes.endpoint.using()); } };

/** Used in logging statements to lazily build a human-readable number of reserve global bytes in use. */
private final Object readableReserveGlobalUsing =
new Object() { @Override public String toString() { return prettyPrintMemory(reserveCapacityInBytes.global.using()); } };
new Object() { @Override public String toString() { return prettyPrintMemory(reserveCapacityInBytes.global.using()); } };

private volatile long submittedCount = 0; // updated with cas
private volatile long overloadedCount = 0; // updated with cas
Expand Down Expand Up @@ -314,8 +316,8 @@ void cancel()
this.debug = template.debug;
this.queue = new OutboundMessageQueue(approxTime, this::onExpired);
this.delivery = type == ConnectionType.LARGE_MESSAGES
? new LargeMessageDelivery(template.socketFactory.synchronousWorkExecutor)
: new EventLoopDelivery();
? new LargeMessageDelivery(template.socketFactory.synchronousWorkExecutor)
: new EventLoopDelivery();
setDisconnected();
}

Expand Down Expand Up @@ -456,14 +458,14 @@ private void releaseCapacity(long count, long bytes)
private void onOverloaded(Message<?> message)
{
overloadedCountUpdater.incrementAndGet(this);

int canonicalSize = canonicalSize(message);
overloadedBytesUpdater.addAndGet(this, canonicalSize);

noSpamLogger.warn("{} overloaded; dropping {} message (queue: {} local, {} endpoint, {} global)",
this, FBUtilities.prettyPrintMemory(canonicalSize),
readablePendingBytes, readableReserveEndpointUsing, readableReserveGlobalUsing);
this, FBUtilities.prettyPrintMemory(canonicalSize),
readablePendingBytes, readableReserveEndpointUsing, readableReserveGlobalUsing);

callbacks.onOverloaded(message, template.to);
}

Expand All @@ -474,7 +476,13 @@ private void onOverloaded(Message<?> message)
*/
private boolean onExpired(Message<?> message)
{
noSpamLogger.warn("{} dropping message of type {} whose timeout expired before reaching the network", id(), message.verb());
if (logger.isTraceEnabled())
logger.trace("{} dropping message of type {} with payload {} whose timeout ({}ms) expired before reaching the network. {}ms elapsed after expiration. {}ms since creation.",
id(), message.verb(), message.payload, DatabaseDescriptor.getRpcTimeout(MILLISECONDS),
NANOSECONDS.toMillis(Clock.Global.nanoTime() - message.expiresAtNanos()),
message.elapsedSinceCreated(MILLISECONDS));
else
noSpamLogger.warn("{} dropping message of type {} whose timeout expired before reaching the network", id(), message.verb());
releaseCapacity(1, canonicalSize(message));
expiredCount += 1;
expiredBytes += canonicalSize(message);
Expand Down Expand Up @@ -999,9 +1007,9 @@ boolean doRun(Established established)
{
out.discard();
if (out.flushed() > 0 ||
isCausedBy(t, cause -> isConnectionReset(cause)
|| cause instanceof Errors.NativeIoException
|| cause instanceof AsyncChannelOutputPlus.FlushException))
isCausedBy(t, cause -> isConnectionReset(cause)
|| cause instanceof Errors.NativeIoException
|| cause instanceof AsyncChannelOutputPlus.FlushException))
{
// close the channel, and wait for eventLoop to execute
disconnectNow(established).awaitUninterruptibly();
Expand Down Expand Up @@ -1119,42 +1127,39 @@ void onCompletedHandshake(Result<MessagingSuccess> result)
// it is expected that close, if successful, has already cancelled us; so we do not need to worry about leaking connections
assert !state.isClosed();

MessagingSuccess success = result.success();
debug.onConnect(success.messagingVersion, settings);
state.disconnected().maintenance.cancel(false);

FrameEncoder.PayloadAllocator payloadAllocator = success.allocator;
Channel channel = success.channel;
Established established = new Established(success.messagingVersion, channel, payloadAllocator, settings);
state = established;
channel.pipeline().addLast("handleExceptionalStates", new ChannelInboundHandlerAdapter() {
@Override
public void channelInactive(ChannelHandlerContext ctx)
{
disconnectNow(established);
ctx.fireChannelInactive();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{
try
{
invalidateChannel(established, cause);
if (result.success() != null) {
MessagingSuccess success = (MessagingSuccess) result.success();
debug.onConnect(success.messagingVersion, settings);
state.disconnected().maintenance.cancel(false);

FrameEncoder.PayloadAllocator payloadAllocator = success.allocator;
Channel channel = success.channel;
Established established = new Established(success.messagingVersion, channel, payloadAllocator, settings);
state = established;
channel.pipeline().addLast("handleExceptionalStates", new ChannelInboundHandlerAdapter() {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
disconnectNow(established);
ctx.fireChannelInactive();
}
catch (Throwable t)
{
logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t);

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
invalidateChannel(established, cause);
} catch (Throwable t) {
logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t);
}
}
}
});
++successfulConnections;
});
++successfulConnections;

logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}",
logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}",
id(true),
success.messagingVersion,
settings.framing,
encryptionConnectionSummary(channel));
}
break;

case RETRY:
Expand All @@ -1171,7 +1176,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
case INCOMPATIBLE:
// we cannot communicate with this peer given its messaging version; mark this as any other failure, and continue trying
Throwable t = new IOException(String.format("Incompatible peer: %s, messaging version: %s",
settings.to, result.incompatible().maxMessagingVersion));
settings.to, result.incompatible().maxMessagingVersion));
t.fillInStackTrace();
onFailure(t);
break;
Expand Down Expand Up @@ -1205,7 +1210,7 @@ private void attempt(Promise<Result<MessagingSuccess>> result, boolean sslFallba
if (knownMessagingVersion != messagingVersion)
{
logger.trace("Endpoint version changed from {} to {} since connection initialized, updating.",
messagingVersion, knownMessagingVersion);
messagingVersion, knownMessagingVersion);
messagingVersion = knownMessagingVersion;
}

Expand All @@ -1221,20 +1226,20 @@ private void attempt(Promise<Result<MessagingSuccess>> result, boolean sslFallba
// while talking to older nodes in the cluster which are configured to make NON-SSL connections
SslFallbackConnectionType[] fallBackSslFallbackConnectionTypes = SslFallbackConnectionType.values();
int index = sslFallbackEnabled && settings.withEncryption() && settings.encryption.getOptional() ?
(int) (connectionAttempts - 1) % fallBackSslFallbackConnectionTypes.length : 0;
(int) (connectionAttempts - 1) % fallBackSslFallbackConnectionTypes.length : 0;
if (fallBackSslFallbackConnectionTypes[index] != SslFallbackConnectionType.SERVER_CONFIG)
{
logger.info("ConnectionId {} is falling back to {} reconnect strategy for retry", id(), fallBackSslFallbackConnectionTypes[index]);
}
initiateMessaging(eventLoop, type, fallBackSslFallbackConnectionTypes[index], settings, result)
.addListener(future -> {
if (future.isCancelled())
return;
if (future.isSuccess()) //noinspection unchecked
onCompletedHandshake((Result<MessagingSuccess>) future.getNow());
else
onFailure(future.cause());
});
.addListener(future -> {
if (future.isCancelled())
return;
if (future.isSuccess()) //noinspection unchecked
onCompletedHandshake((Result<MessagingSuccess>) future.getNow());
else
onFailure(future.cause());
});
}

Future<Result<MessagingSuccess>> initiate()
Expand Down Expand Up @@ -1387,10 +1392,10 @@ private Future<?> disconnectNow(Established closeIfIs)
if (hasPending())
delivery.execute();
closeIfIs.channel.close()
.addListener(future -> {
if (!future.isSuccess())
logger.info("Problem closing channel {}", closeIfIs, future.cause());
});
.addListener(future -> {
if (!future.isSuccess())
logger.info("Problem closing channel {}", closeIfIs, future.cause());
});
}
});
}
Expand Down Expand Up @@ -1477,7 +1482,7 @@ public Future<Void> close(boolean flushQueue)
{
assert state.isEstablished();
state.established().channel.close()
.addListener(new PromiseNotifier<>(closing));
.addListener(new PromiseNotifier<>(closing));
}
}
catch (Throwable t)
Expand Down Expand Up @@ -1604,8 +1609,8 @@ private String id(boolean includeReal)
Channel channel = established.channel;
OutboundConnectionSettings settings = established.settings;
return SocketFactory.channelId(settings.from, (InetSocketAddress) channel.localAddress(),
settings.to, (InetSocketAddress) channel.remoteAddress(),
type, channel.id().asShortText());
settings.to, (InetSocketAddress) channel.remoteAddress(),
type, channel.id().asShortText());
}

private String id()
Expand Down Expand Up @@ -1731,7 +1736,7 @@ int messagingVersion()
{
State state = this.state;
return state.isEstablished() ? state.established().messagingVersion
: template.endpointToVersion().get(template.to);
: template.endpointToVersion().get(template.to);
}

@VisibleForTesting
Expand Down Expand Up @@ -1776,4 +1781,4 @@ Limit unsafeGetEndpointReserveLimits()
{
return reserveCapacityInBytes.endpoint;
}
}
}
Loading