Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,14 @@
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

import org.apache.commons.math3.util.Pair;

import static org.apache.helix.util.VirtualTopologyUtil.computeVirtualGroupId;

Expand Down Expand Up @@ -128,20 +122,23 @@ private void distributeUnassignedZones(

// Priority queue sorted by current load of the virtual group
// We always assign new zones to the group with the smallest load to keep them balanced.
// If loads are equal, sort by zone name to ensure consistent ordering
Queue<String> minHeap = new PriorityQueue<>(
Comparator.comparingInt(vg ->
Comparator.<String>comparingInt(vg ->
virtualGroupToZoneMapping.get(vg).stream()
.map(zoneMapping::get)
.mapToInt(Set::size)
.sum()
)
).thenComparing(String::compareTo)
);
// Seed the min-heap with existing groups
minHeap.addAll(virtualGroupToZoneMapping.keySet());

// Sort unassigned zones by descending number of unassigned instances, assigning "heavier" zones first.
unassignedZones.sort(Comparator.comparingInt(zone -> zoneMapping.get(zone).size())
.reversed());
// If sizes are equal, sort by zone name to ensure consistent ordering
unassignedZones.sort(Comparator.<String>comparingInt(zone -> zoneMapping.get(zone).size())
.reversed()
.thenComparing(String::compareTo));

// Assign each zone to the least-loaded group
for (String zone : unassignedZones) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package org.apache.helix.cloud.virtualTopologyGroup;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -53,6 +58,31 @@ public void testAssignmentScheme(int numGroups, Map<String, Set<String>> expecte
algorithm.computeAssignment(numGroups, GROUP_NAME, zoneMapping, virtualMapping), expected);
}

@Test
public void testDeterministicVirtualZoneAssignment() {
VirtualGroupAssignmentAlgorithm algorithm = FaultZoneBasedVirtualGroupAssignmentAlgorithm.getInstance();
int numGroups = 4;

Map<String, Set<String>> virtualMapping1 = algorithm.computeAssignment(numGroups, GROUP_NAME, _zoneMapping);
Map<String, Set<String>> virtualMapping2 = algorithm.computeAssignment(numGroups, GROUP_NAME, shuffleZoneMapping(_zoneMapping));
Map<String, Set<String>> virtualMapping3 = algorithm.computeAssignment(numGroups, GROUP_NAME, shuffleZoneMapping(_zoneMapping));

Assert.assertEquals(virtualMapping1, virtualMapping2);
Assert.assertEquals(virtualMapping1, virtualMapping3);
}

private Map<String, Set<String>> shuffleZoneMapping(Map<String, Set<String>> virtualMapping) {
LinkedHashMap<String, Set<String>> shuffledMapping = new LinkedHashMap<>();
List<String> keys = new ArrayList<>(virtualMapping.keySet());
Collections.shuffle(keys);
for (String key : keys) {
ArrayList<String> instances = new ArrayList<>(virtualMapping.get(key));
Collections.shuffle(instances);
shuffledMapping.put(key, new LinkedHashSet<>(instances));
}
return shuffledMapping;
}

@DataProvider
public Object[][] getMappingTests() {
VirtualGroupAssignmentAlgorithm algorithm = FaultZoneBasedVirtualGroupAssignmentAlgorithm.getInstance();
Expand All @@ -61,105 +91,105 @@ public Object[][] getMappingTests() {
Map<String, Set<String>> virtualMapping = new HashMap<>();

virtualMapping.put(computeVirtualGroupId(0, GROUP_NAME), new HashSet<>());
virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_5"));
virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_1"));
virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_0"));
virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_12"));
virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_16"));
virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_7"));
virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_14"));
virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_2"));
virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_6"));

virtualMapping.put(computeVirtualGroupId(1, GROUP_NAME), new HashSet<>());
virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_0"));
virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_12"));
virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_1"));
virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_13"));
virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_17"));
virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_3"));
virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_18"));
virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_10"));
virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_7"));

virtualMapping.put(computeVirtualGroupId(2, GROUP_NAME), new HashSet<>());
virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_17"));
virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_9"));
virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_11"));
virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_19"));
virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_10"));
virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_14"));
virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_18"));
virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_4"));
virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_8"));

virtualMapping.put(computeVirtualGroupId(3, GROUP_NAME), new HashSet<>());
virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_13"));
virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_6"));
virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_2"));
virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_11"));
virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_15"));
virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_8"));
virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_19"));
virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_5"));
virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_9"));


Map<String, Set<String>> virtualMapping2 = new HashMap<>();
virtualMapping2.put(computeVirtualGroupId(0, GROUP_NAME), new HashSet<>());
virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_1"));
virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_12"));
virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_0"));
virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_15"));
virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_4"));

virtualMapping2.put(computeVirtualGroupId(1, GROUP_NAME), new HashSet<>());
virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_0"));
virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_1"));
virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_16"));
virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_5"));
virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_13"));

virtualMapping2.put(computeVirtualGroupId(2, GROUP_NAME), new HashSet<>());
virtualMapping2.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_10"));
virtualMapping2.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_17"));
virtualMapping2.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_6"));
virtualMapping2.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_15"));

virtualMapping2.put(computeVirtualGroupId(3, GROUP_NAME), new HashSet<>());
virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_19"));
virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_8"));
virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_9"));
virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_11"));
virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_18"));
virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_7"));

virtualMapping2.put(computeVirtualGroupId(4, GROUP_NAME), new HashSet<>());
virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_7"));
virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_10"));
virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_18"));
virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_12"));
virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_19"));
virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_8"));

virtualMapping2.put(computeVirtualGroupId(5, GROUP_NAME), new HashSet<>());
virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_3"));
virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_16"));
virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_14"));
virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_13"));
virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_2"));
virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_9"));

virtualMapping2.put(computeVirtualGroupId(6, GROUP_NAME), new HashSet<>());
virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_11"));
virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_2"));
virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_4"));
virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_14"));
virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_3"));


Map<String, Set<String>> virtualMapping3 = new HashMap<>();
virtualMapping3.put(computeVirtualGroupId(0, GROUP_NAME), new HashSet<>());
virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_1"));
virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_12"));
virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_20"));
virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_0"));
virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_15"));
virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_4"));

virtualMapping3.put(computeVirtualGroupId(1, GROUP_NAME), new HashSet<>());
virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_0"));
virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_1"));
virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_16"));
virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_5"));
virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_13"));

virtualMapping3.put(computeVirtualGroupId(2, GROUP_NAME), new HashSet<>());
virtualMapping3.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping2.get("zone_10"));
virtualMapping3.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping2.get("zone_17"));
virtualMapping3.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping2.get("zone_6"));
virtualMapping3.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping2.get("zone_15"));

virtualMapping3.put(computeVirtualGroupId(3, GROUP_NAME), new HashSet<>());
virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_19"));
virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_8"));
virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_9"));
virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_11"));
virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_18"));
virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_7"));

virtualMapping3.put(computeVirtualGroupId(4, GROUP_NAME), new HashSet<>());
virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_7"));
virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_10"));
virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_18"));
virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_12"));
virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_19"));
virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_8"));

virtualMapping3.put(computeVirtualGroupId(5, GROUP_NAME), new HashSet<>());
virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_3"));
virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_16"));
virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_14"));
virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_13"));
virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_2"));
virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_9"));

virtualMapping3.put(computeVirtualGroupId(6, GROUP_NAME), new HashSet<>());
virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_11"));
virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_2"));
virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_4"));
virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_14"));
virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_3"));
virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_20"));

return new Object[][]{{4, virtualMapping, algorithm, _zoneMapping, new HashMap<>()},
{7, virtualMapping2, algorithm, _zoneMapping, new HashMap<>()},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public Object[][] prepareVirtualTopologyTests() {
setupClusterForVirtualTopology(VG_CLUSTER);
String test1 = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\"}";
String test2 = "{\"virtualTopologyGroupNumber\":\"9\",\"virtualTopologyGroupName\":\"vgTest\"}";
// Split 5 zones into 2 virtual groups, expect 0-1-2 in virtual group 0, 3-4 in virtual group 1
// Split 5 zones into 2 virtual groups, expect 0-2-4 in virtual group 0, 1-3 in virtual group 1
String test3 = "{\"virtualTopologyGroupNumber\":\"2\",\"virtualTopologyGroupName\":\"vgTest\","
+ "\"assignmentAlgorithmType\":\"ZONE_BASED\"}";
String test4 = "{\"virtualTopologyGroupNumber\":\"5\",\"virtualTopologyGroupName\":\"vgTest\","
Expand Down Expand Up @@ -384,10 +384,10 @@ public Object[][] prepareVirtualTopologyTests() {
"vgCluster_localhost_12925", "vgTest_1",
"vgCluster_localhost_12927", "vgTest_0")},
{test4, 5, ImmutableMap.of(
"vgCluster_localhost_12918", "vgTest_4",
"vgCluster_localhost_12919", "vgTest_4",
"vgCluster_localhost_12925", "vgTest_2",
"vgCluster_localhost_12927", "vgTest_1")},
"vgCluster_localhost_12918", "vgTest_0",
"vgCluster_localhost_12919", "vgTest_0",
"vgCluster_localhost_12925", "vgTest_3",
"vgCluster_localhost_12927", "vgTest_4")},
// repeat test3 for deterministic and test for decreasing numGroups
{test3, 2, ImmutableMap.of(
"vgCluster_localhost_12918", "vgTest_0",
Expand Down