Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
Expand Down Expand Up @@ -57,17 +58,18 @@
* Compares the currentState, pendingState with IdealState and generate messages
*/
public class MessageGenerationPhase extends AbstractBaseStage {
private final static String NO_DESIRED_STATE = "NoDesiredState";
private static final String NO_DESIRED_STATE = "NoDesiredState";

// If an invalid pending message is left on a host (e.g. a message telling the participant
// to change from SLAVE to MASTER while the participant is already in the MASTER state), we
// wait for a timeout. If the participant still has not cleaned up the message by then, the
// controller cleans it up proactively to unblock further state transitions.
public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
public static final long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000);
private final static String PENDING_MESSAGE = "pending message";
private final static String STALE_MESSAGE = "stale message";
private static final String PENDING_MESSAGE = "pending message";
private static final String STALE_MESSAGE = "stale message";
private static final String OFFLINE = "OFFLINE";

private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);

Expand Down Expand Up @@ -163,6 +165,18 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
// desired-state->list of generated-messages
Map<String, List<Message>> messageMap = new HashMap<>();

/*
* Calculate the current active replica count based on state model type.
* This represents the number of replicas currently serving traffic for this partition
* Active replicas include: top states, secondary top states(excluding OFFLINE) and ERROR
* states.
* Active replicas exclude: OFFLINE and DROPPED states.
* All qualifying state transitions for this partition will receive this same value,
* allowing clients to understand the current availability level and prioritize accordingly.
*/
int currentActiveReplicaCount =
calculateCurrentActiveReplicaCount(currentStateMap, stateModelDef);

for (String instanceName : instanceStateMap.keySet()) {

Set<Message> staleMessages = cache.getStaleMessagesByInstance(instanceName);
Expand Down Expand Up @@ -250,17 +264,39 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
stateModelDef, cancellationMessage, isCancellationEnabled);
} else {
// Set currentActiveReplicaNumber to provide metadata that the participant can use to
// prioritize messages.
// Every qualifying upward transition for this partition is assigned the same current
// active replica count, which keeps the prioritization metadata consistent across
// concurrent state transitions.
// -1 indicates no prioritization metadata; e.g. downward state transition messages get -1.
int currentActiveReplicaNumber = -1;

/*
* Assign currentActiveReplicaNumber for qualifying upward state transitions.
* Criteria for assignment:
* - Must be an upward state transition according to state model
* - Target state must be considered active (according to state model type)
*/
if (stateModelDef.isUpwardStateTransition(currentState, nextState)
&& isStateActive(nextState, stateModelDef)) {

// All qualifying transitions for this partition get the same
// currentActiveReplicaNumber
currentActiveReplicaNumber = currentActiveReplicaCount;
}

// Create new state transition message
message = MessageUtil
.createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(),
resource, partition.getPartitionName(), instanceName, currentState, nextState,
sessionIdMap.get(instanceName), stateModelDef.getId());
message = MessageUtil.createStateTransitionMessage(manager.getInstanceName(),
manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(),
currentActiveReplicaNumber);

if (logger.isDebugEnabled()) {
LogUtil.logDebug(logger, _eventId, String.format(
"Resource %s partition %s for instance %s with currentState %s and nextState %s",
"Resource %s partition %s for instance %s with currentState %s, nextState %s and currentActiveReplicaNumber %d",
resource.getResourceName(), partition.getPartitionName(), instanceName,
currentState, nextState));
currentState, nextState, currentActiveReplicaNumber));
}
}
}
Expand Down Expand Up @@ -290,7 +326,66 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
} // end of for-each-partition
}

private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
/**
 * Counts the replicas of a partition that are currently in an active state.
 * <p>
 * A replica is counted when its current state is the state model's top state, one of the
 * active secondary top states (secondary top states other than OFFLINE and DROPPED), or
 * ERROR, which Helix still considers active.
 * @param currentStateMap map whose values are the replicas' current states; only the values
 *          are inspected
 * @param stateModelDef state model definition supplying the top and secondary top states
 * @return the number of replicas currently in active states, used to determine the
 *         currentActiveReplicaNumber for the partition
 */
private int calculateCurrentActiveReplicaCount(Map<String, String> currentStateMap,
    StateModelDefinition stateModelDef) {
  // Resolve the state sets once: they do not depend on the replica being inspected, and
  // getActiveSecondaryTopStates() builds a new list on every call.
  final String topState = stateModelDef.getTopState();
  final List<String> activeSecondaryTopStates = getActiveSecondaryTopStates(stateModelDef);
  final String errorState = HelixDefinedState.ERROR.name();

  return (int) currentStateMap.values().stream()
      // Exact match on the top state; String.contains() would also accept any substring
      // of the top state name and throws on a null state.
      .filter(state -> topState.equals(state)
          || activeSecondaryTopStates.contains(state)
          || errorState.equals(state))
      .count();
}

/**
 * Returns the secondary top states of the state model that count as serving states, i.e.
 * everything from {@code getSecondTopStates()} except OFFLINE and DROPPED.
 * <p>
 * The filtering is required because getSecondTopStates() can itself contain OFFLINE.
 * Example - OnlineOffline: getSecondTopStates() = ["OFFLINE"] because OFFLINE transitions
 * to ONLINE; after filtering the result is [] since OFFLINE is not a serving state.
 * @param stateModelDef state model definition whose secondary top states are filtered
 * @return the secondary top states without OFFLINE and DROPPED
 */
private List<String> getActiveSecondaryTopStates(StateModelDefinition stateModelDef) {
  final String droppedState = HelixDefinedState.DROPPED.name();
  return stateModelDef.getSecondTopStates().stream()
      // Keep a state only if it is neither of the two non-serving states
      .filter(candidate -> !(OFFLINE.equals(candidate) || droppedState.equals(candidate)))
      .collect(Collectors.toList());
}

/**
 * Determines whether the given state is considered active for the given state model.
 * <p>
 * A state is active when it is ERROR, the state model's top state, or one of the active
 * secondary top states (secondary top states other than OFFLINE and DROPPED).
 * @param state the state to classify
 * @param stateModelDef state model definition supplying the top and secondary top states
 * @return true if the state is considered active, false otherwise
 */
private boolean isStateActive(String state, StateModelDefinition stateModelDef) {
  // ERROR is always considered active, regardless of the state model
  if (HelixDefinedState.ERROR.name().equals(state)) {
    return true;
  }
  // Exact match on the top state; String.contains() would also accept any substring of the
  // top state name.
  final String topState = stateModelDef.getTopState();
  return topState.equals(state) || getActiveSecondaryTopStates(stateModelDef).contains(state);
}

private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
String initialState) {
if (pendingMessage == null) {
return false;
Expand Down
44 changes: 37 additions & 7 deletions helix-core/src/main/java/org/apache/helix/model/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public enum Attributes {
RELAY_FROM,
EXPIRY_PERIOD,
SRC_CLUSTER,
ST_REBALANCE_TYPE
ST_REBALANCE_TYPE,
CURRENT_ACTIVE_REPLICA_NUMBER
}

/**
Expand Down Expand Up @@ -137,12 +138,8 @@ public enum STRebalanceType {
/**
* Compares the creation time of two Messages
*/
public static final Comparator<Message> CREATE_TIME_COMPARATOR = new Comparator<Message>() {
@Override
public int compare(Message m1, Message m2) {
return new Long(m1.getCreateTimeStamp()).compareTo(new Long(m2.getCreateTimeStamp()));
}
};
// Orders messages by ascending creation timestamp (oldest first). The arguments must stay
// in (m1, m2) order: swapping them reverses the ordering for every caller that sorts with
// this comparator.
public static final Comparator<Message> CREATE_TIME_COMPARATOR =
    (m1, m2) -> Long.compare(m1.getCreateTimeStamp(), m2.getCreateTimeStamp());

/**
* Instantiate a message
Expand Down Expand Up @@ -935,6 +932,39 @@ public void setSrcClusterName(String clusterName) {
_record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName);
}

/**
 * Stores the current active replica count on this message so that participants can use it
 * for message prioritization.
 * <p>
 * The value is the number of replicas of the partition that were in active states (top
 * states, secondary top states for single-top state models, and ERROR) when the state
 * transition message was generated. It lets participants favor recovery scenarios (low
 * active counts) over load balancing scenarios (high active counts) in custom thread pools
 * or message handlers; for example, a 2 -> 3 transition ranks above a 3 -> 4 transition.
 * @param currentActiveReplicaNumber the number of currently active replicas; -1 when the
 *          transition carries no prioritization metadata (e.g. downward transitions),
 *          &gt;= 0 for transitions that carry the current availability level
 */
public void setCurrentActiveReplicaNumber(int currentActiveReplicaNumber) {
  final String fieldName = Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name();
  _record.setIntField(fieldName, currentActiveReplicaNumber);
}

/**
 * Returns the active replica count that was recorded for the partition when this message
 * was generated.
 * <p>
 * The value is the number of replicas in active states (including ERROR) before any state
 * transition takes place, which lets participants prioritize messages by current
 * availability level.
 * @return the current active replica count, or -1 when the message carries no
 *         prioritization metadata (e.g. downward state transitions) or the field is absent
 */
public int getCurrentActiveReplicaNumber() {
  // Fallback returned when the field has never been set on the record
  final int noMetadata = -1;
  return _record.getIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), noMetadata);
}

/**
* Check if this message is targetted for a controller
* @return true if this is a controller message, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,23 @@ public static Map<String, Integer> getStateCounts(Map<String, String> stateMap)
}
return stateCounts;
}

/**
 * Tells whether going from {@code fromState} to {@code toState} is an upward state
 * transition, i.e. whether the destination state maps to a strictly smaller value than the
 * source state in the state priority map.
 * @param fromState source state
 * @param toState destination state
 * @return true if it is an upward state transition; false otherwise, including when either
 *         state is missing from the state priority map
 */
public boolean isUpwardStateTransition(String fromState, String toState) {
  Map<String, Integer> priorities = getStatePriorityMap();
  Integer sourcePriority = priorities.get(fromState);
  Integer targetPriority = priorities.get(toState);

  // Unknown states can never form an upward transition
  boolean bothKnown = sourcePriority != null && targetPriority != null;
  return bothKnown && targetPriority < sourcePriority;
}
}
94 changes: 70 additions & 24 deletions helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc
toState);

Message message =
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
nextState, sessionId, stateModelDefName);

Expand All @@ -60,28 +60,6 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc
return null;
}

/**
 * Creates a STATE_TRANSITION message for the given partition and target instance.
 * The message is built by the private createStateTransitionMessage overload that takes a
 * message type, then receives the default retry count and, when the resource defines them,
 * the resource group name and the resource tag.
 * @param srcInstanceName source instance name
 * @param srcSessionId source session id
 * @param resource resource the partition belongs to; also supplies the optional resource
 *          group name and resource tag
 * @param partitionName partition name
 * @param instanceName target instance name
 * @param currentState current state
 * @param nextState next state
 * @param tgtSessionId target session id
 * @param stateModelDefName state model definition name
 * @return the state transition message
 */
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
Resource resource, String partitionName, String instanceName, String currentState,
String nextState, String tgtSessionId, String stateModelDefName) {
// Delegate construction of the base message to the message-type-aware overload
Message message =
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION, srcInstanceName,
srcSessionId, resource, partitionName, instanceName, currentState, nextState, tgtSessionId,
stateModelDefName);

// Set the retry count for state transition messages.
// TODO: make the retry count configurable in ClusterConfig or IdealState
message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);

// Resource group name and resource tag are optional; copy them only when present
if (resource.getResourceGroupName() != null) {
message.setResourceGroupName(resource.getResourceGroupName());
}
if (resource.getResourceTag() != null) {
message.setResourceTag(resource.getResourceTag());
}

return message;
}

/**
* Creates a message to change participant status
* {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}
Expand Down Expand Up @@ -121,7 +99,7 @@ private static Message createBasicMessage(Message.MessageType messageType, Strin
}

/* Creates state transition or state transition cancellation message */
private static Message createStateTransitionMessage(Message.MessageType messageType,
private static Message createBasicStateTransitionMessage(Message.MessageType messageType,
String srcInstanceName, String srcSessionId, Resource resource, String partitionName,
String instanceName, String currentState, String nextState, String tgtSessionId,
String stateModelDefName) {
Expand All @@ -136,4 +114,72 @@ private static Message createStateTransitionMessage(Message.MessageType messageT

return message;
}

/**
 * Builds a STATE_TRANSITION message that additionally carries replica prioritization
 * metadata.
 * <p>
 * Besides the basic state transition fields, the message receives the default retry count,
 * the resource group name and resource tag when the resource defines them, and the supplied
 * current active replica number.
 * @param srcInstanceName source instance name
 * @param srcSessionId source session id
 * @param resource resource
 * @param partitionName partition name
 * @param instanceName target instance name
 * @param currentState current state
 * @param nextState next state
 * @param tgtSessionId target session id
 * @param stateModelDefName state model definition name
 * @param currentActiveReplicaNumber the number of replicas in active states for this
 *          partition before any state transitions occur. Participants can use it to
 *          prioritize messages by current availability level (e.g. 0 -> 1 recovery scenarios
 *          rank above 2 -> 3 load balancing scenarios). Pass -1 for transitions that should
 *          not be prioritized (downward transitions). Active states include top states,
 *          secondary top states (for single-top state models), and ERROR states, since Helix
 *          still considers them active.
 * @return state transition message
 */
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
    Resource resource, String partitionName, String instanceName, String currentState,
    String nextState, String tgtSessionId, String stateModelDefName,
    int currentActiveReplicaNumber) {
  final Message stMessage = createBasicStateTransitionMessage(
      Message.MessageType.STATE_TRANSITION, srcInstanceName, srcSessionId, resource,
      partitionName, instanceName, currentState, nextState, tgtSessionId, stateModelDefName);

  // Retry count for state transition messages.
  // TODO: make the retry count configurable in ClusterConfig or IdealState
  stMessage.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);

  // Optional resource attributes are propagated only when the resource defines them
  final String resourceGroupName = resource.getResourceGroupName();
  if (resourceGroupName != null) {
    stMessage.setResourceGroupName(resourceGroupName);
  }
  final String resourceTag = resource.getResourceTag();
  if (resourceTag != null) {
    stMessage.setResourceTag(resourceTag);
  }

  // Prioritization metadata consumed on the participant side
  stMessage.setCurrentActiveReplicaNumber(currentActiveReplicaNumber);
  return stMessage;
}

/**
 * Builds a STATE_TRANSITION message without prioritization metadata. Kept for backward
 * compatibility; it delegates to the overload that accepts a current active replica number
 * and passes -1.
 * @param srcInstanceName source instance name
 * @param srcSessionId source session id
 * @param resource resource
 * @param partitionName partition name
 * @param instanceName target instance name
 * @param currentState current state
 * @param nextState next state
 * @param tgtSessionId target session id
 * @param stateModelDefName state model definition name
 * @return state transition message
 */
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
    Resource resource, String partitionName, String instanceName, String currentState,
    String nextState, String tgtSessionId, String stateModelDefName) {
  // -1 marks an ST message that carries no prioritization metadata
  final int noPrioritizationMetadata = -1;
  return createStateTransitionMessage(srcInstanceName, srcSessionId, resource, partitionName,
      instanceName, currentState, nextState, tgtSessionId, stateModelDefName,
      noPrioritizationMetadata);
}
}
Loading