From 39e558bae610faed036ad10025896ac5ffcb21e3 Mon Sep 17 00:00:00 2001 From: GP Date: Fri, 18 Apr 2025 19:22:59 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=88=A0=E9=99=A4=E4=BA=86=E5=A4=9A?= =?UTF-8?q?=E4=BD=99=E7=9A=84=E6=88=90=E5=91=98=E5=8F=98=E9=87=8F=EF=BC=8C?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E8=8A=82=E7=82=B9=E4=B8=8A=E4=B8=8B=E7=BA=BF?= =?UTF-8?q?=E6=97=B6=E5=8A=A8=E6=80=81=E6=9B=B4=E6=96=B0=E5=93=88=E5=B8=8C?= =?UTF-8?q?=E7=8E=AF=EF=BC=8C=E5=B9=B6=E4=BF=AE=E5=A4=8D=E4=BA=86=E9=83=A8?= =?UTF-8?q?=E5=88=86=E6=8B=BC=E5=86=99=E9=94=99=E8=AF=AF=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/servicecenter/ZKServiceCenter.java | 5 ++-- .../servicecenter/ZKWatcher/watchZK.java | 23 ++++++++++++------- .../balance/impl/ConsistencyHashBalance.java | 12 ++++++++-- .../balance/impl/RandomLoadBalance.java | 6 ----- .../balance/impl/RoundLoadBalance.java | 6 ----- 5 files changed, 28 insertions(+), 24 deletions(-) diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java index 14d74b2..8d45e68 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java @@ -4,6 +4,7 @@ import com.kama.client.servicecenter.ZKWatcher.watchZK; import com.kama.client.servicecenter.balance.LoadBalance; import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance; +import com.kama.client.servicecenter.balance.impl.RandomLoadBalance; import common.message.RpcRequest; import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; @@ -33,7 +34,7 @@ public class ZKServiceCenter implements ServiceCenter { //serviceCache private ServiceCache cache; - private final LoadBalance loadBalance = new ConsistencyHashBalance(); + private final LoadBalance loadBalance = new RandomLoadBalance(); //负责zookeeper客户端的初始化,并与zookeeper服务端进行连接 public ZKServiceCenter() throws InterruptedException { @@ -50,7 +51,7 @@ public ZKServiceCenter() throws InterruptedException { //初始化本地缓存 cache = new ServiceCache(); //加入zookeeper事件监听器 - watchZK watcher = new watchZK(client, cache); + watchZK watcher = new watchZK(client, cache, loadBalance); //监听启动 watcher.watchToUpdate(ROOT_PATH); } diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKWatcher/watchZK.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKWatcher/watchZK.java index 396d4e8..1682335 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKWatcher/watchZK.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKWatcher/watchZK.java @@ -1,6 +1,7 @@ package com.kama.client.servicecenter.ZKWatcher; import com.kama.client.cache.ServiceCache; +import com.kama.client.servicecenter.balance.LoadBalance; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; @@ -21,11 +22,14 @@ public class watchZK { private CuratorFramework client; //本地缓存 ServiceCache cache; + //负载均衡器 + LoadBalance loadBalance; - public watchZK(CuratorFramework client, ServiceCache cache) { + public watchZK(CuratorFramework client, ServiceCache cache, LoadBalance loadBalance) { this.client = client; this.cache = cache; + this.loadBalance = loadBalance; } /** @@ -46,13 +50,14 @@ public void event(Type type, ChildData childData, ChildData childData1) { // 节点创建时没有赋予值 create /curator/app1 只创建节点,在这种情况下,更新前节点的 data 为 null,获取不到更新前节点的数据 switch (type.name()) { case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件 - String[] pathList = pasrePath(childData1); + String[] pathList = parsePath(childData1); if (pathList.length <= 2) break; else { String serviceName = pathList[1]; String address = pathList[2]; - //将新注册的服务加入到本地缓存中 + //将新注册的服务加入到本地缓存中 并更新负载均衡 cache.addServiceToCache(serviceName, address); + loadBalance.addNode(address); log.info("节点创建:服务名称 {} 地址 {}", serviceName, address); } break; @@ -62,19 +67,21 @@ public void event(Type type, ChildData childData, ChildData childData1) { } else { log.debug("节点第一次赋值!"); } - String[] oldPathList = pasrePath(childData); - String[] newPathList = pasrePath(childData1); + String[] oldPathList = parsePath(childData); + String[] newPathList = parsePath(childData1); cache.replaceServiceAddress(oldPathList[1], oldPathList[2], newPathList[2]); + loadBalance.delNode(oldPathList[2]); + loadBalance.addNode(newPathList[2]); log.info("节点更新:服务名称 {} 地址从 {} 更新为 {}", oldPathList[1], oldPathList[2], newPathList[2]); break; case "NODE_DELETED": // 节点删除 - String[] pathList_d = pasrePath(childData); + String[] pathList_d = parsePath(childData); if (pathList_d.length <= 2) break; else { String serviceName = pathList_d[1]; String address = pathList_d[2]; - //将新注册的服务加入到本地缓存中 cache.delete(serviceName, address); + loadBalance.delNode(address); log.info("节点删除:服务名称 {} 地址 {}", serviceName, address); } break; @@ -88,7 +95,7 @@ public void event(Type type, ChildData childData, ChildData childData1) { } //解析节点对应地址 - public String[] pasrePath(ChildData childData) { + public String[] parsePath(ChildData childData) { //获取更新的节点的路径 String path = new String(childData.getPath()); log.info("节点路径:{}",path); diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java index 119b9ea..98fcf25 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java @@ -53,20 +53,28 @@ public void init(List serviceList) { * @return 负责该请求的真实节点名称 */ public String getServer(String node, List serviceList) { + // 检查虚拟节点映射是否为空,如果是则初始化 if (shards.isEmpty()) { - init(serviceList); // 初始化,如果shards为空 + init(serviceList); // 使用serviceList初始化虚拟节点 } + // 计算输入节点的哈希值 int hash = getHash(node); Integer key = null; + // 获取大于等于当前哈希值的所有节点映射 SortedMap subMap = shards.tailMap(hash); + + // 确定目标节点: + // - 如果没有更大的节点,则使用第一个节点(环状结构) + // - 否则使用第一个大于等于的节点 if (subMap.isEmpty()) { - key = shards.firstKey(); // 如果没有大于该hash的节点,则返回最小的hash值 + key = shards.firstKey(); // 环状结构处理 } else { key = subMap.firstKey(); } + // 5. 获取虚拟节点并返回真实节点名称 String virtualNode = shards.get(key); return virtualNode.substring(0, virtualNode.indexOf("&&")); } diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RandomLoadBalance.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RandomLoadBalance.java index 583030d..66565e7 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RandomLoadBalance.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RandomLoadBalance.java @@ -19,8 +19,6 @@ public class RandomLoadBalance implements LoadBalance { // 将Random声明为类级别的字段 private final Random random = new Random(); - private final List addressList = new CopyOnWriteArrayList<>(); - @Override public String balance(List addressList) { if (addressList == null || addressList.isEmpty()) { @@ -34,15 +32,11 @@ public String balance(List addressList) { @Override public void addNode(String node) { - // 如果是动态添加节点,可以将节点加入到addressList中 - addressList.add(node); log.info("节点 {} 已加入负载均衡", node); } @Override public void delNode(String node) { - // 如果是动态删除节点,可以将节点从addressList中移除 - addressList.remove(node); log.info("节点 {} 已从负载均衡中移除", node); } } diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RoundLoadBalance.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RoundLoadBalance.java index 15398a9..84ab81a 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RoundLoadBalance.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RoundLoadBalance.java @@ -20,8 +20,6 @@ public class RoundLoadBalance implements LoadBalance { // 使用 AtomicInteger 保证线程安全 private AtomicInteger choose = new AtomicInteger(0); - private List addressList = new CopyOnWriteArrayList<>(); - @Override public String balance(List addressList) { if (addressList == null || addressList.isEmpty()) { @@ -38,15 +36,11 @@ public String balance(List addressList) { @Override public void addNode(String node) { - // 如果是动态添加节点,可以将节点加入到 addressList 中 - addressList.add(node); log.info("节点 {} 已加入负载均衡", node); } @Override public void delNode(String node) { - // 如果是动态删除节点,可以将节点从 addressList 中移除 - addressList.remove(node); log.info("节点 {} 已从负载均衡中移除", node); } } From d178130bf63846ff1156aad82edb2443fcb17dc4 Mon Sep 17 00:00:00 2001 From: GP Date: Fri, 18 Apr 2025 19:39:03 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=B0=86=E5=AD=98=E5=82=A8=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E7=9A=84=E9=9B=86=E5=90=88=E6=8D=A2=E6=88=90=E4=BA=86?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E5=AE=89=E5=85=A8=E7=9A=84=E9=9B=86=E5=90=88?= =?UTF-8?q?=EF=BC=8C=E5=B9=B6=E4=BF=AE=E6=94=B9add=E5=92=8Cdel=E4=B8=BA?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=96=B9=E6=B3=95=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../balance/impl/ConsistencyHashBalance.java | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java index 98fcf25..0d7216a 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java @@ -22,10 +22,9 @@ public class ConsistencyHashBalance implements LoadBalance { private static final int VIRTUAL_NUM = 5; // 虚拟节点分配,key是hash值,value是虚拟节点服务器名称 - private SortedMap shards = new TreeMap(); - + private SortedMap shards = new ConcurrentSkipListMap<>(); // 真实节点列表 - private List realNodes = new LinkedList<>(); + private List realNodes = new CopyOnWriteArrayList<>(); // 获取虚拟节点的个数 public static int getVirtualNum() { @@ -53,28 +52,20 @@ public void init(List serviceList) { * @return 负责该请求的真实节点名称 */ public String getServer(String node, List serviceList) { - // 检查虚拟节点映射是否为空,如果是则初始化 if (shards.isEmpty()) { - init(serviceList); // 使用serviceList初始化虚拟节点 + init(serviceList); // 初始化,如果shards为空 } - // 计算输入节点的哈希值 int hash = getHash(node); Integer key = null; - // 获取大于等于当前哈希值的所有节点映射 SortedMap subMap = shards.tailMap(hash); - - // 确定目标节点: - // - 如果没有更大的节点,则使用第一个节点(环状结构) - // - 否则使用第一个大于等于的节点 if (subMap.isEmpty()) { - key = shards.firstKey(); // 环状结构处理 + key = shards.firstKey(); // 如果没有大于该hash的节点,则返回最小的hash值 } else { key = subMap.firstKey(); } - // 5. 获取虚拟节点并返回真实节点名称 String virtualNode = shards.get(key); return virtualNode.substring(0, virtualNode.indexOf("&&")); } @@ -84,7 +75,7 @@ public String getServer(String node, List serviceList) { * * @param node 新加入的节点 */ - public void addNode(String node) { + public synchronized void addNode(String node) { if (!realNodes.contains(node)) { realNodes.add(node); log.info("真实节点[{}] 上线添加", node); @@ -102,7 +93,7 @@ public void addNode(String node) { * * @param node 被移除的节点 */ - public void delNode(String node) { + public synchronized void delNode(String node) { if (realNodes.contains(node)) { realNodes.remove(node); log.info("真实节点[{}] 下线移除", node);