diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index 859c6679e9..0868ac6d6a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixDefinedState; @@ -57,17 +58,18 @@ * Compares the currentState, pendingState with IdealState and generate messages */ public class MessageGenerationPhase extends AbstractBaseStage { - private final static String NO_DESIRED_STATE = "NoDesiredState"; + private static final String NO_DESIRED_STATE = "NoDesiredState"; // If we see there is any invalid pending message leaving on host, i.e. message // tells participant to change from SLAVE to MASTER, and the participant is already // at MASTER state, we wait for timeout and if the message is still not cleaned up by // participant, controller will cleanup them proactively to unblock further state // transition - public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil + public static final long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil .getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000); - private final static String PENDING_MESSAGE = "pending message"; - private final static String STALE_MESSAGE = "stale message"; + private static final String PENDING_MESSAGE = "pending message"; + private static final String STALE_MESSAGE = "stale message"; + private static final String OFFLINE = "OFFLINE"; private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class); @@ -163,6 +165,18 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr // desired-state->list of generated-messages Map> messageMap = new HashMap<>(); + /* + * Calculate the current active replica count based on state model type. + * This represents the number of replicas currently serving traffic for this partition + * Active replicas include: top states, secondary top states(excluding OFFLINE) and ERROR + * states. + * Active replicas exclude: OFFLINE and DROPPED states. + * All qualifying state transitions for this partition will receive this same value, + * allowing clients to understand the current availability level and prioritize accordingly. + */ + int currentActiveReplicaCount = + calculateCurrentActiveReplicaCount(currentStateMap, stateModelDef); + for (String instanceName : instanceStateMap.keySet()) { Set staleMessages = cache.getStaleMessagesByInstance(instanceName); @@ -250,17 +264,39 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr pendingMessage, manager, resource, partition, sessionIdMap, instanceName, stateModelDef, cancellationMessage, isCancellationEnabled); } else { + // Set currentActiveReplicaNumber to provide metadata for potential message + // prioritization by participant. + // Assign the current active replica count to all qualifying upward transitions for this + // partition. + // This ensures consistent prioritization metadata across concurrent state transitions. + // -1 indicates no prioritization metadata, for eg:Downward ST messages get a -1. + int currentActiveReplicaNumber = -1; + + /* + * Assign currentActiveReplicaNumber for qualifying upward state transitions. + * Criteria for assignment: + * - Must be an upward state transition according to state model + * - Target state must be considered active (according to state model type) + */ + if (stateModelDef.isUpwardStateTransition(currentState, nextState) + && isStateActive(nextState, stateModelDef)) { + + // All qualifying transitions for this partition get the same + // currentActiveReplicaNumber + currentActiveReplicaNumber = currentActiveReplicaCount; + } + // Create new state transition message - message = MessageUtil - .createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(), - resource, partition.getPartitionName(), instanceName, currentState, nextState, - sessionIdMap.get(instanceName), stateModelDef.getId()); + message = MessageUtil.createStateTransitionMessage(manager.getInstanceName(), + manager.getSessionId(), resource, partition.getPartitionName(), instanceName, + currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(), + currentActiveReplicaNumber); if (logger.isDebugEnabled()) { LogUtil.logDebug(logger, _eventId, String.format( - "Resource %s partition %s for instance %s with currentState %s and nextState %s", + "Resource %s partition %s for instance %s with currentState %s, nextState %s and currentActiveReplicaNumber %d", resource.getResourceName(), partition.getPartitionName(), instanceName, - currentState, nextState)); + currentState, nextState, currentActiveReplicaNumber)); } } } @@ -290,7 +326,66 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr } // end of for-each-partition } - private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState, + /** + * Calculate the current active replica count based on state model type. + * The count includes replicas in top states, secondary top states (excluding OFFLINE), + * and ERROR states since helix considers them active.Count excludes OFFLINE and DROPPED states. + * @param currentStateMap + * @param stateModelDef + * @return The number of replicas currently in active states, used to determine the + * currentActiveReplicaNumber for the partition. + */ + private int calculateCurrentActiveReplicaCount(Map currentStateMap, + StateModelDefinition stateModelDef) { + return (int) currentStateMap.values().stream() + .filter(state -> stateModelDef.getTopState().contains(state) // Top states (MASTER, ONLINE, + // LEADER) + || getActiveSecondaryTopStates(stateModelDef).contains(state) // Active secondary states + // (SLAVE, STANDBY, + // BOOTSTRAP) + || HelixDefinedState.ERROR.name().equals(state) // ERROR states (still considered + // active) + // DROPPED and OFFLINE are automatically excluded by getActiveSecondaryTopStates() + ).count(); + } + + /** + * Get active secondary top states - states that are not non-serving states like OFFLINE and + * DROPPED. + * Reasons for elimination: + * - getSecondTopStates() can include OFFLINE as a secondary top state in some state models. + * Example - OnlineOffline: + * getSecondTopStates() = ["OFFLINE"] as it transitions to ONLINE. + * After filtering: activeSecondaryTopStates=[] (removes "OFFLINE" as it's not a serving state). + * @param stateModelDef + */ + private List getActiveSecondaryTopStates(StateModelDefinition stateModelDef) { + return stateModelDef.getSecondTopStates().stream() + // Remove non-serving states + .filter(state -> !OFFLINE.equals(state) && !HelixDefinedState.DROPPED.name().equals(state)) + .collect(Collectors.toList()); + } + + /** + * Determines if the given state is considered active based on the state model type. + * Active states include: top states, active secondary top states (excluding OFFLINE), + * and ERROR states. Active states exclude OFFLINE and DROPPED states. + * ERROR state replicas are always considered active in HELIX as they do not + * affect availability. + * @param state + * @param stateModelDef + * @return true if the state is considered active, false otherwise + */ + private boolean isStateActive(String state, StateModelDefinition stateModelDef) { + // ERROR state is always considered active regardless of state model type + if (HelixDefinedState.ERROR.name().equals(state)) { + return true; + } + return stateModelDef.getTopState().contains(state) + || getActiveSecondaryTopStates(stateModelDef).contains(state); + } + + private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState, String initialState) { if (pendingMessage == null) { return false; diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java index acf0758217..68751de40d 100644 --- a/helix-core/src/main/java/org/apache/helix/model/Message.java +++ b/helix-core/src/main/java/org/apache/helix/model/Message.java @@ -104,7 +104,8 @@ public enum Attributes { RELAY_FROM, EXPIRY_PERIOD, SRC_CLUSTER, - ST_REBALANCE_TYPE + ST_REBALANCE_TYPE, + CURRENT_ACTIVE_REPLICA_NUMBER } /** @@ -137,12 +138,8 @@ public enum STRebalanceType { /** * Compares the creation time of two Messages */ - public static final Comparator CREATE_TIME_COMPARATOR = new Comparator() { - @Override - public int compare(Message m1, Message m2) { - return new Long(m1.getCreateTimeStamp()).compareTo(new Long(m2.getCreateTimeStamp())); - } - }; + public static final Comparator CREATE_TIME_COMPARATOR = + (m1, m2) -> Long.compare(m2.getCreateTimeStamp(), m1.getCreateTimeStamp()); /** * Instantiate a message @@ -935,6 +932,39 @@ public void setSrcClusterName(String clusterName) { _record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName); } + /** + * Set current active replica count for participant-side message prioritization. + * This field indicates the number of replicas currently in active states (including ERROR states) + * for this partition at the time the state transition message is generated. + * Active states include top states, secondary top states (for single-top state models), + * and ERROR states. + * This metadata enables participants to prioritize recovery scenarios (low active counts) + * over load balancing scenarios (high active counts) in custom thread pools or message handlers. + * For example, 2→3 transitions get higher priority than 3→4 transitions. + * Default value is -1 for transitions that don't require prioritization metadata.(for eg : + * downward transitions). + * @param currentActiveReplicaNumber the number of currently active replicas (-1 when there is no + * prioritization metadata, + * >=0 for transitions containing current availability level) + */ + public void setCurrentActiveReplicaNumber(int currentActiveReplicaNumber) { + _record.setIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), + currentActiveReplicaNumber); + } + + /** + * Get the current active replica count for this partition at message generation time. + * This value represents the number of replicas in active states (including ERROR states) before + * any state transitions occur, enabling participant-side message prioritization based on + * current availability levels. + * @return current active replica count, or -1 for cases where we don't provide metadata for + * prioritization like downward state transitions. + */ + + public int getCurrentActiveReplicaNumber() { + return _record.getIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), -1); + } + /** * Check if this message is targetted for a controller * @return true if this is a controller message, false otherwise diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java index fcf24fb305..e79d589384 100644 --- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java +++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java @@ -505,4 +505,23 @@ public static Map getStateCounts(Map stateMap) } return stateCounts; } + + /** + * Check if a state transition is upward + * @param fromState source state + * @param toState destination state + * @return True if it's an upward state transition, false otherwise + */ + public boolean isUpwardStateTransition(String fromState, String toState) { + Map statePriorityMap = getStatePriorityMap(); + + Integer fromStateWeight = statePriorityMap.get(fromState); + Integer toStateWeight = statePriorityMap.get(toState); + + if (fromStateWeight == null || toStateWeight == null) { + return false; + } + + return toStateWeight < fromStateWeight; + } } diff --git a/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java index 94de8331b1..4a93050dcb 100644 --- a/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java @@ -48,7 +48,7 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc toState); Message message = - createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION, + createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION, srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState, nextState, sessionId, stateModelDefName); @@ -60,28 +60,6 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc return null; } - public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId, - Resource resource, String partitionName, String instanceName, String currentState, - String nextState, String tgtSessionId, String stateModelDefName) { - Message message = - createStateTransitionMessage(Message.MessageType.STATE_TRANSITION, srcInstanceName, - srcSessionId, resource, partitionName, instanceName, currentState, nextState, tgtSessionId, - stateModelDefName); - - // Set the retry count for state transition messages. - // TODO: make the retry count configurable in ClusterConfig or IdealState - message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT); - - if (resource.getResourceGroupName() != null) { - message.setResourceGroupName(resource.getResourceGroupName()); - } - if (resource.getResourceTag() != null) { - message.setResourceTag(resource.getResourceTag()); - } - - return message; - } - /** * Creates a message to change participant status * {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus} @@ -121,7 +99,7 @@ private static Message createBasicMessage(Message.MessageType messageType, Strin } /* Creates state transition or state transition cancellation message */ - private static Message createStateTransitionMessage(Message.MessageType messageType, + private static Message createBasicStateTransitionMessage(Message.MessageType messageType, String srcInstanceName, String srcSessionId, Resource resource, String partitionName, String instanceName, String currentState, String nextState, String tgtSessionId, String stateModelDefName) { @@ -136,4 +114,72 @@ private static Message createStateTransitionMessage(Message.MessageType messageT return message; } + + /** + * Create a state transition message with replica prioritization metadata + * @param srcInstanceName source instance name + * @param srcSessionId source session id + * @param resource resource + * @param partitionName partition name + * @param instanceName target instance name + * @param currentState current state + * @param nextState next state + * @param tgtSessionId target session id + * @param stateModelDefName state model definition name + * @param currentActiveReplicaNumber The number of replicas currently in active states + * for this partition before any state transitions occur. This metadata + * enables participant-side message prioritization by indicating the + * current availability level (e.g., 0→1 recovery scenarios get higher + * priority than 2→3 load balancing scenarios). Set to -1 for transitions + * that should not be prioritized (downward transitions). + * Active states include top states, secondary top states (for single-top + * state models), and ERROR states since they are still considered active by Helix. + * @return state transition message + */ + public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId, + Resource resource, String partitionName, String instanceName, String currentState, + String nextState, String tgtSessionId, String stateModelDefName, + int currentActiveReplicaNumber) { + Message message = createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION, + srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState, + nextState, tgtSessionId, stateModelDefName); + + // Set the retry count for state transition messages. + // TODO: make the retry count configurable in ClusterConfig or IdealState + message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT); + + if (resource.getResourceGroupName() != null) { + message.setResourceGroupName(resource.getResourceGroupName()); + } + if (resource.getResourceTag() != null) { + message.setResourceTag(resource.getResourceTag()); + } + + // Set current active replica number for participant-side prioritization + message.setCurrentActiveReplicaNumber(currentActiveReplicaNumber); + + return message; + } + + /** + * Create a state transition message (backward compatibility) + * @param srcInstanceName source instance name + * @param srcSessionId source session id + * @param resource resource + * @param partitionName partition name + * @param instanceName target instance name + * @param currentState current state + * @param nextState next state + * @param tgtSessionId target session id + * @param stateModelDefName state model definition name + * @return state transition message + */ + public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId, + Resource resource, String partitionName, String instanceName, String currentState, + String nextState, String tgtSessionId, String stateModelDefName) { + // currentActiveReplicaNumber is set to -1 for ST messages needing no prioritization metadata + // (backward compatibility) + return createStateTransitionMessage(srcInstanceName, srcSessionId, resource, partitionName, + instanceName, currentState, nextState, tgtSessionId, stateModelDefName, -1); + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestPrioritizationMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestPrioritizationMessageGeneration.java new file mode 100644 index 0000000000..1a9b50ae3e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestPrioritizationMessageGeneration.java @@ -0,0 +1,594 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; +import org.apache.helix.model.OnlineOfflineWithBootstrapSMD; +import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.apache.helix.tools.StateModelConfigGenerator.generateConfigForMasterSlave; +import static org.apache.helix.tools.StateModelConfigGenerator.generateConfigForOnlineOffline; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestPrioritizationMessageGeneration extends MessageGenerationPhase { + + private static final String TEST_CLUSTER = "TestCluster"; + private static final String TEST_RESOURCE = "TestDB"; + private static final String PARTITION_0 = "TestDB_0"; + private static final String INSTANCE_0 = "localhost_0"; + private static final String INSTANCE_1 = "localhost_1"; + private static final String INSTANCE_2 = "localhost_2"; + private static final String INSTANCE_3 = "localhost_3"; + private static final String INSTANCE_4 = "localhost_4"; + private static final String SESSION_ID = "123"; + + // === Tests for upward transitions and replica counting === + + @Test + public void testCurrentReplicaCountForUpwardTransitions() throws Exception { + // Test: Upward transitions from non-second top states should receive currentActiveReplicaNumber + StateModelDefinition stateModelDef = new StateModelDefinition(generateConfigForMasterSlave()); + // Setup: 1 MASTER, 1 SLAVE, 2 OFFLINE (current active = 2) + Map currentStates = createCurrentStates(Map.of(INSTANCE_0, "MASTER", + INSTANCE_1, "SLAVE", INSTANCE_2, "OFFLINE", INSTANCE_3, "OFFLINE"), "MasterSlave"); + // Action: Move 2 OFFLINE instances to SLAVE (upward transitions) + Map bestPossible = Map.of(INSTANCE_0, "MASTER", // No change + INSTANCE_1, "SLAVE", // No change + INSTANCE_2, "SLAVE", // OFFLINE -> SLAVE (upward) + INSTANCE_3, "SLAVE" // OFFLINE -> SLAVE (upward) + ); + + List messages = processAndGetMessages(stateModelDef, currentStates, bestPossible, 4); + + // Verify: 2 messages generated, both with current active replica count = 2 + // Current active replicas: 1 MASTER + 1 SLAVE = 2 + Assert.assertEquals(messages.size(), 2); + for (Message msg : messages) { + Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 2, + "Upward transitions should have currentActiveReplicaNumber = current active replica count"); + Assert.assertTrue(msg.getTgtName().equals(INSTANCE_2) || msg.getTgtName().equals(INSTANCE_3)); + } + } + + @Test + public void testZeroReplicaScenario() throws Exception { + // Test: All instances starting from OFFLINE (0 active replicas) + StateModelDefinition stateModelDef = new StateModelDefinition(generateConfigForMasterSlave()); + + // Setup: All instances OFFLINE (current active = 0) + Map currentStates = createCurrentStates( + Map.of(INSTANCE_0, "OFFLINE", INSTANCE_1, "OFFLINE", INSTANCE_2, "OFFLINE"), "MasterSlave"); + + // Action: Create 1 MASTER, 2 SLAVE from all OFFLINE + Map bestPossible = Map.of(INSTANCE_0, "SLAVE", // OFFLINE -> SLAVE (upward) + INSTANCE_1, "MASTER", // OFFLINE -> MASTER (upward) + INSTANCE_2, "SLAVE" // OFFLINE -> SLAVE (upward) + ); + + List messages = processAndGetMessages(stateModelDef, currentStates, bestPossible, 3); + + // Verify: All messages have currentActiveReplicaNumber = 0 + Assert.assertEquals(messages.size(), 3); + for (Message msg : messages) { + Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 0, + "All upward transitions should have currentActiveReplicaNumber = 0"); + } + } + + // === Tests for non-upward transitions === + + @Test + public void testNoReplicaNumberForNonUpwardTransitions() throws Exception { + // Test: Downward transitions should not receive currentActiveReplicaNumber + StateModelDefinition stateModelDef = new StateModelDefinition(generateConfigForMasterSlave()); + + Map currentStates = + createCurrentStates(Map.of(INSTANCE_0, "SLAVE"), "MasterSlave"); + + // Action: SLAVE -> OFFLINE (downward transition) + Map bestPossible = Map.of(INSTANCE_0, "OFFLINE"); + + List messages = processAndGetMessages(stateModelDef, currentStates, bestPossible, 1); + + // Verify: Downward transition gets default value (-1) + Assert.assertEquals(messages.size(), 1); + Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), -1, + "Non-upward state transitions should not have currentActiveReplicaNumber assigned"); + } + + // === Tests for pending messages === + + @Test + public void testPendingMessagesDoNotAffectCurrentReplicaCount() throws Exception { + // Test: SLAVE -> MASTER (second top to top) should not receive currentActiveReplicaNumber + StateModelDefinition stateModelDef = new StateModelDefinition(generateConfigForMasterSlave()); + + // Setup: 1 MASTER, 2 OFFLINE (current active = 1) + Map currentStates = createCurrentStates( + Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "OFFLINE", INSTANCE_2, "OFFLINE"), "MasterSlave"); + + CurrentStateOutput currentStateOutput = createCurrentStateOutput(currentStates); + + // Add pending message for INSTANCE_1: OFFLINE->SLAVE + Message pendingMsg = createMessage("OFFLINE", "SLAVE", INSTANCE_1); + currentStateOutput.setPendingMessage(TEST_RESOURCE, new Partition(PARTITION_0), INSTANCE_1, + pendingMsg); + + ClusterEvent event = prepareClusterEvent(stateModelDef, currentStateOutput, 3); + + // Action: Both offline instances should become SLAVE + Map bestPossible = Map.of(INSTANCE_0, "MASTER", // No change + INSTANCE_1, "SLAVE", // Has pending message, no new message + INSTANCE_2, "SLAVE" // OFFLINE -> SLAVE (new transition) + ); + + setBestPossibleState(event, bestPossible); + process(event); + + MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name()); + List messages = output.getMessages(TEST_RESOURCE, new Partition(PARTITION_0)); + + // Verify: Only new message for INSTANCE_2, with current active count = 1 (ignoring pending) + Assert.assertEquals(messages.size(), 1); + Assert.assertEquals(messages.get(0).getTgtName(), INSTANCE_2); + Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), 1, + "currentActiveReplicaNumber should be based on current active replicas only, not including pending transitions"); + } + + // === Tests for different state model types === + + @Test + public void testSingleTopStateModelWithoutSecondaryTop() throws Exception { + // Test: ONLINE-OFFLINE model (single top without secondary) - only top + ERROR count as active + StateModelDefinition onlineOfflineStateModel = CustomOnlineOfflineSMD.build(1); + + // Verify this is a single-top state model + Assert.assertTrue(onlineOfflineStateModel.isSingleTopStateModel(), + "ONLINE-OFFLINE should be a single-top state model"); + + // Setup: 0 ONLINE, 1 ERROR, 1 OFFLINE (current active = 1: ERROR only) + Map currentStates = createCurrentStates( + Map.of(INSTANCE_0, "OFFLINE", INSTANCE_1, "ERROR", INSTANCE_2, "OFFLINE"), "OnlineOffline"); + + // Action: One OFFLINE becomes ONLINE + Map bestPossible = Map.of(INSTANCE_0, "ONLINE", // OFFLINE -> ONLINE (upward) + INSTANCE_1, "ERROR", // No change + INSTANCE_2, "OFFLINE" // No change + ); + + List messages = processAndGetMessagesForOnlineOffline(onlineOfflineStateModel, + currentStates, bestPossible, 3); + + // Verify: Current active = 1 (0 ONLINE + 1 ERROR) + Assert.assertEquals(messages.size(), 1); + Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), 1, + "Single-top without secondary: only top states + ERROR count as active"); + } + + @Test + public void testSingleTopStateModelWithSecondaryTop() throws Exception { + // Test: MASTER-SLAVE model (single top with secondary) - top + secondary + ERROR count as + // active + StateModelDefinition stateModelDef = new StateModelDefinition(generateConfigForMasterSlave()); + + // Setup: 1 MASTER, 2 SLAVE, 1 ERROR, 1 OFFLINE (current active = 4) + Map currentStates = + createCurrentStates(Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "SLAVE", INSTANCE_2, "SLAVE", + INSTANCE_3, "ERROR", INSTANCE_4, "OFFLINE"), "MasterSlave"); + + // Action: OFFLINE becomes SLAVE + Map bestPossible = Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "SLAVE", INSTANCE_2, + "SLAVE", INSTANCE_3, "ERROR", INSTANCE_4, "SLAVE" // OFFLINE -> SLAVE (upward) + ); + + List messages = processAndGetMessages(stateModelDef, currentStates, bestPossible, 5); + + // Verify: Current active = 4 (1 MASTER + 2 SLAVE + 1 ERROR) + Assert.assertEquals(messages.size(), 1); + Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), 4, + "Single-top with secondary: top + secondary top + ERROR count as active"); + } + + @Test + public void testMultiTopStateModelWithoutSecondaryTop() throws Exception { + StateModelDefinition stateModelDef = new StateModelDefinition(generateConfigForOnlineOffline()); + + // Setup: 1 ONLINE, 2 OFFLINE (current active = 1) + Map currentStates = createCurrentStates( + Map.of(INSTANCE_0, "ONLINE", INSTANCE_1, "OFFLINE", INSTANCE_2, "OFFLINE"), + "OfflineOnline"); + + // Action: Both OFFLINE become ONLINE + Map bestPossible = Map.of(INSTANCE_0, "ONLINE", // No change + INSTANCE_1, "ONLINE", // OFFLINE -> ONLINE (upward) + INSTANCE_2, "ONLINE" // OFFLINE -> ONLINE (upward) + ); + + List messages = + processAndGetMessagesForOnlineOffline(stateModelDef, currentStates, bestPossible, 3); + + // Verify: Current active = 1 (only ONLINE states count) + Assert.assertEquals(messages.size(), 2); + for (Message msg : messages) { + Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 1, + "Multi-top state model without secondary top: only top states count as active"); + } + } + + @Test + public void testMultiTopStateModelWithSecondaryTop() throws Exception { + StateModelDefinition stateModelDef = OnlineOfflineWithBootstrapSMD.build(); + + // Setup: 2 ONLINE, 1 BOOTSTRAP, 1 OFFLINE (current active = 3) + Map currentStates = + createCurrentStates(Map.of(INSTANCE_0, "ONLINE", INSTANCE_1, "ONLINE", INSTANCE_2, + "BOOTSTRAP", INSTANCE_3, "OFFLINE"), "OnlineOfflineWithBootstrap"); + + // Action: OFFLINE becomes BOOTSTRAP. + Map bestPossible = Map.of(INSTANCE_0, "ONLINE", // No change + INSTANCE_1, "ONLINE", // No change + INSTANCE_2, "BOOTSTRAP", // No change + INSTANCE_3, "BOOTSTRAP" // OFFLINE -> BOOTSTRAP (upward) + ); + + List messages = + processAndGetMessagesForOnlineOffline(stateModelDef, currentStates, bestPossible, 4); + + // Verify: Current active = 3 (ONLINE + BOOTSTRAP state counts) + Assert.assertEquals(messages.size(), 1); + for (Message msg : messages) { + Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 3, + "Multi-top state model without secondary top: top states and secondary top states count as active"); + } + } + + // === Tests for ERROR state handling === + + @Test + public void testErrorStateIncludedInActiveCount() throws Exception { + // Test: ERROR states are always counted as active + StateModelDefinition stateModelDef = new StateModelDefinition(generateConfigForMasterSlave()); + + // Setup: 1 MASTER, 1 ERROR, 1 OFFLINE (current active = 2) + Map currentStates = createCurrentStates( + Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "ERROR", INSTANCE_2, "OFFLINE"), "MasterSlave"); + + // Action: OFFLINE becomes SLAVE + Map bestPossible = + Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "ERROR", INSTANCE_2, "SLAVE"); + + List messages = processAndGetMessages(stateModelDef, currentStates, bestPossible, 3); + + // Verify: Current active = 2 (1 MASTER + 1 ERROR) + Assert.assertEquals(messages.size(), 1); + Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), 2, + "ERROR state replicas should be included in active count"); + } + + @Test + public void testTransitionFromErrorToOffline() throws Exception { + // Test: ERROR -> OFFLINE is a downward transition (active to inactive) + StateModelDefinition stateModelDef = new StateModelDefinition(generateConfigForMasterSlave()); + + Map currentStates = + createCurrentStates(Map.of(INSTANCE_0, "ERROR"), "MasterSlave"); + + // Action: ERROR -> OFFLINE (standard recovery pattern) + Map bestPossible = Map.of(INSTANCE_0, "OFFLINE"); + + List messages = processAndGetMessages(stateModelDef, currentStates, bestPossible, 1); + + // Verify: Downward transition gets default value (-1) + Assert.assertEquals(messages.size(), 1); + Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), -1, + "ERROR→OFFLINE transitions should not have currentActiveReplicaNumber assigned (downward)"); + } + + // === Tests for DROPPED state handling === + + @Test + public void testDroppedReplicasExcludedFromActiveCount() throws Exception { + // Test: DROPPED replicas are excluded from active count calculations + StateModelDefinition stateModelDef = new StateModelDefinition(generateConfigForMasterSlave()); + + // Setup: 1 MASTER, 1 OFFLINE, 1 OFFLINE (current active = 1) + Map currentStates = createCurrentStates( + Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "OFFLINE", INSTANCE_2, "OFFLINE"), "MasterSlave"); + + // Action: One OFFLINE drops, other becomes SLAVE + Map bestPossible = Map.of(INSTANCE_0, "MASTER", // No change + INSTANCE_1, "DROPPED", // OFFLINE -> DROPPED (downward) + INSTANCE_2, "SLAVE" // OFFLINE -> SLAVE (upward) + ); + + List messages = processAndGetMessages(stateModelDef, currentStates, bestPossible, 3); + + // Verify: 2 messages, upward gets active count, DROPPED doesn't + Assert.assertEquals(messages.size(), 2); + + Message upwardMsg = messages.stream() + .filter(m -> m.getTgtName().equals(INSTANCE_2) && m.getToState().equals("SLAVE")) + .findFirst().orElse(null); + Message droppedMsg = messages.stream() + .filter(m -> m.getTgtName().equals(INSTANCE_1) && m.getToState().equals("DROPPED")) + .findFirst().orElse(null); + + Assert.assertNotNull(upwardMsg, "Should have upward transition message"); + Assert.assertNotNull(droppedMsg, "Should have dropped transition message"); + + Assert.assertEquals(upwardMsg.getCurrentActiveReplicaNumber(), 1, + "Upward transition should have current active replica count"); + Assert.assertEquals(droppedMsg.getCurrentActiveReplicaNumber(), -1, + "DROPPED transition should not have currentActiveReplicaNumber"); + } + + // === Helper methods === + + /** + * Creates current state map for multiple instances with specified states. + */ + private Map createCurrentStates(Map instanceStates, + String stateModelRef) { + Map currentStateMap = new HashMap<>(); + for (Map.Entry entry : instanceStates.entrySet()) { + CurrentState currentState = new CurrentState(TEST_RESOURCE); + currentState.setState(PARTITION_0, entry.getValue()); + currentState.setSessionId(SESSION_ID); + currentState.setStateModelDefRef(stateModelRef); + currentStateMap.put(entry.getKey(), currentState); + } + return currentStateMap; + } + + /** + * Creates CurrentStateOutput from current state map. + */ + private CurrentStateOutput createCurrentStateOutput(Map currentStateMap) { + CurrentStateOutput currentStateOutput = new CurrentStateOutput(); + for (Map.Entry entry : currentStateMap.entrySet()) { + String instance = entry.getKey(); + CurrentState currentState = entry.getValue(); + currentStateOutput.setCurrentState(TEST_RESOURCE, new Partition(PARTITION_0), instance, + currentState.getState(PARTITION_0)); + } + return currentStateOutput; + } + + /** + * Processes state transitions and returns generated messages. + */ + private List processAndGetMessages(StateModelDefinition stateModelDef, + Map currentStates, Map bestPossible, int instanceCount) + throws Exception { + CurrentStateOutput currentStateOutput = createCurrentStateOutput(currentStates); + ClusterEvent event = prepareClusterEvent(stateModelDef, currentStateOutput, instanceCount); + setBestPossibleState(event, bestPossible); + + process(event); + + MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name()); + return output.getMessages(TEST_RESOURCE, new Partition(PARTITION_0)); + } + + /** + * Processes state transitions for OnlineOffline state model. + */ + private List processAndGetMessagesForOnlineOffline(StateModelDefinition stateModelDef, + Map currentStates, Map bestPossible, int instanceCount) + throws Exception { + CurrentStateOutput currentStateOutput = createCurrentStateOutput(currentStates); + ClusterEvent event = + prepareClusterEventForOnlineOffline(stateModelDef, currentStateOutput, instanceCount); + setBestPossibleState(event, bestPossible); + + process(event); + + MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name()); + return output.getMessages(TEST_RESOURCE, new Partition(PARTITION_0)); + } + + /** + * Sets best possible state in cluster event. + */ + private void setBestPossibleState(ClusterEvent event, Map partitionMap) { + BestPossibleStateOutput bestPossibleOutput = new BestPossibleStateOutput(); + bestPossibleOutput.setState(TEST_RESOURCE, new Partition(PARTITION_0), partitionMap); + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleOutput); + } + + /** + * Prepares cluster event with necessary mock objects and configurations. + */ + private ClusterEvent prepareClusterEvent(StateModelDefinition stateModelDef, + CurrentStateOutput currentStateOutput, int instanceCount) { + ClusterEvent event = createBaseClusterEvent(currentStateOutput); + + // Setup ResourceControllerDataProvider + ResourceControllerDataProvider cache = mock(ResourceControllerDataProvider.class); + when(cache.getClusterConfig()).thenReturn(new ClusterConfig(TEST_CLUSTER)); + when(cache.getStateModelDef("MasterSlave")).thenReturn(stateModelDef); + when(cache.getLiveInstances()).thenReturn(createLiveInstances(instanceCount)); + event.addAttribute(AttributeName.ControllerDataProvider.name(), cache); + + // Setup resources + event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), + createResourceMap("MasterSlave")); + + return event; + } + + /** + * Prepares cluster event for OnlineOffline state model. + */ + private ClusterEvent prepareClusterEventForOnlineOffline(StateModelDefinition stateModelDef, + CurrentStateOutput currentStateOutput, int instanceCount) { + ClusterEvent event = createBaseClusterEvent(currentStateOutput); + + // Setup ResourceControllerDataProvider for OnlineOffline + ResourceControllerDataProvider cache = mock(ResourceControllerDataProvider.class); + when(cache.getClusterConfig()).thenReturn(new ClusterConfig(TEST_CLUSTER)); + when(cache.getStateModelDef("OfflineOnline")).thenReturn(stateModelDef); + when(cache.getLiveInstances()).thenReturn(createLiveInstances(instanceCount)); + event.addAttribute(AttributeName.ControllerDataProvider.name(), cache); + + // Setup resources for OnlineOffline + event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), + createResourceMap("OfflineOnline")); + + return event; + } + + /** + * Creates base cluster event with common attributes. + */ + private ClusterEvent createBaseClusterEvent(CurrentStateOutput currentStateOutput) { + ClusterEvent event = new ClusterEvent(TEST_CLUSTER, ClusterEventType.Unknown); + + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); + event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput); + + // Mock HelixManager and HelixDataAccessor + HelixManager manager = mock(HelixManager.class); + when(manager.getInstanceName()).thenReturn("Controller"); + when(manager.getSessionId()).thenReturn(SESSION_ID); + when(manager.getHelixDataAccessor()).thenReturn(mock(HelixDataAccessor.class)); + event.addAttribute(AttributeName.helixmanager.name(), manager); + + return event; + } + + /** + * Creates mock live instances for the specified count. + */ + private Map createLiveInstances(int instanceCount) { + Map liveInstances = new HashMap<>(); + for (int i = 0; i < instanceCount; i++) { + String instanceName = "localhost_" + i; + ZNRecord znRecord = new ZNRecord(instanceName); + znRecord.setEphemeralOwner(Long.parseLong(SESSION_ID)); + + LiveInstance liveInstance = new LiveInstance(znRecord); + liveInstance.setSessionId(SESSION_ID); + liveInstance.setHelixVersion("1.0.0"); + liveInstance.setLiveInstance(instanceName); + + liveInstances.put(instanceName, liveInstance); + } + return liveInstances; + } + + /** + * Creates resource map with specified state model reference. + */ + private Map createResourceMap(String stateModelRef) { + Map resourceMap = new HashMap<>(); + Resource resource = new Resource(TEST_RESOURCE); + resource.setStateModelDefRef(stateModelRef); + resource.addPartition(PARTITION_0); + resourceMap.put(TEST_RESOURCE, resource); + return resourceMap; + } + + /** + * Creates a state transition message. + */ + private Message createMessage(String fromState, String toState, String tgtName) { + Message message = + new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString()); + message.setFromState(fromState); + message.setToState(toState); + message.setTgtName(tgtName); + message.setTgtSessionId(SESSION_ID); + message.setResourceName(TEST_RESOURCE); + message.setPartitionName(PARTITION_0); + message.setStateModelDef("MasterSlave"); + message.setSrcName("Controller"); + message.setSrcSessionId(SESSION_ID); + return message; + } + + /** + * Custom OnlineOffline state model with configurable upper bounds for ONLINE state. + * Enables testing scenarios with specific replica count constraints. + */ + private static final class CustomOnlineOfflineSMD { + private static final String STATE_MODEL_NAME = "CustomOnlineOffline"; + + /** + * States for the CustomOnlineOffline state model + */ + private enum States { + ONLINE, + OFFLINE + } + + /** + * Build OnlineOffline state model definition with custom instance count + * @param instanceCount the maximum number of instances that can be in ONLINE state + * @return StateModelDefinition for OnlineOffline model with custom bounds + */ + public static StateModelDefinition build(int instanceCount) { + if (instanceCount <= 0) { + throw new IllegalArgumentException( + "Instance count must be positive, got: " + instanceCount); + } + + StateModelDefinition.Builder builder = new StateModelDefinition.Builder(STATE_MODEL_NAME); + + // init state + builder.initialState(States.OFFLINE.name()); + + // add states + builder.addState(States.ONLINE.name(), 0); + builder.addState(States.OFFLINE.name(), 1); + for (final HelixDefinedState state : HelixDefinedState.values()) { + builder.addState(state.name()); + } + + // add transitions + builder.addTransition(States.ONLINE.name(), States.OFFLINE.name(), 0); + builder.addTransition(States.OFFLINE.name(), States.ONLINE.name(), 1); + builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name()); + + // bounds - uses the instanceCount parameter + builder.dynamicUpperBound(States.ONLINE.name(), String.valueOf(instanceCount)); + + return builder.build(); + } + } +}