Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1520,4 +1520,29 @@ public enum CQLStartTime
public boolean enforce_native_deadline_for_hints = false;

public boolean paxos_repair_race_wait = true;

/**
* If true, gossip state updates for nodes which have left the cluster will continue to be processed while the
* node is still present in ClusterMetadata. This enables the gossip expiry time for those nodes (the deadline
* after which their state is fully purged from gossip) to converge across the remaining nodes in the cluster.
* This is a change from previous behaviour as historically once a node has advertised a LEFT status further
* updates to gossip state for it are ignored for a period of time to prevent flapping if older/stale states
* are encountered.
* Following CEP-21, most significant state changes are handled by the cluster metadata log, so resurrection
* of left nodes is not a problem for gossip to solve and so quarantine is not really necessary. However,
* FailureDetector does still use gossip messages to assess node health and some external systems still use gossip
* state to inform decisions about topology/node health/etc. For those reasons, for now the disabling of quarantine
* is off by default and hot-proppable.
*
* With quarantine still in effect, expiry from gossip of LEFT nodes will occur at different times on each peer.
* Also, when there are LEFT nodes in gossip, the state will never fully converge across the cluster as each node
* will have its own expiry time for a LEFT peer.
*
* With quarantine disabled the STATUS_WITH_PORT values for the left node which include the expiry time will
* converge and peers will all evict it from gossip after the same deadline.
*
* Eventually, this configuration option should be removed and quarantine disabled entirely for clusters running
* 6.0 and later.
*/
public volatile boolean gossip_quarantine_disabled = false;
}
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -6109,4 +6109,14 @@ public static void setPartitioner(String name)
{
partitioner = FBUtilities.newPartitioner(name);
}

public static boolean getGossipQuarantineDisabled()
{
return conf.gossip_quarantine_disabled;
}

public static void setGossipQuarantineDisabled(boolean disabled)
{
conf.gossip_quarantine_disabled = disabled;
}
}
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/gms/EndpointState.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public CassandraVersion getReleaseVersion()
public String toString()
{
View view = ref.get();
return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState;
return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState + ", isAlive = " + isAlive;
}

public boolean isSupersededBy(EndpointState that)
Expand Down
168 changes: 145 additions & 23 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.utils.FBUtilities;
Expand All @@ -107,6 +108,10 @@
import static org.apache.cassandra.gms.Gossiper.GossipedWith.SEED;
import static org.apache.cassandra.gms.VersionedValue.BOOTSTRAPPING_STATUS;
import static org.apache.cassandra.gms.VersionedValue.HIBERNATE;
import static org.apache.cassandra.gms.VersionedValue.REMOVED_TOKEN;
import static org.apache.cassandra.gms.VersionedValue.REMOVING_TOKEN;
import static org.apache.cassandra.gms.VersionedValue.SHUTDOWN;
import static org.apache.cassandra.gms.VersionedValue.STATUS_LEFT;
import static org.apache.cassandra.gms.VersionedValue.unsafeMakeVersionedValue;
import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.net.Verb.ECHO_REQ;
Expand Down Expand Up @@ -137,8 +142,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean,
private static final ScheduledExecutorPlus executor = executorFactory().scheduled("GossipTasks");

static final ApplicationState[] STATES = ApplicationState.values();
static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
static final List<String> DEAD_STATES = Arrays.asList(REMOVING_TOKEN, REMOVED_TOKEN, STATUS_LEFT, HIBERNATE);
static ArrayList<String> SILENT_SHUTDOWN_STATES = new ArrayList<>();
static
{
Expand Down Expand Up @@ -185,7 +189,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean,
/* map where key is endpoint and value is timestamp when this endpoint was removed from
* gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time
* after removal to prevent nodes from falsely reincarnating during the time when removal
* gossip gets propagated to all nodes */
* gossip gets propagated to all nodes.
* Note: in future, this need only be used when ClusterMetadataService is in the GOSSIP state,
* i.e. during the major upgrade to the version with CEP-21, but before the CMS is initialized.
* In this state, gossip is still used to propagate changes to broadcast address and release
* version. Once the CMS initialization is complete, this is no longer necessary.
* Currently in order to support a controlled rollout of that change to behaviour, quarantine
* is still used by default, but can be disabled via config (gossip_quarantine_disabled) or
* JMX (GossiperMBean::setQuarantineDisabled)
*/
private final Map<InetAddressAndPort, Long> justRemovedEndpoints = new ConcurrentHashMap<>();

private final Map<InetAddressAndPort, Long> expireTimeEndpointMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -450,14 +462,7 @@ private static boolean isShutdown(EndpointState epState)

public static boolean isShutdown(VersionedValue vv)
{
if (vv == null)
return false;

String value = vv.value;
String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
assert (pieces.length > 0);
String state = pieces[0];
return state.equals(VersionedValue.SHUTDOWN);
return matchesStatusString(vv, SHUTDOWN);
}

public static boolean isHibernate(EndpointState epState)
Expand All @@ -469,15 +474,39 @@ public static boolean isHibernate(EndpointState epState)
}

public static boolean isHibernate(VersionedValue vv)
{
return matchesStatusString(vv, HIBERNATE);
}

public static boolean isLeft(VersionedValue vv)
{
return matchesStatusString(vv, STATUS_LEFT);
}

private static boolean matchesStatusString(VersionedValue vv, String toMatch)
{
if (vv == null)
return false;

String value = vv.value;
String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
String[] pieces = vv.splitValue();
assert (pieces.length > 0);
String state = pieces[0];
return state.equals(VersionedValue.HIBERNATE);
return state.equals(toMatch);
}

public static long extractExpireTime(String[] pieces)
{
if (pieces.length < 3)
return 0L;
try
{
return Long.parseLong(pieces[2]);
}
catch (NumberFormatException e)
{
logger.debug("Invalid value found for expire time ({}), ignoring", pieces[2]);
return 0L;
}
}

public static void runInGossipStageBlocking(Runnable runnable)
Expand Down Expand Up @@ -696,10 +725,21 @@ private void quarantineEndpoint(InetAddressAndPort endpoint, long quarantineExpi
{
if (disableEndpointRemoval)
return;

// Quarantine is only necessary while upgrading from gossip-driven management of cluster metadata
if (getQuarantineDisabled() && ClusterMetadata.current().epoch.isAfter(Epoch.UPGRADE_GOSSIP))
return;

justRemovedEndpoints.put(endpoint, quarantineExpiration);
GossiperDiagnostics.quarantinedEndpoint(this, endpoint, quarantineExpiration);
}

public void clearQuarantinedEndpoints()
{
logger.info("Clearing quarantined endpoints");
justRemovedEndpoints.clear();
}

/**
* The gossip digest is built based on randomization
* rather than just looping through the collection of live endpoints.
Expand Down Expand Up @@ -948,15 +988,14 @@ void doStatusCheck()
}

// check for dead state removal
long expireTime = getExpireTimeForEndpoint(endpoint);
if (!epState.isAlive() && (now > expireTime)
&& (!metadata.directory.allAddresses().contains(endpoint)))
if (!epState.isAlive() && (!metadata.directory.allJoinedEndpoints().contains(endpoint)))
{
if (logger.isDebugEnabled())
long expireTime = getExpireTimeForEndpoint(endpoint);
if (now > expireTime)
{
logger.debug("time is expiring for endpoint : {} ({})", endpoint, expireTime);
logger.info("Reached gossip expiry time for endpoint : {} ({})", endpoint, expireTime);
runInGossipStageBlocking(() -> evictFromMembership(endpoint));
}
runInGossipStageBlocking(() -> evictFromMembership(endpoint));
}
}
}
Expand Down Expand Up @@ -1897,11 +1936,15 @@ public int getCurrentGenerationNumber(String address) throws UnknownHostExceptio

public void addExpireTimeForEndpoint(InetAddressAndPort endpoint, long expireTime)
{
if (logger.isDebugEnabled())
if (expireTime == 0L)
{
logger.debug("Supplied expire time for {} was 0, not recording", endpoint);
}
else
{
logger.debug("adding expire time for endpoint : {} ({})", endpoint, expireTime);
expireTimeEndpointMap.put(endpoint, expireTime);
}
expireTimeEndpointMap.put(endpoint, expireTime);
}

public static long computeExpireTime()
Expand Down Expand Up @@ -2104,6 +2147,50 @@ public void unsafeSendLocalEndpointStateTo(InetAddressAndPort ep)
MessagingService.instance().send(message, ep);
}

public void unsafeBroadcastLeftStatus(InetAddressAndPort left,
Collection<Token> tokens,
Iterable<InetAddressAndPort> sendTo)
{
runInGossipStageBlocking(() -> {
EndpointState epState = endpointStateMap.get(left);
if (epState == null)
{
logger.info("No gossip state for node {}", left);
return;
}

NodeState state = ClusterMetadata.current().directory.peerState(left);
if (state != NodeState.LEFT)
{
logger.info("Node Status for {} is not LEFT ({})", left, state);
return;
}

EndpointState toSend = new EndpointState(epState);
toSend.forceNewerGenerationUnsafe();
toSend.markDead();
VersionedValue value = StorageService.instance.valueFactory.left(tokens, computeExpireTime());

if (left.equals(getBroadcastAddressAndPort()))
{
// Adding local state bumps the value's version. To keep this consistent across
// the cluster, re-fetch it before broadcasting.
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, value);
value = Gossiper.instance.endpointStateMap.get(getBroadcastAddressAndPort())
.getApplicationState(ApplicationState.STATUS_WITH_PORT);
}

toSend.addApplicationState(ApplicationState.STATUS_WITH_PORT, value);
GossipDigestAck2 payload = new GossipDigestAck2(Collections.singletonMap(left, toSend));
logger.info("Sending app state with status {} to {}", value.value, sendTo);
for (InetAddressAndPort ep : sendTo)
{
Message<GossipDigestAck2> message = Message.out(Verb.GOSSIP_DIGEST_ACK2, payload);
MessagingService.instance().send(message, ep);
}
});
}

private void unsafeUpdateEpStates(InetAddressAndPort endpoint, EndpointState epstate)
{
checkProperThreadForStateMutation();
Expand Down Expand Up @@ -2221,9 +2308,31 @@ private void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, Collecti
newValue = valueFactory.hibernate(true);
break;
}

if (isLocal && !StorageService.instance.shouldJoinRing())
break;
newValue = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory, oldValue);

// If quarantine has been disabled and we have already seen a LEFT status for a remote peer
// which originated from the peer itself or the node which coordinated its removal (and so
// has a version > 0), keep it as this is how we ensure the gossip expiry time encoded in
// the status string converges across peers.
// Should a node leave and then rejoin after resetting its local state (i.e. wipe and
// rejoin), it is automatically unregistered which removes all gossip state for it so there
// will be no oldValue in that case.
//
// Note: don't reorder these conditions as isLeft includes a null check
if (getQuarantineDisabled() && !isLocal && Gossiper.isLeft(oldValue) && oldValue.version > 0)
{
logger.debug("Already seen a LEFT status for {} with a non-zero version, " +
"dropping derived value {}", endpoint, newValue);
newValue = oldValue;
}
else
{
newValue = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory, oldValue);
if (Gossiper.isLeft(newValue))
Gossiper.instance.addExpireTimeForEndpoint(endpoint, Gossiper.extractExpireTime(newValue.splitValue()));
}
break;
default:
newValue = oldValue;
Expand Down Expand Up @@ -2269,4 +2378,17 @@ public void triggerRoundWithCMS()
sendGossip(message, cms);
}
}

@Override
public boolean getQuarantineDisabled()
{
return DatabaseDescriptor.getGossipQuarantineDisabled();
}

@Override
public void setQuarantineDisabled(boolean enabled)
{
logger.info("Setting gossip_quarantine_disabled: {}", enabled);
DatabaseDescriptor.setGossipQuarantineDisabled(enabled);
}
}
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/gms/GossiperMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ public interface GossiperMBean
public boolean getLooseEmptyEnabled();

public void setLooseEmptyEnabled(boolean enabled);

public boolean getQuarantineDisabled();

public void setQuarantineDisabled(boolean disabled);
}
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/gms/VersionedValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public byte[] toBytes()
return value.getBytes(ISO_8859_1);
}

public String[] splitValue()
{
return value.split(DELIMITER_STR, -1);
}

private static String versionString(String... args)
{
return StringUtils.join(args, VersionedValue.DELIMITER);
Expand Down
13 changes: 7 additions & 6 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2158,6 +2158,12 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio
Gossiper.instance.markDead(endpoint, epState);
});
}
else if (Gossiper.isLeft(value))
{
long expireTime = Gossiper.extractExpireTime(value.splitValue());
logger.info("Node state LEFT detected, setting or updating expire time {}", expireTime);
Gossiper.instance.addExpireTimeForEndpoint(endpoint, expireTime);
}
}

if (epState == null || Gossiper.instance.isDeadState(epState))
Expand Down Expand Up @@ -2207,7 +2213,7 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio
updateNetVersion(endpoint, value);
break;
case STATUS_WITH_PORT:
String[] pieces = splitValue(value);
String[] pieces = value.splitValue();
String moveName = pieces[0];
if (moveName.equals(VersionedValue.SHUTDOWN))
logger.info("Node {} state jump to shutdown", endpoint);
Expand All @@ -2226,11 +2232,6 @@ else if (moveName.equals(VersionedValue.STATUS_NORMAL))
}
}

private static String[] splitValue(VersionedValue value)
{
return value.value.split(VersionedValue.DELIMITER_STR, -1);
}

public static void updateIndexStatus(InetAddressAndPort endpoint, VersionedValue versionedValue)
{
IndexStatusManager.instance.receivePeerIndexStatus(endpoint, versionedValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean
next.tokenMap.lastModified().equals(prev.tokenMap.lastModified()))
return;

Set<InetAddressAndPort> removedAddr = Sets.difference(new HashSet<>(prev.directory.allAddresses()),
new HashSet<>(next.directory.allAddresses()));
Set<InetAddressAndPort> removedAddr = Sets.difference(prev.directory.allAddresses(), next.directory.allAddresses());

Set<NodeId> changed = new HashSet<>();
for (NodeId node : next.directory.peerIds())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean

logger.info("Detected upgrade from gossip mode, updating my host id in gossip to {}", next.myNodeId());
Gossiper.instance.mergeNodeToGossip(next.myNodeId(), next);
if (Gossiper.instance.getQuarantineDisabled())
Gossiper.instance.clearQuarantinedEndpoints();
}
}
Loading