Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
87f0e34
Set up helix-gateway folder, add PoC code and add protobuf (#2826)
junkaixue Jul 8, 2024
30b9094
Add protobuff definition and an empty grpc service (#2834)
xyuanlu Jul 17, 2024
9449a04
Build HelixGateway integration test base (#2842)
junkaixue Jul 22, 2024
2b44e87
Refactor and remove mock classes (#2841)
xyuanlu Jul 22, 2024
2d619df
Gateway service - service structure dummy class (#2840)
xyuanlu Jul 23, 2024
02e6e83
Gateway util for per key lock and per key blocking queue executor(#2847)
xyuanlu Jul 24, 2024
00db566
Implement GatewayServiceManager (#2844)
xyuanlu Jul 25, 2024
430d20a
Add license to gateway service (#2851)
xyuanlu Jul 26, 2024
7c06069
Implement Helix ST handling logic and HelixGatewayParticipant (#2845)
zpinto Jul 31, 2024
60845e8
API to close grpc client stream connection from server side (#2856)
xyuanlu Jul 31, 2024
b37fa83
Implement helix manager disconnect and client disconnect handling for…
zpinto Aug 2, 2024
765398a
Interfaces of gateway service (#2871)
xyuanlu Aug 6, 2024
b31f2b9
Refine gateway service interface (#2875)
xyuanlu Aug 8, 2024
c5eaa8a
Create Gateway service channel factory (#2883)
xyuanlu Aug 23, 2024
c9f7d05
Gateway - User report their shards' current state instead of state tr…
xyuanlu Aug 28, 2024
ad33675
Add helix-gateway to bump-up.sh and bump-snapshot.sh (#2899)
zpinto Sep 4, 2024
bbb031a
Gateway - Add GatewayCurrentStateCache for gateway service (#2895)
xyuanlu Sep 4, 2024
d8d1c29
Gateway - Implementing poll-mode channel (#2900)
xyuanlu Sep 9, 2024
0059136
Create condition based rebalancer (#2846)
frankmu Jul 25, 2024
2663786
Add missing license for rebalance condition files (#2853)
frankmu Jul 26, 2024
1cdd250
Helix stickiness rebalancer (#2878)
frankmu Aug 9, 2024
5a611cc
Move existing assignments usage calculation to pre-process stage (#2888)
frankmu Aug 29, 2024
ba8c9f1
Add test to ensure no partition movement when nodes restart (#2896)
frankmu Aug 30, 2024
c515551
Add null check for getClusterConfig()
tengfmu Sep 9, 2024
38ba820
Switch to compatible grpc version (#2904)
zpinto Sep 6, 2024
9b03e1b
Use grpc.version for io.grpc version (#2905)
zpinto Sep 7, 2024
6ffa3d3
Expose setting gateway service channel to allow external managment of…
zpinto Sep 12, 2024
663c9aa
Gateway - gateway participant update target state in cache (#2910)
xyuanlu Sep 13, 2024
f6cecbd
Include java stubs in jdk11 jar. (#2920)
zpinto Sep 13, 2024
bd80ae0
Add helix-gateway stubs to exported packages.(#2923)
zpinto Sep 17, 2024
ef0175d
Add an end to end test for helix gateway (#2922)
xyuanlu Sep 23, 2024
3fea937
Synchronize calls to StreamObserver methods (#2934)
zpinto Oct 8, 2024
a685f41
Fix gateway e2e test - set timestamp in second instead of ms (#2942)
xyuanlu Oct 8, 2024
7c4e4dc
Add getter for all target state - gateway service (#2943)
xyuanlu Oct 10, 2024
1c55b73
Add log and refine code style for Gateway service (#2950)
xyuanlu Oct 18, 2024
3873e5d
Make StickyRebalanceStrategy topology aware (#2944)
frankmu Oct 31, 2024
8926f97
Fix version in pom
junkaixue Dec 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/Helix-PR-CI.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Helix PR CI
on:
pull_request:
branches: [ master, metaclient, ApplicationClusterManager] # TODO: remove side branch
branches: [ master, metaclient, ApplicationClusterManager, helix-gateway-service] # TODO: remove side branch
paths-ignore:
- '.github/**'
- 'helix-front/**'
Expand Down
1 change: 1 addition & 0 deletions bump-snapshot.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mv metrics-common/metrics-common-$version-SNAPSHOT.ivy metrics-common/metrics-co
mv zookeeper-api/zookeeper-api-$version-SNAPSHOT.ivy zookeeper-api/zookeeper-api-$new_version-SNAPSHOT.ivy
mv helix-view-aggregator/helix-view-aggregator-$version-SNAPSHOT.ivy helix-view-aggregator/helix-view-aggregator-$new_version-SNAPSHOT.ivy
mv meta-client/meta-client-$version-SNAPSHOT.ivy meta-client/meta-client-$new_version-SNAPSHOT.ivy
mv helix-gateway/helix-gateway-$version-SNAPSHOT.ivy helix-gateway/helix-gateway-$new_version-SNAPSHOT.ivy


find . -type f -name '*.ivy' -exec sed -i "s/$version/$new_version/g" {} \;
Expand Down
2 changes: 1 addition & 1 deletion bump-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ echo "bump up: $current_version -> $new_version"
update_pom_version "pom.xml" $current_version

for module in "metrics-common" "metadata-store-directory-common" "zookeeper-api" "helix-common" "helix-core" \
"helix-rest" "helix-lock" "helix-view-aggregator" "helix-agent" "meta-client"; do
"helix-rest" "helix-lock" "helix-view-aggregator" "helix-agent" "meta-client" "helix-gateway"; do
update_ivy $module
update_pom_version $module/pom.xml $current_version
done
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,55 @@

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import org.apache.helix.controller.rebalancer.topology.Topology;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.InstanceConfig;

/**
* A Node is an entity that can serve capacity recording purpose. It has a capacity and knowledge
* of partitions assigned to it, so it can decide if it can receive additional partitions.
*/
public class CapacityNode {
public class CapacityNode implements Comparable<CapacityNode> {
private int _currentlyAssigned;
private int _capacity;
private final String _id;
private final String _instanceName;
private final String _logicaId;
private final String _faultZone;
private final Map<String, Set<String>> _partitionMap;

public CapacityNode(String id) {
_partitionMap = new HashMap<>();
_currentlyAssigned = 0;
this._id = id;
/**
* Constructor used for non-topology-aware use case
* @param instanceName The instance name of this node
* @param capacity The capacity of this node
*/
public CapacityNode(String instanceName, int capacity) {
this(instanceName, null, null, null);
this._capacity = capacity;
}

/**
* Constructor used for topology-aware use case
* @param instanceName The instance name of this node
* @param clusterConfig The cluster config for current helix cluster
* @param clusterTopologyConfig The cluster topology config for current helix cluster
* @param instanceConfig The instance config for current instance
*/
public CapacityNode(String instanceName, ClusterConfig clusterConfig,
ClusterTopologyConfig clusterTopologyConfig, InstanceConfig instanceConfig) {
this._instanceName = instanceName;
this._logicaId = clusterTopologyConfig != null ? instanceConfig.getLogicalId(
clusterTopologyConfig.getEndNodeType()) : instanceName;
this._faultZone =
clusterConfig != null ? computeFaultZone(clusterConfig, instanceConfig) : null;
this._partitionMap = new HashMap<>();
this._capacity =
clusterConfig != null ? clusterConfig.getGlobalMaxPartitionAllowedPerInstance() : 0;
this._currentlyAssigned = 0;
}

/**
Expand All @@ -52,11 +84,25 @@ public boolean canAdd(String resource, String partition) {
&& _partitionMap.get(resource).contains(partition))) {
return false;
}

// Add the partition to the resource's set of partitions in this node
_partitionMap.computeIfAbsent(resource, k -> new HashSet<>()).add(partition);
_currentlyAssigned++;
return true;
}

/**
* Checks if a specific resource + partition is assigned to this node.
*
* @param resource the name of the resource
* @param partition the partition
* @return {@code true} if the resource + partition is assigned to this node, {@code false} otherwise
*/
public boolean hasPartition(String resource, String partition) {
Set<String> partitions = _partitionMap.get(resource);
return partitions != null && partitions.contains(partition);
}

/**
* Set the capacity of this node
* @param capacity The capacity to set
Expand All @@ -66,11 +112,27 @@ public void setCapacity(int capacity) {
}

/**
* Get the ID of this node
* @return The ID of this node
* Get the instance name of this node
* @return The instance name of this node
*/
public String getId() {
return _id;
public String getInstanceName() {
return _instanceName;
}

/**
* Get the logical id of this node
* @return The logical id of this node
*/
public String getLogicalId() {
return _logicaId;
}

/**
* Get the fault zone of this node
* @return The fault zone of this node
*/
public String getFaultZone() {
return _faultZone;
}

/**
Expand All @@ -84,8 +146,40 @@ public int getCurrentlyAssigned() {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("##########\nname=").append(_id).append("\nassigned:").append(_currentlyAssigned)
.append("\ncapacity:").append(_capacity);
sb.append("##########\nname=").append(_instanceName).append("\nassigned:")
.append(_currentlyAssigned).append("\ncapacity:").append(_capacity).append("\nlogicalId:")
.append(_logicaId).append("\nfaultZone:").append(_faultZone);
return sb.toString();
}

@Override
public int compareTo(CapacityNode o) {
if (_logicaId != null) {
return _logicaId.compareTo(o.getLogicalId());
}
return _instanceName.compareTo(o.getInstanceName());
}

/**
* Computes the fault zone id based on the domain and fault zone type when topology is enabled.
* For example, when
* the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function
* returns "2".
* If cannot find the fault zone type, this function leaves the fault zone id as the instance name.
* TODO: change the return value to logical id when no fault zone type found. Also do the same for
* waged rebalancer in helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
*/
private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
LinkedHashMap<String, String> instanceTopologyMap =
Topology.computeInstanceTopologyMap(clusterConfig, instanceConfig.getInstanceName(),
instanceConfig, true /*earlyQuitTillFaultZone*/);

StringBuilder faultZoneStringBuilder = new StringBuilder();
for (Map.Entry<String, String> entry : instanceTopologyMap.entrySet()) {
faultZoneStringBuilder.append(entry.getValue());
faultZoneStringBuilder.append('/');
}
faultZoneStringBuilder.setLength(faultZoneStringBuilder.length() - 1);
return faultZoneStringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,20 @@
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.CapacityNode;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
import org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.MissingTopStateRecord;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.CustomizedState;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
Expand All @@ -73,6 +77,9 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
// a map from customized state type to customized view cache
private final Map<String, CustomizedViewCache> _customizedViewCacheMap;

// maintain a cache of ideal state (preference list + best possible assignment) which will be managed ondemand in rebalancer
private final Map<String, ZNRecord> _ondemandIdealStateCache;

// maintain a cache of bestPossible assignment across pipeline runs
// TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache.
private Map<String, ResourceAssignment> _resourceAssignmentCache;
Expand Down Expand Up @@ -149,6 +156,7 @@ public String getObjName(ExternalView obj) {
_refreshedChangeTypes = ConcurrentHashMap.newKeySet();
_customizedStateCache = new CustomizedStateCache(this, _aggregationEnabledTypes);
_customizedViewCacheMap = new HashMap<>();
_ondemandIdealStateCache = new HashMap<>();
}

public synchronized void refresh(HelixDataAccessor accessor) {
Expand Down Expand Up @@ -182,16 +190,17 @@ public synchronized void refresh(HelixDataAccessor accessor) {
refreshStablePartitionList(getIdealStates());
refreshDisabledInstancesForAllPartitionsSet();

if (getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) {
buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance());
if (getClusterConfig() != null
&& getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) {
buildSimpleCapacityMap();
// Remove all cached IdealState because it is a global computation cannot partially be
// performed for some resources. The computation is simple as well not taking too much resource
// to recompute the assignments.
Set<String> cachedGreedyIdealStates = _idealMappingCache.values().stream().filter(
Set<String> cachedStickyIdealStates = _idealMappingCache.values().stream().filter(
record -> record.getSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name())
.equals(GreedyRebalanceStrategy.class.getName())).map(ZNRecord::getId)
.equals(StickyRebalanceStrategy.class.getName())).map(ZNRecord::getId)
.collect(Collectors.toSet());
_idealMappingCache.keySet().removeAll(cachedGreedyIdealStates);
_idealMappingCache.keySet().removeAll(cachedStickyIdealStates);
}

LogUtil.logInfo(logger, getClusterEventId(), String.format(
Expand Down Expand Up @@ -388,6 +397,28 @@ public Map<String, Map<String, String>> getLastTopStateLocationMap() {
return _lastTopStateLocationMap;
}

/**
* Get cached ideal state (preference list + best possible assignment) for a resource
* @param resource
* @return
*/
public ZNRecord getCachedOndemandIdealState(String resource) {
return _ondemandIdealStateCache.get(resource);
}

/**
* Cache ideal state (preference list + best possible assignment) for a resource
* @param resource
* @return
*/
public void setCachedOndemandIdealState(String resource, ZNRecord idealState) {
_ondemandIdealStateCache.put(resource, idealState);
}

public void clearCachedOndemandIdealStates() {
_ondemandIdealStateCache.clear();
}

/**
* Get cached resourceAssignment (bestPossible mapping) for a resource
* @param resource
Expand Down Expand Up @@ -552,11 +583,16 @@ public WagedInstanceCapacity getWagedInstanceCapacity() {
return _wagedInstanceCapacity;
}

private void buildSimpleCapacityMap(int globalMaxPartitionAllowedPerInstance) {
private void buildSimpleCapacityMap() {
ClusterConfig clusterConfig = getClusterConfig();
ClusterTopologyConfig clusterTopologyConfig =
ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
Map<String, InstanceConfig> instanceConfigMap = getAssignableInstanceConfigMap();
_simpleCapacitySet = new HashSet<>();
for (String instance : getEnabledLiveInstances()) {
CapacityNode capacityNode = new CapacityNode(instance);
capacityNode.setCapacity(globalMaxPartitionAllowedPerInstance);
for (String instanceName : getAssignableInstances()) {
CapacityNode capacityNode =
new CapacityNode(instanceName, clusterConfig, clusterTopologyConfig,
instanceConfigMap.getOrDefault(instanceName, new InstanceConfig(instanceName)));
_simpleCapacitySet.add(capacityNode);
}
}
Expand All @@ -565,6 +601,35 @@ public Set<CapacityNode> getSimpleCapacitySet() {
return _simpleCapacitySet;
}

public void populateSimpleCapacitySetUsage(final Set<String> resourceNameSet,
final CurrentStateOutput currentStateOutput) {
// Convert the assignableNodes to map for quick lookup
Map<String, CapacityNode> simpleCapacityMap = new HashMap<>();
for (CapacityNode node : _simpleCapacitySet) {
simpleCapacityMap.put(node.getInstanceName(), node);
}
for (String resourceName : resourceNameSet) {
// Process current state mapping
populateCapacityNodeUsageFromStateMap(resourceName, simpleCapacityMap,
currentStateOutput.getCurrentStateMap(resourceName));
// Process pending state mapping
populateCapacityNodeUsageFromStateMap(resourceName, simpleCapacityMap,
currentStateOutput.getPendingMessageMap(resourceName));
}
}

private <T> void populateCapacityNodeUsageFromStateMap(String resourceName,
Map<String, CapacityNode> simpleCapacityMap, Map<Partition, Map<String, T>> stateMap) {
for (Map.Entry<Partition, Map<String, T>> entry : stateMap.entrySet()) {
for (String instanceName : entry.getValue().keySet()) {
CapacityNode node = simpleCapacityMap.get(instanceName);
if (node != null) {
node.canAdd(resourceName, entry.getKey().getPartitionName());
}
}
}
}

private void refreshDisabledInstancesForAllPartitionsSet() {
_disabledInstancesForAllPartitionsSet.clear();
Collection<InstanceConfig> allConfigs = getInstanceConfigMap().values();
Expand Down
Loading
Loading