Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.kama.client.servicecenter.ZKWatcher.watchZK;
import com.kama.client.servicecenter.balance.LoadBalance;
import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance;
import com.kama.client.servicecenter.balance.impl.RandomLoadBalance;
import common.message.RpcRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
Expand Down Expand Up @@ -33,7 +34,7 @@ public class ZKServiceCenter implements ServiceCenter {
//serviceCache
private ServiceCache cache;

private final LoadBalance loadBalance = new ConsistencyHashBalance();
private final LoadBalance loadBalance = new RandomLoadBalance();

//负责zookeeper客户端的初始化,并与zookeeper服务端进行连接
public ZKServiceCenter() throws InterruptedException {
Expand All @@ -50,7 +51,7 @@ public ZKServiceCenter() throws InterruptedException {
//初始化本地缓存
cache = new ServiceCache();
//加入zookeeper事件监听器
watchZK watcher = new watchZK(client, cache);
watchZK watcher = new watchZK(client, cache, loadBalance);
//监听启动
watcher.watchToUpdate(ROOT_PATH);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.kama.client.servicecenter.ZKWatcher;

import com.kama.client.cache.ServiceCache;
import com.kama.client.servicecenter.balance.LoadBalance;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
Expand All @@ -21,11 +22,14 @@ public class watchZK {
private CuratorFramework client;
//本地缓存
ServiceCache cache;
//负载均衡器
LoadBalance loadBalance;


public watchZK(CuratorFramework client, ServiceCache cache) {
public watchZK(CuratorFramework client, ServiceCache cache, LoadBalance loadBalance) {
this.client = client;
this.cache = cache;
this.loadBalance = loadBalance;
}

/**
Expand All @@ -46,13 +50,14 @@ public void event(Type type, ChildData childData, ChildData childData1) {
// 节点创建时没有赋予值 create /curator/app1 只创建节点,在这种情况下,更新前节点的 data 为 null,获取不到更新前节点的数据
switch (type.name()) {
case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件
String[] pathList = pasrePath(childData1);
String[] pathList = parsePath(childData1);
if (pathList.length <= 2) break;
else {
String serviceName = pathList[1];
String address = pathList[2];
//将新注册的服务加入到本地缓存中
//将新注册的服务加入到本地缓存中 并更新负载均衡
cache.addServiceToCache(serviceName, address);
loadBalance.addNode(address);
log.info("节点创建:服务名称 {} 地址 {}", serviceName, address);
}
break;
Expand All @@ -62,19 +67,21 @@ public void event(Type type, ChildData childData, ChildData childData1) {
} else {
log.debug("节点第一次赋值!");
}
String[] oldPathList = pasrePath(childData);
String[] newPathList = pasrePath(childData1);
String[] oldPathList = parsePath(childData);
String[] newPathList = parsePath(childData1);
cache.replaceServiceAddress(oldPathList[1], oldPathList[2], newPathList[2]);
loadBalance.delNode(oldPathList[2]);
loadBalance.addNode(newPathList[2]);
log.info("节点更新:服务名称 {} 地址从 {} 更新为 {}", oldPathList[1], oldPathList[2], newPathList[2]);
break;
case "NODE_DELETED": // 节点删除
String[] pathList_d = pasrePath(childData);
String[] pathList_d = parsePath(childData);
if (pathList_d.length <= 2) break;
else {
String serviceName = pathList_d[1];
String address = pathList_d[2];
//将新注册的服务加入到本地缓存中
cache.delete(serviceName, address);
loadBalance.delNode(address);
log.info("节点删除:服务名称 {} 地址 {}", serviceName, address);
}
break;
Expand All @@ -88,7 +95,7 @@ public void event(Type type, ChildData childData, ChildData childData1) {
}

//解析节点对应地址
public String[] pasrePath(ChildData childData) {
public String[] parsePath(ChildData childData) {
//获取更新的节点的路径
String path = new String(childData.getPath());
log.info("节点路径:{}",path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ public class ConsistencyHashBalance implements LoadBalance {
private static final int VIRTUAL_NUM = 5;

// 虚拟节点分配,key是hash值,value是虚拟节点服务器名称
private SortedMap<Integer, String> shards = new TreeMap<Integer,String>();

private SortedMap<Integer, String> shards = new ConcurrentSkipListMap<>();
// 真实节点列表
private List<String> realNodes = new LinkedList<>();
private List<String> realNodes = new CopyOnWriteArrayList<>();

// 获取虚拟节点的个数
public static int getVirtualNum() {
Expand Down Expand Up @@ -76,7 +75,7 @@ public String getServer(String node, List<String> serviceList) {
*
* @param node 新加入的节点
*/
public void addNode(String node) {
public synchronized void addNode(String node) {
if (!realNodes.contains(node)) {
realNodes.add(node);
log.info("真实节点[{}] 上线添加", node);
Expand All @@ -94,7 +93,7 @@ public void addNode(String node) {
*
* @param node 被移除的节点
*/
public void delNode(String node) {
public synchronized void delNode(String node) {
if (realNodes.contains(node)) {
realNodes.remove(node);
log.info("真实节点[{}] 下线移除", node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ public class RandomLoadBalance implements LoadBalance {
// 将Random声明为类级别的字段
private final Random random = new Random();

private final List<String> addressList = new CopyOnWriteArrayList<>();

@Override
public String balance(List<String> addressList) {
if (addressList == null || addressList.isEmpty()) {
Expand All @@ -34,15 +32,11 @@ public String balance(List<String> addressList) {

@Override
public void addNode(String node) {
// 如果是动态添加节点,可以将节点加入到addressList中
addressList.add(node);
log.info("节点 {} 已加入负载均衡", node);
}

@Override
public void delNode(String node) {
// 如果是动态删除节点,可以将节点从addressList中移除
addressList.remove(node);
log.info("节点 {} 已从负载均衡中移除", node);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ public class RoundLoadBalance implements LoadBalance {
// 使用 AtomicInteger 保证线程安全
private AtomicInteger choose = new AtomicInteger(0);

private List<String> addressList = new CopyOnWriteArrayList<>();

@Override
public String balance(List<String> addressList) {
if (addressList == null || addressList.isEmpty()) {
Expand All @@ -38,15 +36,11 @@ public String balance(List<String> addressList) {

@Override
public void addNode(String node) {
// 如果是动态添加节点,可以将节点加入到 addressList 中
addressList.add(node);
log.info("节点 {} 已加入负载均衡", node);
}

@Override
public void delNode(String node) {
// 如果是动态删除节点,可以将节点从 addressList 中移除
addressList.remove(node);
log.info("节点 {} 已从负载均衡中移除", node);
}
}