From 14766e770efc8b44619ba18e11140c8c4c77875c Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Fri, 23 Aug 2024 13:53:46 -0700 Subject: [PATCH 1/3] log on npe --- .../stages/CurrentStateComputationStage.java | 6 +- .../helix/integration/TestWagedNPE.java | 124 ++++++++++++++++++ 2 files changed, 125 insertions(+), 5 deletions(-) create mode 100644 helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index da972d682c..77f70e8bdf 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -364,10 +364,6 @@ void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDat .filter(entry -> WagedValidationUtil.isWagedEnabled(cache.getIdealState(entry.getKey()))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - if (wagedEnabledResourceMap.isEmpty()) { - return; - } - // Phase 1: Rebuild Always WagedInstanceCapacity capacityProvider = new WagedInstanceCapacity(cache); WagedResourceWeightsProvider weightProvider = new WagedResourceWeightsProvider(cache); @@ -385,7 +381,7 @@ void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDat */ static boolean skipCapacityCalculation(ResourceControllerDataProvider cache, Map resourceMap, ClusterEvent event) { - if (resourceMap == null || resourceMap.isEmpty()) { + if (resourceMap == null) { return true; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java b/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java new file mode 100644 index 0000000000..de845a398d --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java @@ 
-0,0 +1,124 @@ +package org.apache.helix.integration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestWagedNPE extends ZkTestBase { + + public static String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster"; + public static int PARTICIPANT_COUNT = 10; + public static List _participants = new ArrayList<>(); + public static ClusterControllerManager _controller; + public static ConfigAccessor _configAccessor; + + @BeforeClass + public void beforeClass() { + System.out.println("Start test " + TestHelper.getTestClassName()); + _gSetupTool.addCluster(CLUSTER_NAME, true); + for (int i = 0; i < PARTICIPANT_COUNT; i++) { + addParticipant("localhost_" + i); + } + + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + _configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + String testCapacityKey = "TestCapacityKey"; + clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey)); + clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 100)); + 
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1)); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + } + + @Test + public void testNPE() throws Exception { + String firstDB = "firstDB"; + int numPartition = 10; + _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby", + IdealState.RebalanceMode.FULL_AUTO.name(), null); + + IdealState idealStateOne = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB); + idealStateOne.setMinActiveReplicas(2); + idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName()); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3); + + BestPossibleExternalViewVerifier verifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(Collections.singleton(firstDB)) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + + Assert.assertTrue(verifier.verifyByPolling()); + + // drop resource + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, firstDB); + + // add instance + MockParticipantManager participantToAdd = addParticipant("instance_to_add"); + verifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(new HashSet<>(_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME))) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + // add resource again + String secondDb = "secondDB"; + _configAccessor.setResourceConfig(CLUSTER_NAME, secondDb, new ResourceConfig(secondDb)); + _gSetupTool.addResourceToCluster(CLUSTER_NAME, secondDb, numPartition, "LeaderStandby", + IdealState.RebalanceMode.FULL_AUTO.name(), null); + IdealState idealStateTwo = + 
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, secondDb); + idealStateTwo.setMinActiveReplicas(2); + idealStateTwo.setRebalancerClassName(WagedRebalancer.class.getName()); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, secondDb, idealStateTwo); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, secondDb, 3); + + + verifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(Collections.singleton(secondDb)) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + Assert.assertTrue(verifier.verifyByPolling()); + } + + public MockParticipantManager addParticipant(String instanceName) { + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName); + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + participant.syncStart(); + _participants.add(participant); + return participant; + } + + public void dropParticipant(String instanceName) { + MockParticipantManager participantToDrop = _participants.stream() + .filter(p -> p.getInstanceName().equals(instanceName)).findFirst().get(); + dropParticipant(participantToDrop); + + } + + public void dropParticipant(MockParticipantManager participantToDrop) { + participantToDrop.syncStop(); + _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, + _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, participantToDrop.getInstanceName())); + _participants.remove(participantToDrop); + } +} From 88df65a61b7b78e5dd63a3692091ab2687cf180d Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Thu, 21 Nov 2024 14:12:20 -0800 Subject: [PATCH 2/3] fix --- .../stages/CurrentStateComputationStage.java | 13 ++++- .../helix/integration/TestWagedNPE.java | 55 +++++++------------ 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 77f70e8bdf..2d41aaff70 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -356,6 +356,11 @@ void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDat CurrentStateOutput currentStateOutput) { Map resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); if (skipCapacityCalculation(cache, resourceMap, event)) { + // Ensure instance capacity is null if there are no resources. This prevents using a stale map when all resources + // are removed and then a new resource is added. + if (resourceMap == null || resourceMap.isEmpty()) { + cache.setWagedCapacityProviders(null, null); + } return; } @@ -364,6 +369,12 @@ void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDat .filter(entry -> WagedValidationUtil.isWagedEnabled(cache.getIdealState(entry.getKey()))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + // Ensure instance capacity is null if there are no WAGED enabled instances + if (wagedEnabledResourceMap.isEmpty()) { + cache.setWagedCapacityProviders(null, null); + return; + } + // Phase 1: Rebuild Always WagedInstanceCapacity capacityProvider = new WagedInstanceCapacity(cache); WagedResourceWeightsProvider weightProvider = new WagedResourceWeightsProvider(cache); @@ -381,7 +392,7 @@ void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDat */ static boolean skipCapacityCalculation(ResourceControllerDataProvider cache, Map resourceMap, ClusterEvent event) { - if (resourceMap == null) { + if (resourceMap == null || resourceMap.isEmpty()) { return true; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java b/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java 
index de845a398d..57930fa712 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java @@ -7,6 +7,8 @@ import org.apache.helix.ConfigAccessor; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; @@ -22,7 +24,7 @@ public class TestWagedNPE extends ZkTestBase { public static String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster"; - public static int PARTICIPANT_COUNT = 10; + public static int PARTICIPANT_COUNT = 3; public static List _participants = new ArrayList<>(); public static ClusterControllerManager _controller; public static ConfigAccessor _configAccessor; @@ -48,13 +50,19 @@ public void beforeClass() { _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); } + // This test was constructed to capture the bug described in issue 2891 + // https://github.com/apache/helix/issues/2891 @Test public void testNPE() throws Exception { + int numPartition = 3; + BestPossibleExternalViewVerifier verifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + + // Create 1 WAGED Resource String firstDB = "firstDB"; - int numPartition = 10; _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby", IdealState.RebalanceMode.FULL_AUTO.name(), null); - IdealState idealStateOne = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB); idealStateOne.setMinActiveReplicas(2); @@ -62,25 +70,22 @@ public void testNPE() 
throws Exception { _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3); - BestPossibleExternalViewVerifier verifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(Collections.singleton(firstDB)) - .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); - + // Wait for cluster to converge Assert.assertTrue(verifier.verifyByPolling()); - // drop resource + // Drop resource _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, firstDB); + // Wait for cluster to converge + Assert.assertTrue(verifier.verifyByPolling()); + // add instance - MockParticipantManager participantToAdd = addParticipant("instance_to_add"); - verifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(new HashSet<>(_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME))) - .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + addParticipant("instance_to_add"); + + // Wait for cluster to converge Assert.assertTrue(verifier.verifyByPolling()); - // add resource again + // Add a new resource String secondDb = "secondDB"; _configAccessor.setResourceConfig(CLUSTER_NAME, secondDb, new ResourceConfig(secondDb)); _gSetupTool.addResourceToCluster(CLUSTER_NAME, secondDb, numPartition, "LeaderStandby", @@ -92,11 +97,7 @@ public void testNPE() throws Exception { _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, secondDb, idealStateTwo); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, secondDb, 3); - - verifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(Collections.singleton(secondDb)) - .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + // Confirm cluster can converge. 
Cluster will not converge if NPE occurs during pipeline run Assert.assertTrue(verifier.verifyByPolling()); } @@ -107,18 +108,4 @@ public MockParticipantManager addParticipant(String instanceName) { _participants.add(participant); return participant; } - - public void dropParticipant(String instanceName) { - MockParticipantManager participantToDrop = _participants.stream() - .filter(p -> p.getInstanceName().equals(instanceName)).findFirst().get(); - dropParticipant(participantToDrop); - - } - - public void dropParticipant(MockParticipantManager participantToDrop) { - participantToDrop.syncStop(); - _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, - _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, participantToDrop.getInstanceName())); - _participants.remove(participantToDrop); - } } From 5de569069fa8aca7867200f82eb530cc2a66f693 Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Thu, 19 Dec 2024 13:09:26 -0800 Subject: [PATCH 3/3] respond to reviewer feedback --- .../dataproviders/ResourceControllerDataProvider.java | 8 ++++++++ .../controller/stages/CurrentStateComputationStage.java | 4 ++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java index 021aab6a84..b006a4f1e0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java @@ -515,6 +515,14 @@ public void setWagedCapacityProviders(WagedInstanceCapacity capacityProvider, Wa _wagedPartitionWeightProvider = resourceWeightProvider; } + /** + * Clears the WAGED algorithm specific instance capacity provider and resource weight provider. 
+ */ + public void clearWagedCapacityProviders() { + _wagedInstanceCapacity = null; + _wagedPartitionWeightProvider = null; + } + /** * Check and reduce the capacity of an instance for a resource partition * @param instance - the instance to check diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 2d41aaff70..2814e4062f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -359,7 +359,7 @@ void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDat // Ensure instance capacity is null if there are no resources. This prevents using a stale map when all resources // are removed and then a new resource is added. if (resourceMap == null || resourceMap.isEmpty()) { - cache.setWagedCapacityProviders(null, null); + cache.clearWagedCapacityProviders(); } return; } @@ -371,7 +371,7 @@ void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDat // Ensure instance capacity is null if there are no WAGED enabled instances if (wagedEnabledResourceMap.isEmpty()) { - cache.setWagedCapacityProviders(null, null); + cache.clearWagedCapacityProviders(); return; }