From f4b3d8ecd9de9be1d3f2fca0e15524c00be4e88f Mon Sep 17 00:00:00 2001 From: Sam Tunnicliffe Date: Mon, 8 Dec 2025 14:57:20 +0000 Subject: [PATCH] Ensure that gossip state for LEFT nodes is expired eventually By default the expiry time is calculated on each peer independently. It can be made to converge by disabling gossip quarantine using the configuration setting gossip_quarantine_disabled or via a hotprop on GossiperMBean. Patch by Sam Tunnicliffe; reviewed by XXX for CASSANDRA-21035 --- .../org/apache/cassandra/config/Config.java | 25 +++ .../cassandra/config/DatabaseDescriptor.java | 10 ++ .../apache/cassandra/gms/EndpointState.java | 2 +- .../org/apache/cassandra/gms/Gossiper.java | 168 +++++++++++++++--- .../apache/cassandra/gms/GossiperMBean.java | 4 + .../apache/cassandra/gms/VersionedValue.java | 5 + .../cassandra/service/StorageService.java | 13 +- .../tcm/listeners/LegacyStateListener.java | 3 +- .../listeners/UpgradeMigrationListener.java | 2 + .../cassandra/tcm/membership/Directory.java | 5 +- .../tcm/sequences/SingleNodeSequences.java | 10 +- .../tcm/transformations/Assassinate.java | 10 +- .../distributed/shared/ClusterUtils.java | 3 +- .../cassandra/distributed/test/CASTest.java | 20 ++- .../distributed/test/CASTestBase.java | 5 +- .../distributed/test/GossipTest.java | 2 +- .../GossipExpiryAfterAssassinateTest.java | 48 +++++ .../GossipExpiryAfterDecommissionTest.java | 33 ++++ .../GossipExpiryAfterRemoveNodeTest.java | 47 +++++ .../test/gossip/GossipExpiryTestBase.java | 165 +++++++++++++++++ .../test/log/ClusterMetadataTestHelper.java | 27 ++- .../apache/cassandra/gms/GossiperTest.java | 8 + .../tcm/membership/MembershipUtils.java | 11 ++ .../tcm/transformations/PrepareJoinTest.java | 10 +- 24 files changed, 586 insertions(+), 50 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterAssassinateTest.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterDecommissionTest.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterRemoveNodeTest.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryTestBase.java diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 986346da4158..9742cabada7f 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -1520,4 +1520,29 @@ public enum CQLStartTime public boolean enforce_native_deadline_for_hints = false; public boolean paxos_repair_race_wait = true; + + /** + * If true, gossip state updates for nodes which have left the cluster will continue to be processed while the + * node is still present in ClusterMetadata. This enables the gossip expiry time for those nodes (the deadline + * after which their state is fully purged from gossip) to converge across the remaining nodes in the cluster. + * This is a change from previous behaviour as historically once a node has advertised a LEFT status further + * updates to gossip state for it are ignored for a period of time to prevent flapping if older/stale states + * are encountered. + * Following CEP-21, most significant state changes are handled by the cluster metadata log, so resurrection + * of left nodes is not a problem for gossip to solve and so quarantine is not really necessary. However, + * FailureDetector does still use gossip messages to assess node health and some external systems still use gossip + * state to inform decisions about topology/node health/etc. For those reasons, for now the disabling of quarantine + * is off by default and hot-proppable. + * + * With quarantine still in effect, expiry from gossip of LEFT nodes will occur at different times on each peer. + * Also, when there are LEFT nodes in gossip, the state will never fully converge across the cluster as each node + * will have its own expiry time for a LEFT peer. + * + * With quarantine disabled the STATUS_WITH_PORT values for the left node which include the expiry time will + * converge and peers will all evict it from gossip after the same deadline. + * + * Eventually, this configuration option should be removed and quarantine disabled entirely for clusters running + * 6.0 and later. + */ + public volatile boolean gossip_quarantine_disabled = false; } diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 23e0aa834186..c6382a7aeca2 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -6109,4 +6109,14 @@ public static void setPartitioner(String name) { partitioner = FBUtilities.newPartitioner(name); } + + public static boolean getGossipQuarantineDisabled() + { + return conf.gossip_quarantine_disabled; + } + + public static void setGossipQuarantineDisabled(boolean disabled) + { + conf.gossip_quarantine_disabled = disabled; + } } diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 26fe33ec698c..41a79caf7f59 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -324,7 +324,7 @@ public CassandraVersion getReleaseVersion() public String toString() { View view = ref.get(); - return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState; + return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState + ", isAlive = " + isAlive; } public boolean isSupersededBy(EndpointState that) diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 83bf5169af65..9d4a09e7f269 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -87,6 +87,7 @@ import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.utils.FBUtilities; @@ -107,6 +108,10 @@ import static org.apache.cassandra.gms.Gossiper.GossipedWith.SEED; import static org.apache.cassandra.gms.VersionedValue.BOOTSTRAPPING_STATUS; import static org.apache.cassandra.gms.VersionedValue.HIBERNATE; +import static org.apache.cassandra.gms.VersionedValue.REMOVED_TOKEN; +import static org.apache.cassandra.gms.VersionedValue.REMOVING_TOKEN; +import static org.apache.cassandra.gms.VersionedValue.SHUTDOWN; +import static org.apache.cassandra.gms.VersionedValue.STATUS_LEFT; import static org.apache.cassandra.gms.VersionedValue.unsafeMakeVersionedValue; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.ECHO_REQ; @@ -137,8 +142,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, private static final ScheduledExecutorPlus executor = executorFactory().scheduled("GossipTasks"); static final ApplicationState[] STATES = ApplicationState.values(); - static final List DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, - VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE); + static final List DEAD_STATES = Arrays.asList(REMOVING_TOKEN, REMOVED_TOKEN, STATUS_LEFT, HIBERNATE); static ArrayList SILENT_SHUTDOWN_STATES = new ArrayList<>(); static { @@ -185,7 +189,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, /* map where key is endpoint and value is timestamp when this endpoint was removed from * gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time * after removal to prevent nodes from falsely reincarnating during the time when removal - * gossip gets propagated to all nodes */ + * gossip gets propagated to all nodes. + * Note: in future, this need only be used when ClusterMetadataService is in the GOSSIP state, + * i.e. during the major upgrade to the version with CEP-21, but before the CMS is initialized. + * In this state, gossip is still used to propagate changes to broadcast address and release + * version. Once the CMS initialization is complete, this is no longer necessary. + * Currently in order to support a controlled rollout of that change to behaviour, quarantine + * is still used by default, but can be disabled via config (gossip_quarantine_disabled) or + * JMX (GossiperMBean::setQuarantineDisabled) + */ private final Map justRemovedEndpoints = new ConcurrentHashMap<>(); private final Map expireTimeEndpointMap = new ConcurrentHashMap<>(); @@ -450,14 +462,7 @@ private static boolean isShutdown(EndpointState epState) public static boolean isShutdown(VersionedValue vv) { - if (vv == null) - return false; - - String value = vv.value; - String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); - assert (pieces.length > 0); - String state = pieces[0]; - return state.equals(VersionedValue.SHUTDOWN); + return matchesStatusString(vv, SHUTDOWN); } public static boolean isHibernate(EndpointState epState) @@ -469,15 +474,39 @@ public static boolean isHibernate(EndpointState epState) } public static boolean isHibernate(VersionedValue vv) + { + return matchesStatusString(vv, HIBERNATE); + } + + public static boolean isLeft(VersionedValue vv) + { + return matchesStatusString(vv, STATUS_LEFT); + } + + private static boolean matchesStatusString(VersionedValue vv, String toMatch) { if (vv == null) return false; - String value = vv.value; - String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); + String[] pieces = vv.splitValue(); assert (pieces.length > 0); String state = pieces[0]; - return state.equals(VersionedValue.HIBERNATE); + return state.equals(toMatch); + } + + public static long extractExpireTime(String[] pieces) + { + if (pieces.length < 3) + return 0L; + try + { + return Long.parseLong(pieces[2]); + } + catch (NumberFormatException e) + { + logger.debug("Invalid value found for expire time ({}), ignoring", pieces[2]); + return 0L; + } } public static void runInGossipStageBlocking(Runnable runnable) @@ -696,10 +725,21 @@ private void quarantineEndpoint(InetAddressAndPort endpoint, long quarantineExpi { if (disableEndpointRemoval) return; + + // Quarantine is only necessary while upgrading from gossip-driven management of cluster metadata + if (getQuarantineDisabled() && ClusterMetadata.current().epoch.isAfter(Epoch.UPGRADE_GOSSIP)) + return; + justRemovedEndpoints.put(endpoint, quarantineExpiration); GossiperDiagnostics.quarantinedEndpoint(this, endpoint, quarantineExpiration); } + public void clearQuarantinedEndpoints() + { + logger.info("Clearing quarantined endpoints"); + justRemovedEndpoints.clear(); + } + /** * The gossip digest is built based on randomization * rather than just looping through the collection of live endpoints. @@ -948,15 +988,14 @@ void doStatusCheck() } // check for dead state removal - long expireTime = getExpireTimeForEndpoint(endpoint); - if (!epState.isAlive() && (now > expireTime) - && (!metadata.directory.allAddresses().contains(endpoint))) + if (!epState.isAlive() && (!metadata.directory.allJoinedEndpoints().contains(endpoint))) { - if (logger.isDebugEnabled()) + long expireTime = getExpireTimeForEndpoint(endpoint); + if (now > expireTime) { - logger.debug("time is expiring for endpoint : {} ({})", endpoint, expireTime); + logger.info("Reached gossip expiry time for endpoint : {} ({})", endpoint, expireTime); + runInGossipStageBlocking(() -> evictFromMembership(endpoint)); } - runInGossipStageBlocking(() -> evictFromMembership(endpoint)); } } } @@ -1897,11 +1936,15 @@ public int getCurrentGenerationNumber(String address) throws UnknownHostExceptio public void addExpireTimeForEndpoint(InetAddressAndPort endpoint, long expireTime) { - if (logger.isDebugEnabled()) + if (expireTime == 0L) + { + logger.debug("Supplied expire time for {} was 0, not recording", endpoint); + } + else { logger.debug("adding expire time for endpoint : {} ({})", endpoint, expireTime); + expireTimeEndpointMap.put(endpoint, expireTime); } - expireTimeEndpointMap.put(endpoint, expireTime); } public static long computeExpireTime() @@ -2104,6 +2147,50 @@ public void unsafeSendLocalEndpointStateTo(InetAddressAndPort ep) MessagingService.instance().send(message, ep); } + public void unsafeBroadcastLeftStatus(InetAddressAndPort left, + Collection tokens, + Iterable sendTo) + { + runInGossipStageBlocking(() -> { + EndpointState epState = endpointStateMap.get(left); + if (epState == null) + { + logger.info("No gossip state for node {}", left); + return; + } + + NodeState state = ClusterMetadata.current().directory.peerState(left); + if (state != NodeState.LEFT) + { + logger.info("Node Status for {} is not LEFT ({})", left, state); + return; + } + + EndpointState toSend = new EndpointState(epState); + toSend.forceNewerGenerationUnsafe(); + toSend.markDead(); + VersionedValue value = StorageService.instance.valueFactory.left(tokens, computeExpireTime()); + + if (left.equals(getBroadcastAddressAndPort())) + { + // Adding local state bumps the value's version. To keep this consistent across + // the cluster, re-fetch it before broadcasting. + Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, value); + value = Gossiper.instance.endpointStateMap.get(getBroadcastAddressAndPort()) + .getApplicationState(ApplicationState.STATUS_WITH_PORT); + } + + toSend.addApplicationState(ApplicationState.STATUS_WITH_PORT, value); + GossipDigestAck2 payload = new GossipDigestAck2(Collections.singletonMap(left, toSend)); + logger.info("Sending app state with status {} to {}", value.value, sendTo); + for (InetAddressAndPort ep : sendTo) + { + Message message = Message.out(Verb.GOSSIP_DIGEST_ACK2, payload); + MessagingService.instance().send(message, ep); + } + }); + } + private void unsafeUpdateEpStates(InetAddressAndPort endpoint, EndpointState epstate) { checkProperThreadForStateMutation(); @@ -2221,9 +2308,31 @@ private void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, Collecti newValue = valueFactory.hibernate(true); break; } + if (isLocal && !StorageService.instance.shouldJoinRing()) break; - newValue = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory, oldValue); + + // If quarantine has been disabled and we have already seen a LEFT status for a remote peer + // which originated from the peer itself or the node which coordinated its removal (and so + // has a version > 0), keep it as this is how we ensure the gossip expiry time encoded in + // the status string converges across peers. + // Should a node leave and then rejoin after resetting its local state (i.e. wipe and + // rejoin), it is automatically unregistered which removes all gossip state for it so there + // will be no oldValue in that case. + // + // Note: don't reorder these conditions as isLeft includes a null check + if (getQuarantineDisabled() && !isLocal && Gossiper.isLeft(oldValue) && oldValue.version > 0) + { + logger.debug("Already seen a LEFT status for {} with a non-zero version, " + + "dropping derived value {}", endpoint, newValue); + newValue = oldValue; + } + else + { + newValue = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory, oldValue); + if (Gossiper.isLeft(newValue)) + Gossiper.instance.addExpireTimeForEndpoint(endpoint, Gossiper.extractExpireTime(newValue.splitValue())); + } break; default: newValue = oldValue; @@ -2269,4 +2378,17 @@ public void triggerRoundWithCMS() sendGossip(message, cms); } } + + @Override + public boolean getQuarantineDisabled() + { + return DatabaseDescriptor.getGossipQuarantineDisabled(); + } + + @Override + public void setQuarantineDisabled(boolean enabled) + { + logger.info("Setting gossip_quarantine_disabled: {}", enabled); + DatabaseDescriptor.setGossipQuarantineDisabled(enabled); + } } diff --git a/src/java/org/apache/cassandra/gms/GossiperMBean.java b/src/java/org/apache/cassandra/gms/GossiperMBean.java index 0552883a60e5..3d46887b0eda 100644 --- a/src/java/org/apache/cassandra/gms/GossiperMBean.java +++ b/src/java/org/apache/cassandra/gms/GossiperMBean.java @@ -43,4 +43,8 @@ public interface GossiperMBean public boolean getLooseEmptyEnabled(); public void setLooseEmptyEnabled(boolean enabled); + + public boolean getQuarantineDisabled(); + + public void setQuarantineDisabled(boolean disabled); } diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index b03211b05c64..126aecf70336 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -159,6 +159,11 @@ public byte[] toBytes() return value.getBytes(ISO_8859_1); } + public String[] splitValue() + { + return value.split(DELIMITER_STR, -1); + } + private static String versionString(String... args) { return StringUtils.join(args, VersionedValue.DELIMITER); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a5703fe6018c..2abbc44ea0e0 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2158,6 +2158,12 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio Gossiper.instance.markDead(endpoint, epState); }); } + else if (Gossiper.isLeft(value)) + { + long expireTime = Gossiper.extractExpireTime(value.splitValue()); + logger.info("Node state LEFT detected, setting or updating expire time {}", expireTime); + Gossiper.instance.addExpireTimeForEndpoint(endpoint, expireTime); + } } if (epState == null || Gossiper.instance.isDeadState(epState)) @@ -2207,7 +2213,7 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio updateNetVersion(endpoint, value); break; case STATUS_WITH_PORT: - String[] pieces = splitValue(value); + String[] pieces = value.splitValue(); String moveName = pieces[0]; if (moveName.equals(VersionedValue.SHUTDOWN)) logger.info("Node {} state jump to shutdown", endpoint); @@ -2226,11 +2232,6 @@ else if (moveName.equals(VersionedValue.STATUS_NORMAL)) } } - private static String[] splitValue(VersionedValue value) - { - return value.value.split(VersionedValue.DELIMITER_STR, -1); - } - public static void updateIndexStatus(InetAddressAndPort endpoint, VersionedValue versionedValue) { IndexStatusManager.instance.receivePeerIndexStatus(endpoint, versionedValue); diff --git a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java index fa1fe9d25ba0..3f77db174f7f 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java @@ -65,8 +65,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean next.tokenMap.lastModified().equals(prev.tokenMap.lastModified())) return; - Set removedAddr = Sets.difference(new HashSet<>(prev.directory.allAddresses()), - new HashSet<>(next.directory.allAddresses())); + Set removedAddr = Sets.difference(prev.directory.allAddresses(), next.directory.allAddresses()); Set changed = new HashSet<>(); for (NodeId node : next.directory.peerIds()) diff --git a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java b/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java index ff24e17b9d95..67931af1779c 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java @@ -35,5 +35,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean logger.info("Detected upgrade from gossip mode, updating my host id in gossip to {}", next.myNodeId()); Gossiper.instance.mergeNodeToGossip(next.myNodeId(), next); + if (Gossiper.instance.getQuarantineDisabled()) + Gossiper.instance.clearQuarantinedEndpoints(); } } diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java index 8e73f7f34189..ec5f2c4a9dd9 100644 --- a/src/java/org/apache/cassandra/tcm/membership/Directory.java +++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java @@ -26,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@ -385,9 +384,9 @@ public boolean isEmpty() * those cases use allJoinedEndpoints. * @return */ - public ImmutableList allAddresses() + public Set allAddresses() { - return ImmutableList.copyOf(peers.values()); + return peers.values(); } public NavigableSet peerIds() diff --git a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java index 58c5f024f9c0..9da95b8aff2b 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java +++ b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java @@ -18,6 +18,7 @@ package org.apache.cassandra.tcm.sequences; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -38,6 +39,7 @@ import org.apache.cassandra.tcm.transformations.CancelInProgressSequence; import org.apache.cassandra.tcm.transformations.PrepareLeave; import org.apache.cassandra.tcm.transformations.PrepareMove; +import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.service.StorageService.Mode.LEAVING; import static org.apache.cassandra.service.StorageService.Mode.MOVE_FAILED; @@ -73,7 +75,7 @@ static void decommission(boolean shutdownNetworking, boolean force) logger.debug("DECOMMISSIONING"); NodeId self = metadata.myNodeId(); - + Collection tokens = metadata.tokenMap.tokens(self); ReconfigureCMS.maybeReconfigureCMS(metadata, getBroadcastAddressAndPort()); MultiStepOperation inProgress = metadata.inProgressSequences.get(self); @@ -95,6 +97,9 @@ else if (InProgressSequences.isLeave(inProgress)) } InProgressSequences.finishInProgressSequences(self); + Gossiper.instance.unsafeBroadcastLeftStatus(FBUtilities.getBroadcastAddressAndPort(), + tokens, + metadata.directory.allJoinedEndpoints()); if (shutdownNetworking) StorageService.instance.shutdownNetworking(); } @@ -134,12 +139,13 @@ static void removeNode(NodeId toRemove, boolean force) ReconfigureCMS.maybeReconfigureCMS(metadata, endpoint); logger.info("starting removenode with {} {}", metadata.epoch, toRemove); - + Collection tokens = metadata.tokenMap.tokens(toRemove); ClusterMetadataService.instance().commit(new PrepareLeave(toRemove, force, ClusterMetadataService.instance().placementProvider(), LeaveStreams.Kind.REMOVENODE)); InProgressSequences.finishInProgressSequences(toRemove); + Gossiper.instance.unsafeBroadcastLeftStatus(endpoint, tokens, metadata.directory.allJoinedEndpoints()); } static void abortRemoveNode(String nodeId) diff --git a/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java b/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java index 76bca253ee93..f1bb6d50d317 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java +++ b/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java @@ -18,8 +18,12 @@ package org.apache.cassandra.tcm.transformations; +import java.util.Collection; + import com.google.common.collect.ImmutableSet; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; @@ -64,8 +68,10 @@ public static void assassinateEndpoint(InetAddressAndPort endpoint) ReconfigureCMS.maybeReconfigureCMS(metadata, endpoint); NodeId nodeId = metadata.directory.peerId(endpoint); - ClusterMetadataService.instance().commit(new Assassinate(nodeId, - ClusterMetadataService.instance().placementProvider())); + Collection tokens = metadata.tokenMap.tokens(nodeId); + ClusterMetadataService.instance() + .commit(new Assassinate(nodeId, ClusterMetadataService.instance().placementProvider())); + Gossiper.instance.unsafeBroadcastLeftStatus(endpoint, tokens, metadata.directory.allJoinedEndpoints()); } @Override diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java index a362938aba76..ab26d03b8fea 100644 --- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java @@ -46,7 +46,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; import accord.topology.EpochReady; @@ -802,7 +801,7 @@ public static Epoch snapshotClusterMetadata(IInvokableInstance inst) public static Map getPeerEpochs(IInvokableInstance requester) { Map map = requester.callOnInstance(() -> { - ImmutableList peers = ClusterMetadata.current().directory.allAddresses(); + Set peers = ClusterMetadata.current().directory.allAddresses(); CountDownLatch latch = CountDownLatch.newCountDownLatch(peers.size()); Map epochs = new ConcurrentHashMap<>(peers.size()); peers.forEach(peer -> { diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java index a08ba58a06ba..15c532422a61 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java @@ -105,13 +105,17 @@ public static void beforeClass() throws Throwable ParameterizedClass seeds = new ParameterizedClass(SimpleSeedProvider.class.getName(), Collections.singletonMap("seeds", "127.0.0.2")); + // TODO: This currently requires the accord service to be disabled as some tests remove a node from the + // cluster and re-join it using the same broadcast address. See CASSANDRA-21026 Consumer conf = config -> config .set("paxos_variant", "v2") .set("write_request_timeout", REQUEST_TIMEOUT) .set("cas_contention_timeout", CONTENTION_TIMEOUT) .set("request_timeout", REQUEST_TIMEOUT) .set("seed_provider", seeds) - .set("auto_bootstrap", config.num() == 2); + .set("auto_bootstrap", config.num() == 2) + .set("accord.enabled", false); + // TODO: fails with vnode enabled THREE_NODES = init(Cluster.build(3).withConfig(conf).withoutVNodes().start()); FOUR_NODES = init(Cluster.build(4).withConfig(conf).withoutVNodes().start(), 3); @@ -830,18 +834,28 @@ private void joinFully(Cluster cluster, int node) { IInstanceConfig config = cluster.get(node).config(); InetAddressAndPort address = InetAddressAndPort.getByAddress(config.broadcastAddress()); + String dc = config.localDatacenter(); + String rack = config.localRack(); IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner")); Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token")); - cluster.get(node).runOnInstance(() -> ClusterMetadataTestHelper.join(address, token)); + cluster.get(node).runOnInstance(() -> { + ClusterMetadataTestHelper.register(address, dc, rack); + ClusterMetadataTestHelper.join(address, token); + }); } private void joinPartially(Cluster cluster, int node) { IInstanceConfig config = cluster.get(node).config(); InetAddressAndPort address = InetAddressAndPort.getByAddress(config.broadcastAddress()); + String dc = config.localDatacenter(); + String rack = config.localRack(); IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner")); Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token")); - cluster.get(node).runOnInstance(() -> ClusterMetadataTestHelper.joinPartially(address, token)); + cluster.get(node).runOnInstance(() -> { + ClusterMetadataTestHelper.register(address, dc, rack); + ClusterMetadataTestHelper.joinPartially(address, token); + }); } private void finishJoin(Cluster cluster, int node) diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java index 1c441ded3ef7..715a091dca26 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java @@ -193,7 +193,10 @@ public static void assertVisibleInRing(IInstance peer) public static void removeFromRing(IInvokableInstance peer) { - peer.runOnInstance(() -> ClusterMetadataTestHelper.leave(FBUtilities.getBroadcastAddressAndPort())); + peer.runOnInstance(() -> { + ClusterMetadataTestHelper.leave(FBUtilities.getBroadcastAddressAndPort()); + ClusterMetadataTestHelper.unregister(FBUtilities.getBroadcastAddressAndPort()); + }); } public static void assertNotVisibleInRing(IInstance peer) diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java index 9c52ac92a9cc..cb5cb1f470f5 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java @@ -251,7 +251,7 @@ static void populate(Cluster cluster) } @Test - public void testQuarantine() throws IOException + public void testReplacedNodeRemovedFromGossip() throws IOException { TokenSupplier even = TokenSupplier.evenlyDistributedTokens(4, 1); try (Cluster cluster = Cluster.build(4) diff --git a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterAssassinateTest.java b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterAssassinateTest.java new file mode 100644 index 000000000000..4bcae67a2aaa --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterAssassinateTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.gossip; + +import java.net.InetSocketAddress; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; + +public class GossipExpiryAfterAssassinateTest extends GossipExpiryTestBase +{ + @Override + void doRemoval(Cluster cluster, IInvokableInstance toRemove) + { + // Shut down one peer, then have another assassinate it. The coordinating node will gossip a final LEFT status, + // including the expiry time it calculated to the remaining members. + IInvokableInstance coordinator = cluster.get(1); + if (coordinator.equals(toRemove)) + throw new IllegalArgumentException("Node cannot assassinate itself"); + + try + { + InetSocketAddress toAssassinate = toRemove.broadcastAddress(); + toRemove.shutdown().get(); + coordinator.nodetoolResult("assassinate", toAssassinate.getHostString()).asserts().success(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterDecommissionTest.java b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterDecommissionTest.java new file mode 100644 index 000000000000..5b3f4e09fd2a --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterDecommissionTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.gossip; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; + +public class GossipExpiryAfterDecommissionTest extends GossipExpiryTestBase +{ + @Override + void doRemoval(Cluster cluster, IInvokableInstance toRemove) + { + // Decommission one peer. Before shutting down messaging, the leaving node will gossip its final LEFT + // status, including the expiry time it calculated to the remaining members. + toRemove.nodetoolResult("decommission").asserts().success(); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterRemoveNodeTest.java b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterRemoveNodeTest.java new file mode 100644 index 000000000000..c9def985dd6b --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterRemoveNodeTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.gossip; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.tcm.ClusterMetadata; + +public class GossipExpiryAfterRemoveNodeTest extends GossipExpiryTestBase +{ + @Override + void doRemoval(Cluster cluster, IInvokableInstance toRemove) + { + // Shut down one peer, then have another remove it. The coordinating node will gossip a final LEFT status, + // including the expiry time it calculated to the remaining members. + IInvokableInstance coordinator = cluster.get(1); + if (coordinator.equals(toRemove)) + throw new IllegalArgumentException("Node to be removed cannot act as removal coordinator"); + + try + { + String nodeId = toRemove.callOnInstance(() -> ClusterMetadata.current().myNodeId().toUUID().toString()); + toRemove.shutdown().get(); + coordinator.nodetoolResult("removenode", nodeId).asserts().success(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryTestBase.java new file mode 100644 index 000000000000..a9fd9ab10525 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryTestBase.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.gossip; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.WithProperties; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.listeners.LegacyStateListener; +import org.apache.cassandra.tcm.membership.NodeId; +import org.awaitility.Awaitility; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.cassandra.config.CassandraRelevantProperties.VERY_LONG_TIME_MS; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.gms.ApplicationState.STATUS_WITH_PORT; +import static org.apache.cassandra.gms.VersionedValue.STATUS_LEFT; + +public abstract class GossipExpiryTestBase extends TestBaseImpl +{ + private static final Logger logger = LoggerFactory.getLogger(GossipExpiryTestBase.class); + + abstract void doRemoval(Cluster cluster, IInvokableInstance toRemove); + + @Test + public void testExpiryOfLeftStateWithoutQuarantine() throws IOException + { + doTest(true); + } + + @Test + public void testExpiryOfLeftStateWithQuarantine() throws IOException + { + doTest(false); + } + + private void doTest(boolean withQuarantineDisabled) throws IOException + { + // This test verifies that when a node leaves the cluster, the expiry time for its state in gossip is + // recorded on each node and then expunged when that deadline is reached. By default, the expiry time for + // a left peer is calculated on each node independently, but if the gossip_quarantine_disabled config + // option is set to true it will converge and become consistent across the remaining members. + // + // * First we set the property that controls expiry time to 10s. This interval is added to the current wall + // clock time to calculate the expiry deadline. + // * Use bytebuddy to inject some jitter into the local expiry time calculation. Each node will individually + // calculate an expiry based on when it processes the completion of the operation that removes the node. If + // the config option is set the cluster should converge on the expiry time calculated by the node coordinating + // the operation. For decommission, this will be the leaving node itself and for removenode/assassinate it + // will be the coordinator. The jitter is to make sure that in the test, the nodes start off with differing + // expiry times. + // * Remove one peer via decommission, removenode or assassinate. + // * After maybe verifying the convergence, check that the state for the left node does in fact get removed. + try (WithProperties ignored = new WithProperties().set(VERY_LONG_TIME_MS, 11000); + Cluster cluster = builder().withNodes(5) + .withInstanceInitializer(GossipExpiryTestBase.BB::install) + .withConfig(config -> config.with(NETWORK, GOSSIP) + .set("gossip_quarantine_disabled", withQuarantineDisabled)) + .start()) + { + cluster.forEach(i -> i.runOnInstance(() -> BB.injectDelay.set(true))); + + IInvokableInstance toRemove = cluster.get(5); + String gossipStateKey = toRemove.config().broadcastAddress().getAddress().toString(); + doRemoval(cluster, toRemove); + if (withQuarantineDisabled) + { + // STATUS_WITH_PORT for the left node should converge to share the same expiry time across all nodes + Awaitility.waitAtMost(10, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .until(() -> { + Set endpointStates = new HashSet<>(); + cluster.forEach(i -> { + if (!i.equals(toRemove)) + { + Map instanceState = ClusterUtils.gossipInfo(i).get(gossipStateKey); + if (instanceState != null && instanceState.containsKey(STATUS_WITH_PORT.name())) + endpointStates.add(instanceState.get(STATUS_WITH_PORT.name())); + } + }); + logger.info("Collected STATUS_WITH_PORT values: {}", endpointStates); + return endpointStates.size() == 1 && endpointStates.iterator() + .next() + .contains(STATUS_LEFT); + }); + } + + // Once the expiry time is reached, gossip state for the left node is purged + Awaitility.waitAtMost(30, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .until(() -> { + AtomicBoolean purged = new AtomicBoolean(true); + cluster.forEach(i -> { + // The left node doesn't purge itself from local state, so skip it here + if (!i.equals(toRemove) && ClusterUtils.gossipInfo(i).containsKey(gossipStateKey)) + purged.set(false); + }); + return purged.get(); + }); + } + } + + public static class BB + { + static void install(ClassLoader cl, int nodeNumber) + { + if (nodeNumber != 2) + return; + new ByteBuddy().rebase(LegacyStateListener.class) + .method(named("processChangesToRemotePeers")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + + static AtomicBoolean injectDelay = new AtomicBoolean(false); + static Random random = new Random(System.nanoTime()); + + public static void processChangesToRemotePeers(ClusterMetadata prev, + ClusterMetadata next, + Set changed, + @SuperCall Callable zuper) throws Exception + { + if (injectDelay.get()) + TimeUnit.MILLISECONDS.sleep(random.nextInt(1000)); + zuper.call(); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java index 43c147c2550c..cbdcc2239495 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java @@ -21,6 +21,7 @@ import java.net.UnknownHostException; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.Map; import java.util.Random; @@ -83,9 +84,9 @@ import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; import org.apache.cassandra.tcm.sequences.BootstrapAndReplace; import org.apache.cassandra.tcm.sequences.InProgressSequences; +import org.apache.cassandra.tcm.sequences.LeaveStreams; import org.apache.cassandra.tcm.sequences.LockedRanges; import org.apache.cassandra.tcm.sequences.Move; -import org.apache.cassandra.tcm.sequences.LeaveStreams; import org.apache.cassandra.tcm.sequences.ReconfigureCMS; import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave; import org.apache.cassandra.tcm.transformations.AlterSchema; @@ -94,6 +95,7 @@ import org.apache.cassandra.tcm.transformations.PrepareMove; import org.apache.cassandra.tcm.transformations.PrepareReplace; import org.apache.cassandra.tcm.transformations.Register; +import org.apache.cassandra.tcm.transformations.Unregister; import org.apache.cassandra.tcm.transformations.cms.AdvanceCMSReconfiguration; import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration; import org.apache.cassandra.utils.ByteBufferUtil; @@ -101,6 +103,10 @@ import org.apache.cassandra.utils.Throwables; import static org.apache.cassandra.schema.SchemaTestUtil.submit; +import static org.apache.cassandra.tcm.membership.NodeState.BOOTSTRAPPING; +import static org.apache.cassandra.tcm.membership.NodeState.BOOT_REPLACING; +import static org.apache.cassandra.tcm.membership.NodeState.LEFT; +import static org.apache.cassandra.tcm.membership.NodeState.REGISTERED; import static org.junit.Assert.assertEquals; public class ClusterMetadataTestHelper @@ -427,6 +433,25 @@ public static void leave(InetAddressAndPort endpoint) } } + public static void unregister(InetAddressAndPort endpoint) + { + unregister(nodeId(endpoint)); + } + + public static void unregister(NodeId nodeId) + { + try + { + commit(new Unregister(nodeId, + EnumSet.of(REGISTERED, BOOTSTRAPPING, BOOT_REPLACING, LEFT), + ClusterMetadataService.instance().placementProvider())); + } + catch (Throwable e) + { + throw new RuntimeException(e); + } + } + public static JoinProcess lazyJoin(int nodeIdx, long token) { return lazyJoin(addr(nodeIdx), Collections.singleton(new Murmur3Partitioner.LongToken(token))); diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java index d254850a36d2..1dfe194b2216 100644 --- a/test/unit/org/apache/cassandra/gms/GossiperTest.java +++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java @@ -47,6 +47,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.SeedProvider; import org.apache.cassandra.tcm.ClusterMetadataService; @@ -122,6 +123,8 @@ public void testPaddingIntact() throws Exception public void testLargeGenerationJump() throws UnknownHostException, InterruptedException { Util.initGossipTokens(partitioner, endpointTokens, hosts, hostIds, 2); + for (InetAddressAndPort host : hosts) + ClusterMetadataTestHelper.register(host); try { InetAddressAndPort remoteHostAddress = hosts.get(1); @@ -169,6 +172,8 @@ public void testDuplicatedStateUpdate() throws Exception SimpleStateChangeListener stateChangeListener = null; Util.initGossipTokens(partitioner, endpointTokens, hosts, hostIds, 2); + for (InetAddressAndPort host : hosts) + ClusterMetadataTestHelper.register(host); try { InetAddressAndPort remoteHostAddress = hosts.get(1); @@ -329,6 +334,9 @@ public void testNotFireDuplicatedNotificationsWithUpdateContainsOldAndNewState() new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()); Util.initGossipTokens(partitioner, endpointTokens, hosts, hostIds, 2); + for (InetAddressAndPort host : hosts) + ClusterMetadataTestHelper.register(host); + SimpleStateChangeListener stateChangeListener = null; try { diff --git a/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java b/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java index aebe443529e3..91e1f1e1c3ce 100644 --- a/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java +++ b/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java @@ -20,6 +20,8 @@ import java.net.UnknownHostException; import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.cassandra.locator.InetAddressAndPort; @@ -36,6 +38,15 @@ public static InetAddressAndPort randomEndpoint(Random random) return endpoint(random.nextInt(254) + 1); } + public static Set uniqueEndpoints(Random random, int count) + { + return random.ints(1, 255) + .distinct() + .limit(count) + .mapToObj(MembershipUtils::endpoint) + .collect(Collectors.toSet()); + } + public static InetAddressAndPort endpoint(int i) { return endpoint((byte)i); diff --git a/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java b/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java index 33ccb137ae61..9cd8ee774685 100644 --- a/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java +++ b/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.Random; import java.util.Set; @@ -36,16 +37,18 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.exceptions.ExceptionCode; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.ownership.OwnershipUtils; -import static org.apache.cassandra.tcm.membership.MembershipUtils.nodeAddresses; +import static org.apache.cassandra.tcm.membership.MembershipUtils.uniqueEndpoints; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -116,9 +119,10 @@ private ClusterMetadata metadata() other = new NodeId(1); joining = new NodeId(2); Location location = new Location("dc", "rack"); - Directory directory = new Directory().unsafeWithNodeForTesting(other, nodeAddresses(random), location, NodeVersion.CURRENT) + Iterator endpoints = uniqueEndpoints(random, 2).iterator(); + Directory directory = new Directory().unsafeWithNodeForTesting(other, new NodeAddresses(endpoints.next()), location, NodeVersion.CURRENT) .withNodeState(other, NodeState.JOINED) - .unsafeWithNodeForTesting(joining, nodeAddresses(random), location, NodeVersion.CURRENT) + .unsafeWithNodeForTesting(joining, new NodeAddresses(endpoints.next()), location, NodeVersion.CURRENT) .withNodeState(joining, NodeState.REGISTERED); Set ownedTokens = OwnershipUtils.randomTokens(16, partitioner, random); return ClusterMetadataTestHelper.minimalForTesting(partitioner)