diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java index 14d74b2..8d45e68 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java @@ -4,6 +4,7 @@ import com.kama.client.servicecenter.ZKWatcher.watchZK; import com.kama.client.servicecenter.balance.LoadBalance; import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance; +import com.kama.client.servicecenter.balance.impl.RandomLoadBalance; import common.message.RpcRequest; import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; @@ -33,7 +34,7 @@ public class ZKServiceCenter implements ServiceCenter { //serviceCache private ServiceCache cache; - private final LoadBalance loadBalance = new ConsistencyHashBalance(); + private final LoadBalance loadBalance = new RandomLoadBalance(); //负责zookeeper客户端的初始化,并与zookeeper服务端进行连接 public ZKServiceCenter() throws InterruptedException { @@ -50,7 +51,7 @@ public ZKServiceCenter() throws InterruptedException { //初始化本地缓存 cache = new ServiceCache(); //加入zookeeper事件监听器 - watchZK watcher = new watchZK(client, cache); + watchZK watcher = new watchZK(client, cache, loadBalance); //监听启动 watcher.watchToUpdate(ROOT_PATH); } diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKWatcher/watchZK.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKWatcher/watchZK.java index 396d4e8..1682335 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKWatcher/watchZK.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKWatcher/watchZK.java @@ -1,6 +1,7 @@ package com.kama.client.servicecenter.ZKWatcher; import com.kama.client.cache.ServiceCache; +import com.kama.client.servicecenter.balance.LoadBalance; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; @@ -21,11 +22,14 @@ public class watchZK { private CuratorFramework client; //本地缓存 ServiceCache cache; + //负载均衡器 + LoadBalance loadBalance; - public watchZK(CuratorFramework client, ServiceCache cache) { + public watchZK(CuratorFramework client, ServiceCache cache, LoadBalance loadBalance) { this.client = client; this.cache = cache; + this.loadBalance = loadBalance; } /** @@ -46,13 +50,14 @@ public void event(Type type, ChildData childData, ChildData childData1) { // 节点创建时没有赋予值 create /curator/app1 只创建节点,在这种情况下,更新前节点的 data 为 null,获取不到更新前节点的数据 switch (type.name()) { case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件 - String[] pathList = pasrePath(childData1); + String[] pathList = parsePath(childData1); if (pathList.length <= 2) break; else { String serviceName = pathList[1]; String address = pathList[2]; - //将新注册的服务加入到本地缓存中 + //将新注册的服务加入到本地缓存中 并更新负载均衡 cache.addServiceToCache(serviceName, address); + loadBalance.addNode(address); log.info("节点创建:服务名称 {} 地址 {}", serviceName, address); } break; @@ -62,19 +67,21 @@ public void event(Type type, ChildData childData, ChildData childData1) { } else { log.debug("节点第一次赋值!"); } - String[] oldPathList = pasrePath(childData); - String[] newPathList = pasrePath(childData1); + String[] oldPathList = parsePath(childData); + String[] newPathList = parsePath(childData1); cache.replaceServiceAddress(oldPathList[1], oldPathList[2], newPathList[2]); + loadBalance.delNode(oldPathList[2]); + loadBalance.addNode(newPathList[2]); log.info("节点更新:服务名称 {} 地址从 {} 更新为 {}", oldPathList[1], oldPathList[2], newPathList[2]); break; case "NODE_DELETED": // 节点删除 - String[] pathList_d = pasrePath(childData); + String[] pathList_d = parsePath(childData); if (pathList_d.length <= 2) break; else { String serviceName = pathList_d[1]; String address = pathList_d[2]; - //将新注册的服务加入到本地缓存中 cache.delete(serviceName, address); + loadBalance.delNode(address); log.info("节点删除:服务名称 {} 地址 {}", serviceName, address); } break; @@ -88,7 +95,7 @@ public void event(Type type, ChildData childData, ChildData childData1) { } //解析节点对应地址 - public String[] pasrePath(ChildData childData) { + public String[] parsePath(ChildData childData) { //获取更新的节点的路径 String path = new String(childData.getPath()); log.info("节点路径:{}",path); diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java index 119b9ea..0d7216a 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/ConsistencyHashBalance.java @@ -22,10 +22,9 @@ public class ConsistencyHashBalance implements LoadBalance { private static final int VIRTUAL_NUM = 5; // 虚拟节点分配,key是hash值,value是虚拟节点服务器名称 - private SortedMap shards = new TreeMap(); - + private SortedMap shards = new ConcurrentSkipListMap<>(); // 真实节点列表 - private List realNodes = new LinkedList<>(); + private List realNodes = new CopyOnWriteArrayList<>(); // 获取虚拟节点的个数 public static int getVirtualNum() { @@ -76,7 +75,7 @@ public String getServer(String node, List serviceList) { * * @param node 新加入的节点 */ - public void addNode(String node) { + public synchronized void addNode(String node) { if (!realNodes.contains(node)) { realNodes.add(node); log.info("真实节点[{}] 上线添加", node); @@ -94,7 +93,7 @@ public void addNode(String node) { * * @param node 被移除的节点 */ - public void delNode(String node) { + public synchronized void delNode(String node) { if (realNodes.contains(node)) { realNodes.remove(node); log.info("真实节点[{}] 下线移除", node); diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RandomLoadBalance.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RandomLoadBalance.java index 583030d..66565e7 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RandomLoadBalance.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RandomLoadBalance.java @@ -19,8 +19,6 @@ public class RandomLoadBalance implements LoadBalance { // 将Random声明为类级别的字段 private final Random random = new Random(); - private final List addressList = new CopyOnWriteArrayList<>(); - @Override public String balance(List addressList) { if (addressList == null || addressList.isEmpty()) { @@ -34,15 +32,11 @@ public String balance(List addressList) { @Override public void addNode(String node) { - // 如果是动态添加节点,可以将节点加入到addressList中 - addressList.add(node); log.info("节点 {} 已加入负载均衡", node); } @Override public void delNode(String node) { - // 如果是动态删除节点,可以将节点从addressList中移除 - addressList.remove(node); log.info("节点 {} 已从负载均衡中移除", node); } } diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RoundLoadBalance.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RoundLoadBalance.java index 15398a9..84ab81a 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RoundLoadBalance.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/balance/impl/RoundLoadBalance.java @@ -20,8 +20,6 @@ public class RoundLoadBalance implements LoadBalance { // 使用 AtomicInteger 保证线程安全 private AtomicInteger choose = new AtomicInteger(0); - private List addressList = new CopyOnWriteArrayList<>(); - @Override public String balance(List addressList) { if (addressList == null || addressList.isEmpty()) { @@ -38,15 +36,11 @@ public String balance(List addressList) { @Override public void addNode(String node) { - // 如果是动态添加节点,可以将节点加入到 addressList 中 - addressList.add(node); log.info("节点 {} 已加入负载均衡", node); } @Override public void delNode(String node) { - // 如果是动态删除节点,可以将节点从 addressList 中移除 - addressList.remove(node); log.info("节点 {} 已从负载均衡中移除", node); } }