From 19d8087b9012020b999b9929ed9c1b96b31ef73c Mon Sep 17 00:00:00 2001 From: Nisarg Thakkar Date: Mon, 4 Aug 2025 10:10:21 -0700 Subject: [PATCH 1/3] Make virtual group assignment for FaultZoneBasedVirtualGroupAssignmentAlgorithm stable --- ...eBasedVirtualGroupAssignmentAlgorithm.java | 52 +++++-- ...tFaultZoneBasedVirtualGroupAssignment.java | 132 +++++++++++------- .../rest/server/TestClusterAccessor.java | 10 +- 3 files changed, 128 insertions(+), 66 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/cloud/topology/FaultZoneBasedVirtualGroupAssignmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/cloud/topology/FaultZoneBasedVirtualGroupAssignmentAlgorithm.java index 4b22fc99bc..f588b1feab 100644 --- a/helix-core/src/main/java/org/apache/helix/cloud/topology/FaultZoneBasedVirtualGroupAssignmentAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/cloud/topology/FaultZoneBasedVirtualGroupAssignmentAlgorithm.java @@ -128,20 +128,35 @@ private void distributeUnassignedZones( // Priority queue sorted by current load of the virtual group // We always assign new zones to the group with the smallest load to keep them balanced. 
- Queue minHeap = new PriorityQueue<>( - Comparator.comparingInt(vg -> - virtualGroupToZoneMapping.get(vg).stream() - .map(zoneMapping::get) - .mapToInt(Set::size) - .sum() - ) - ); + Queue minHeap = new PriorityQueue<>((o1, o2) -> { + int load1 = computeGroupLoad(virtualGroupToZoneMapping, o1, zoneMapping); + int load2 = computeGroupLoad(virtualGroupToZoneMapping, o2, zoneMapping); + + int loadDiff = Integer.compare(load1, load2); + + // If the loads are not equal, return the difference + if (loadDiff != 0) { + return loadDiff; + } + + // When loads are equal, sort by group name to ensure consistent ordering + return o1.compareTo(o2); + }); // Seed the min-heap with existing groups minHeap.addAll(virtualGroupToZoneMapping.keySet()); // Sort unassigned zones by descending number of unassigned instances, assigning "heavier" zones first. - unassignedZones.sort(Comparator.comparingInt(zone -> zoneMapping.get(zone).size()) - .reversed()); + unassignedZones.sort((o1, o2) -> { + int zone1Size = zoneMapping.get(o1).size(); + int zone2Size = zoneMapping.get(o2).size(); + + if (zone1Size != zone2Size) { + return Integer.compare(zone2Size, zone1Size); // Sort by size descending + } + + // If sizes are equal, sort by zone name to ensure consistent ordering + return o1.compareTo(o2); + }); // Assign each zone to the least-loaded group for (String zone : unassignedZones) { @@ -170,4 +185,21 @@ private Map> constructResult(Map> vgToZo } return result; } + + /** + * Computes the load of a virtual group based on the number of instances across all zones in that group. + * + * @param virtualGroupToZoneMapping Mapping of virtual group -> set of zones. + * @param group The virtual group for which to compute the load. + * @param zoneMapping Mapping of zone -> set of instances. + * @return The total number of instances across all zones in the specified virtual group. 
+ */ + private int computeGroupLoad( + Map> virtualGroupToZoneMapping, String group, + Map> zoneMapping) { + // The load of a group is defined as the total number of instances across all zones in that group. + return virtualGroupToZoneMapping.getOrDefault(group, Collections.emptySet()).stream() + .mapToInt(zone -> zoneMapping.getOrDefault(zone, Collections.emptySet()).size()) + .sum(); + } } diff --git a/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestFaultZoneBasedVirtualGroupAssignment.java b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestFaultZoneBasedVirtualGroupAssignment.java index 85463fe522..7a2a39e8e7 100644 --- a/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestFaultZoneBasedVirtualGroupAssignment.java +++ b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestFaultZoneBasedVirtualGroupAssignment.java @@ -1,7 +1,12 @@ package org.apache.helix.cloud.virtualTopologyGroup; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -53,6 +58,31 @@ public void testAssignmentScheme(int numGroups, Map> expecte algorithm.computeAssignment(numGroups, GROUP_NAME, zoneMapping, virtualMapping), expected); } + @Test + public void testStableVirtualZoneAssignment() { + VirtualGroupAssignmentAlgorithm algorithm = FaultZoneBasedVirtualGroupAssignmentAlgorithm.getInstance(); + int numGroups = 4; + + Map> virtualMapping1 = algorithm.computeAssignment(numGroups, GROUP_NAME, _zoneMapping); + Map> virtualMapping2 = algorithm.computeAssignment(numGroups, GROUP_NAME, shuffleZoneMapping(_zoneMapping)); + Map> virtualMapping3 = algorithm.computeAssignment(numGroups, GROUP_NAME, shuffleZoneMapping(_zoneMapping)); + + Assert.assertEquals(virtualMapping1, virtualMapping2); + 
Assert.assertEquals(virtualMapping1, virtualMapping3); + } + + private Map> shuffleZoneMapping(Map> virtualMapping) { + LinkedHashMap> shuffledMapping = new LinkedHashMap<>(); + List keys = new ArrayList<>(virtualMapping.keySet()); + Collections.shuffle(keys); + for (String key : keys) { + ArrayList instances = new ArrayList<>(virtualMapping.get(key)); + Collections.shuffle(instances); + shuffledMapping.put(key, new LinkedHashSet<>(instances)); + } + return shuffledMapping; + } + @DataProvider public Object[][] getMappingTests() { VirtualGroupAssignmentAlgorithm algorithm = FaultZoneBasedVirtualGroupAssignmentAlgorithm.getInstance(); @@ -61,105 +91,105 @@ public Object[][] getMappingTests() { Map> virtualMapping = new HashMap<>(); virtualMapping.put(computeVirtualGroupId(0, GROUP_NAME), new HashSet<>()); - virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_5")); - virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_1")); + virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_0")); + virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_12")); virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_16")); - virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_7")); - virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_14")); + virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_2")); + virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_6")); virtualMapping.put(computeVirtualGroupId(1, GROUP_NAME), new HashSet<>()); - virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_0")); - virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_12")); + virtualMapping.get(computeVirtualGroupId(1, 
GROUP_NAME)).addAll(_zoneMapping.get("zone_1")); + virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_13")); + virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_17")); virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_3")); - virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_18")); - virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_10")); + virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_7")); virtualMapping.put(computeVirtualGroupId(2, GROUP_NAME), new HashSet<>()); - virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_17")); - virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_9")); - virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_11")); - virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_19")); + virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_10")); + virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_14")); + virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_18")); virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_4")); + virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_8")); virtualMapping.put(computeVirtualGroupId(3, GROUP_NAME), new HashSet<>()); - virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_13")); - virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_6")); - virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_2")); + virtualMapping.get(computeVirtualGroupId(3, 
GROUP_NAME)).addAll(_zoneMapping.get("zone_11")); virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_15")); - virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_8")); + virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_19")); + virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_5")); + virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_9")); Map> virtualMapping2 = new HashMap<>(); virtualMapping2.put(computeVirtualGroupId(0, GROUP_NAME), new HashSet<>()); - virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_1")); - virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_12")); + virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_0")); + virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_15")); + virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_4")); virtualMapping2.put(computeVirtualGroupId(1, GROUP_NAME), new HashSet<>()); - virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_0")); + virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_1")); + virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_16")); virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_5")); - virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_13")); virtualMapping2.put(computeVirtualGroupId(2, GROUP_NAME), new HashSet<>()); + virtualMapping2.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_10")); virtualMapping2.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_17")); virtualMapping2.get(computeVirtualGroupId(2, 
GROUP_NAME)).addAll(_zoneMapping.get("zone_6")); - virtualMapping2.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_15")); virtualMapping2.put(computeVirtualGroupId(3, GROUP_NAME), new HashSet<>()); - virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_19")); - virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_8")); - virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_9")); + virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_11")); + virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_18")); + virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_7")); virtualMapping2.put(computeVirtualGroupId(4, GROUP_NAME), new HashSet<>()); - virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_7")); - virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_10")); - virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_18")); + virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_12")); + virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_19")); + virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_8")); virtualMapping2.put(computeVirtualGroupId(5, GROUP_NAME), new HashSet<>()); - virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_3")); - virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_16")); - virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_14")); + virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_13")); + virtualMapping2.get(computeVirtualGroupId(5, 
GROUP_NAME)).addAll(_zoneMapping.get("zone_2")); + virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_9")); virtualMapping2.put(computeVirtualGroupId(6, GROUP_NAME), new HashSet<>()); - virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_11")); - virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_2")); - virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_4")); + virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_14")); + virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_3")); Map> virtualMapping3 = new HashMap<>(); virtualMapping3.put(computeVirtualGroupId(0, GROUP_NAME), new HashSet<>()); - virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_1")); - virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_12")); - virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_20")); + virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_0")); + virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_15")); + virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_4")); virtualMapping3.put(computeVirtualGroupId(1, GROUP_NAME), new HashSet<>()); - virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_0")); + virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_1")); + virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_16")); virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_5")); - virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_13")); 
virtualMapping3.put(computeVirtualGroupId(2, GROUP_NAME), new HashSet<>()); + virtualMapping3.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping2.get("zone_10")); virtualMapping3.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping2.get("zone_17")); virtualMapping3.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping2.get("zone_6")); - virtualMapping3.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping2.get("zone_15")); virtualMapping3.put(computeVirtualGroupId(3, GROUP_NAME), new HashSet<>()); - virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_19")); - virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_8")); - virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_9")); + virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_11")); + virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_18")); + virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_7")); virtualMapping3.put(computeVirtualGroupId(4, GROUP_NAME), new HashSet<>()); - virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_7")); - virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_10")); - virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_18")); + virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_12")); + virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_19")); + virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_8")); virtualMapping3.put(computeVirtualGroupId(5, GROUP_NAME), new HashSet<>()); - virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_3")); - 
virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_16")); - virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_14")); + virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_13")); + virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_2")); + virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_9")); virtualMapping3.put(computeVirtualGroupId(6, GROUP_NAME), new HashSet<>()); - virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_11")); - virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_2")); - virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_4")); + virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_14")); + virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_3")); + virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_20")); return new Object[][]{{4, virtualMapping, algorithm, _zoneMapping, new HashMap<>()}, {7, virtualMapping2, algorithm, _zoneMapping, new HashMap<>()}, diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java index 619a781272..f9af0a9c2b 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java @@ -353,7 +353,7 @@ public Object[][] prepareVirtualTopologyTests() { setupClusterForVirtualTopology(VG_CLUSTER); String test1 = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\"}"; String test2 = "{\"virtualTopologyGroupNumber\":\"9\",\"virtualTopologyGroupName\":\"vgTest\"}"; - // Split 5 
zones into 2 virtual groups, expect 0-1-2 in virtual group 0, 3-4 in virtual group 1 + // Split 5 zones into 2 virtual groups, expect 0-2-4 in virtual group 0, 1-3 in virtual group 1 String test3 = "{\"virtualTopologyGroupNumber\":\"2\",\"virtualTopologyGroupName\":\"vgTest\"," + "\"assignmentAlgorithmType\":\"ZONE_BASED\"}"; String test4 = "{\"virtualTopologyGroupNumber\":\"5\",\"virtualTopologyGroupName\":\"vgTest\"," @@ -384,10 +384,10 @@ public Object[][] prepareVirtualTopologyTests() { "vgCluster_localhost_12925", "vgTest_1", "vgCluster_localhost_12927", "vgTest_0")}, {test4, 5, ImmutableMap.of( - "vgCluster_localhost_12918", "vgTest_4", - "vgCluster_localhost_12919", "vgTest_4", - "vgCluster_localhost_12925", "vgTest_2", - "vgCluster_localhost_12927", "vgTest_1")}, + "vgCluster_localhost_12918", "vgTest_0", + "vgCluster_localhost_12919", "vgTest_0", + "vgCluster_localhost_12925", "vgTest_3", + "vgCluster_localhost_12927", "vgTest_4")}, // repeat test3 for deterministic and test for decreasing numGroups {test3, 2, ImmutableMap.of( "vgCluster_localhost_12918", "vgTest_0", From a33525867855514dfed7a8b3c94af65f542f3d9e Mon Sep 17 00:00:00 2001 From: Nisarg Thakkar Date: Tue, 5 Aug 2025 05:45:11 -0700 Subject: [PATCH 2/3] Update a test case name --- .../TestFaultZoneBasedVirtualGroupAssignment.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestFaultZoneBasedVirtualGroupAssignment.java b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestFaultZoneBasedVirtualGroupAssignment.java index 7a2a39e8e7..58959916e6 100644 --- a/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestFaultZoneBasedVirtualGroupAssignment.java +++ b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestFaultZoneBasedVirtualGroupAssignment.java @@ -59,7 +59,7 @@ public void testAssignmentScheme(int numGroups, Map> expecte } @Test - public void 
testStableVirtualZoneAssignment() { + public void testDeterministicVirtualZoneAssignment() { VirtualGroupAssignmentAlgorithm algorithm = FaultZoneBasedVirtualGroupAssignmentAlgorithm.getInstance(); int numGroups = 4; From 26fe84f20830dd73f588490407d9a893997ffd81 Mon Sep 17 00:00:00 2001 From: Nisarg Thakkar Date: Thu, 7 Aug 2025 17:16:48 -0700 Subject: [PATCH 3/3] Use more readable comparator chaining --- ...eBasedVirtualGroupAssignmentAlgorithm.java | 61 ++++--------------- 1 file changed, 13 insertions(+), 48 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/cloud/topology/FaultZoneBasedVirtualGroupAssignmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/cloud/topology/FaultZoneBasedVirtualGroupAssignmentAlgorithm.java index f588b1feab..5530a91690 100644 --- a/helix-core/src/main/java/org/apache/helix/cloud/topology/FaultZoneBasedVirtualGroupAssignmentAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/cloud/topology/FaultZoneBasedVirtualGroupAssignmentAlgorithm.java @@ -20,20 +20,14 @@ */ import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; -import java.util.TreeSet; -import java.util.stream.Collectors; - -import org.apache.commons.math3.util.Pair; import static org.apache.helix.util.VirtualTopologyUtil.computeVirtualGroupId; @@ -128,35 +122,23 @@ private void distributeUnassignedZones( // Priority queue sorted by current load of the virtual group // We always assign new zones to the group with the smallest load to keep them balanced. 
- Queue minHeap = new PriorityQueue<>((o1, o2) -> { - int load1 = computeGroupLoad(virtualGroupToZoneMapping, o1, zoneMapping); - int load2 = computeGroupLoad(virtualGroupToZoneMapping, o2, zoneMapping); - - int loadDiff = Integer.compare(load1, load2); - - // If the loads are not equal, return the difference - if (loadDiff != 0) { - return loadDiff; - } - - // When loads are equal, sort by group name to ensure consistent ordering - return o1.compareTo(o2); - }); + // If loads are equal, sort by virtual group name to ensure consistent ordering + Queue minHeap = new PriorityQueue<>( + Comparator.comparingInt(vg -> + virtualGroupToZoneMapping.get(vg).stream() + .map(zoneMapping::get) + .mapToInt(Set::size) + .sum() + ).thenComparing(String::compareTo) + ); // Seed the min-heap with existing groups minHeap.addAll(virtualGroupToZoneMapping.keySet()); // Sort unassigned zones by descending number of unassigned instances, assigning "heavier" zones first. - unassignedZones.sort((o1, o2) -> { - int zone1Size = zoneMapping.get(o1).size(); - int zone2Size = zoneMapping.get(o2).size(); - - if (zone1Size != zone2Size) { - return Integer.compare(zone2Size, zone1Size); // Sort by size descending - } - - // If sizes are equal, sort by zone name to ensure consistent ordering - return o1.compareTo(o2); - }); + // If sizes are equal, sort by zone name to ensure consistent ordering + unassignedZones.sort(Comparator.comparingInt(zone -> zoneMapping.get(zone).size()) + .reversed() + .thenComparing(String::compareTo)); // Assign each zone to the least-loaded group for (String zone : unassignedZones) { @@ -185,21 +167,4 @@ private Map> constructResult(Map> vgToZo } return result; } - - /** - * Computes the load of a virtual group based on the number of instances across all zones in that group. - * - * @param virtualGroupToZoneMapping Mapping of virtual group -> set of zones. - * @param group The virtual group for which to compute the load. 
- * @param zoneMapping Mapping of zone -> set of instances. - * @return The total number of instances across all zones in the specified virtual group. - */ - private int computeGroupLoad( - Map> virtualGroupToZoneMapping, String group, - Map> zoneMapping) { - // The load of a group is defined as the total number of instances across all zones in that group. - return virtualGroupToZoneMapping.getOrDefault(group, Collections.emptySet()).stream() - .mapToInt(zone -> zoneMapping.getOrDefault(zone, Collections.emptySet()).size()) - .sum(); - } }