diff --git a/polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/flow/BaseFlow.java b/polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/flow/BaseFlow.java index 378d9b3d6..3a964ff85 100644 --- a/polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/flow/BaseFlow.java +++ b/polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/flow/BaseFlow.java @@ -42,6 +42,7 @@ import com.tencent.polaris.api.rpc.Criteria; import com.tencent.polaris.api.rpc.RequestBaseEntity; import com.tencent.polaris.api.utils.CollectionUtils; +import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.client.util.Utils; import com.tencent.polaris.logging.LoggerFactory; import org.slf4j.Logger; @@ -96,6 +97,9 @@ public static Instance commonGetOneInstance(Extensions extensions, ServiceKey se ServiceConfig serviceConfig = extensions.getConfiguration().getProvider().getService(); RouteInfo routeInfo = new RouteInfo( null, null, dstSvcInfo, null, "", serviceConfig); + if (StringUtils.isNotBlank(protocol)) { + routeInfo.putRouterMetadata("metadataRoute", metadata); + } ResourcesResponse resourcesResponse = BaseFlow .syncGetResources(extensions, false, provider, flowControlParam); LOG.debug("[ConnectionManager]success to discover service {}", svcEventKey); diff --git a/polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/remote/ServiceAddressRepository.java b/polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/remote/ServiceAddressRepository.java index 3dd4e5a97..eea419ae3 100644 --- a/polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/remote/ServiceAddressRepository.java +++ b/polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/remote/ServiceAddressRepository.java @@ -24,11 +24,7 @@ import com.tencent.polaris.api.plugin.common.PluginTypes; import com.tencent.polaris.api.plugin.compose.Extensions; import 
com.tencent.polaris.api.plugin.loadbalance.LoadBalancer; -import com.tencent.polaris.api.pojo.DefaultInstance; -import com.tencent.polaris.api.pojo.DefaultServiceInstances; -import com.tencent.polaris.api.pojo.Instance; -import com.tencent.polaris.api.pojo.ServiceInstances; -import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.api.pojo.*; import com.tencent.polaris.api.rpc.Criteria; import com.tencent.polaris.api.utils.CollectionUtils; import com.tencent.polaris.api.utils.IPAddressUtils; @@ -190,11 +186,8 @@ public int nodeListSize() { } private Instance getDiscoverInstance() throws PolarisException { - Instance instance = BaseFlow.commonGetOneInstance(extensions, remoteCluster, routers, lbPolicy, protocol, + return BaseFlow.commonGetOneInstance(extensions, remoteCluster, routers, lbPolicy, protocol, clientId); - LOG.info("success to get instance for service {}, instance is {}:{}", remoteCluster, instance.getHost(), - instance.getPort()); - return instance; } @JustForTest diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/route/RouteInfo.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/route/RouteInfo.java index f6a6043f7..3fdb8fa9b 100644 --- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/route/RouteInfo.java +++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/route/RouteInfo.java @@ -256,6 +256,15 @@ public Map getRouterMetadata(String routerType) { return Collections.unmodifiableMap(metadata); } + public void putRouterMetadata(String routerType, Map metadata) { + Map tempMetadata = routerMetadata.get(routerType); + if (tempMetadata == null || tempMetadata.isEmpty()) { + tempMetadata = new HashMap<>(); + routerMetadata.put(routerType, tempMetadata); + } + tempMetadata.putAll(metadata); + } + public void setRouterArguments(Map> routerArguments) { Map> routerMetadata = this.routerMetadata; diff 
--git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/AsyncRateLimitConnector.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/AsyncRateLimitConnector.java index ac65da47c..c205b996b 100644 --- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/AsyncRateLimitConnector.java +++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/AsyncRateLimitConnector.java @@ -84,7 +84,13 @@ public StreamCounterSet getStreamCounterSet(Extensions extensions, ServiceKey re } if (null != streamCounterSet) { //切换了节点,去掉初始化记录 - streamCounterSet.deleteInitRecord(serviceIdentifier); + InitializeRecord removedRecord = streamCounterSet.deleteInitRecord(serviceIdentifier); + if (removedRecord != null) { + RateLimitWindow rateLimitWindow = removedRecord.getRateLimitWindow(); + uniqueKey = rateLimitWindow != null ? 
rateLimitWindow.getUniqueKey() : null; + LOG.info("[getStreamCounterSet] host switched, and initRecord removed serviceIdentifier: {}, window " + + "{} {}", serviceIdentifier, rateLimitWindow, uniqueKey); + } //切换了节点,老的不再使用 if (streamCounterSet.decreaseReference()) { nodeToStream.remove(node); diff --git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/QuotaFlow.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/QuotaFlow.java index ab85810ea..2a9b92052 100644 --- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/QuotaFlow.java +++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/QuotaFlow.java @@ -92,6 +92,15 @@ public void init(Extensions extensions) throws PolarisException { FlowCache flowCache = extensions.getFlowCache(); return flowCache.loadPluginCacheObject(API_ID, key, path -> TrieUtil.buildSimpleApiTrieNode((String) path)); }; + rateLimitExtension.submitExpireJob(() -> { + try { + for (Map.Entry entry : svcToWindowSet.entrySet()) { + entry.getValue().cleanupContainers(); + } + } catch (Throwable e) { + LOG.error("Failed to cleanup expired rate limit window", e); + } + }); // init tsf rate limit master utils if need Map metadata = rateLimitConfig.getMetadata(); diff --git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitExtension.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitExtension.java index f4db90fed..14c2d9332 100644 --- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitExtension.java +++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitExtension.java @@ -25,20 +25,22 @@ import 
com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.api.utils.ThreadPoolUtils; import com.tencent.polaris.client.util.NamedThreadFactory; +import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.ratelimit.client.sync.RemoteSyncTask; -import com.tencent.polaris.ratelimit.client.utils.RateLimitConstants; import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto; +import org.slf4j.Logger; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.Random; import java.util.concurrent.*; import static com.tencent.polaris.api.plugin.ratelimiter.ServiceRateLimiter.*; public class RateLimitExtension extends Destroyable { + private static final Logger LOG = LoggerFactory.getLogger(RateLimitExtension.class); + private final Extensions extensions; private final Map rateLimiters = new HashMap<>(); @@ -113,9 +115,17 @@ private String getRateLimiterName(RateLimitProto.Rule.Resource resource, String * @param task 任务 */ public void submitSyncTask(RemoteSyncTask task, long initialDelay, long delay) { + if (scheduledTasks.containsKey(task.getWindow().getUniqueKey())) { + LOG.warn("task has exist, ignore, task {}, window {}, uniqueKey {} ", task, task.getWindow(), + task.getWindow().getUniqueKey()); + task.getWindow().setStatus(RateLimitWindow.WindowStatus.CREATED.ordinal()); + return; + } ScheduledFuture scheduledFuture = syncExecutor - .scheduleWithFixedDelay(task, 0, delay, TimeUnit.MILLISECONDS); + .scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.MILLISECONDS); scheduledTasks.put(task.getWindow().getUniqueKey(), scheduledFuture); + LOG.info("submit sync task success, task {}, future {}, window {}, uniqueKey {} ", task, scheduledFuture, + task.getWindow(), task.getWindow().getUniqueKey()); } private static final int EXPIRE_INTERVAL_SECOND = 5; @@ -130,8 +140,33 @@ public void submitExpireJob(Runnable task) { .scheduleWithFixedDelay(task, EXPIRE_INTERVAL_SECOND, 
EXPIRE_INTERVAL_SECOND, TimeUnit.SECONDS); } - public void stopSyncTask(String uniqueKey) { + /** + * 停止同步任务 + * + * @param uniqueKey 窗口唯一标识 + * @param window 限流窗口 + */ + public void stopSyncTask(String uniqueKey, RateLimitWindow window) { + // 从connector初始化列表清理 + Runnable cleanTask = () -> { + try { + AsyncRateLimitConnector connector = window.getWindowSet().getAsyncRateLimitConnector(); + ServiceIdentifier identifier = new ServiceIdentifier(window.getSvcKey().getService(), + window.getSvcKey().getNamespace(), window.getLabels()); + StreamCounterSet streamCounterSet = connector.getStreamCounterSet( + window.getWindowSet().getRateLimitExtension().getExtensions(), + window.getRemoteCluster(), window.getServiceAddressRepository(), window.getUniqueKey(), identifier); + if (streamCounterSet != null) { + streamCounterSet.deleteInitRecord(identifier, window); + } + LOG.info("clean task run success, window {}", window); + } catch (Throwable e) { + LOG.error("clean task run failed, window {}", window.getUniqueKey(), e); + } + }; + syncExecutor.schedule(cleanTask, 10, TimeUnit.MILLISECONDS); ScheduledFuture future = scheduledTasks.remove(uniqueKey); + LOG.info("scheduledTasks remove uniqueKey {}, future {}", uniqueKey, future); if (null != future) { future.cancel(true); } diff --git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindow.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindow.java index d4b9e6f25..b678037c2 100644 --- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindow.java +++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindow.java @@ -18,13 +18,14 @@ package com.tencent.polaris.ratelimit.client.flow; import com.tencent.polaris.api.config.consumer.LoadBalanceConfig; +import 
com.tencent.polaris.api.config.consumer.ServiceRouterConfig; import com.tencent.polaris.api.config.provider.RateLimitConfig; import com.tencent.polaris.api.plugin.compose.Extensions; import com.tencent.polaris.api.plugin.ratelimiter.InitCriteria; import com.tencent.polaris.api.plugin.ratelimiter.QuotaBucket; import com.tencent.polaris.api.plugin.ratelimiter.QuotaResult; import com.tencent.polaris.api.plugin.ratelimiter.ServiceRateLimiter; -import com.tencent.polaris.api.pojo.*; +import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.client.flow.FlowControlParam; import com.tencent.polaris.client.remote.ServiceAddressRepository; @@ -39,11 +40,12 @@ import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Amount; import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.RateLimitCluster; import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Rule; -import java.util.Random; import org.slf4j.Logger; +import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -97,6 +99,8 @@ public enum WindowStatus { private final AtomicLong lastInitTimeMs = new AtomicLong(); + private final AtomicLong lastSyncTimeMs = new AtomicLong(); + // 执行正式分配的令牌桶 private final QuotaBucket allocatingBucket; @@ -144,7 +148,7 @@ public RateLimitWindow(RateLimitWindowSet windowSet, CommonQuotaRequest quotaReq this.syncParam = quotaRequest.getFlowControlParam(); remoteCluster = getLimiterClusterService(rule.getCluster(), rateLimitConfig); serviceAddressRepository = buildServiceAddressRepository(rateLimitConfig.getLimiterAddresses(), - uniqueKey, windowSet.getExtensions(), remoteCluster, null, LoadBalanceConfig.LOAD_BALANCE_RING_HASH, "grpc"); + uniqueKey, 
windowSet.getExtensions(), remoteCluster); allocatingBucket = getQuotaBucket(initCriteria, windowSet.getRateLimitExtension()); lastAccessTimeMs.set(System.currentTimeMillis()); this.rateLimitConfig = rateLimitConfig; @@ -152,8 +156,10 @@ public RateLimitWindow(RateLimitWindowSet windowSet, CommonQuotaRequest quotaReq } private ServiceAddressRepository buildServiceAddressRepository(List addresses, String hash, Extensions extensions, - ServiceKey remoteCluster, List routers, String lbPolicy, String protocol) { - return new ServiceAddressRepository(addresses, hash, extensions, remoteCluster, routers, lbPolicy, protocol); + ServiceKey remoteCluster) { + List routers = new ArrayList<>(); + routers.add(ServiceRouterConfig.DEFAULT_ROUTER_METADATA); + return new ServiceAddressRepository(addresses, hash, extensions, remoteCluster, routers, LoadBalanceConfig.LOAD_BALANCE_RING_HASH, "grpc"); } @@ -249,10 +255,12 @@ public void init() { } if (configMode == RateLimitConstants.CONFIG_QUOTA_LOCAL_MODE && !isTsfCluster) { //本地限流,则直接可用 + LOG.info("[RateLimitWindow] local window {} initiated", this); status.set(WindowStatus.INITIALIZED.ordinal()); return; } //加入轮询队列,走异步调度 + LOG.info("[RateLimitWindow] remote window {} first init", this); if (rule.getMetadataMap().containsKey("limiter") && StringUtils.equalsIgnoreCase("tsf", rule.getMetadataMap().get("limiter"))) { windowSet.getRateLimitExtension().submitSyncTask(new TsfRemoteSyncTask(this), 0L, 1000L); @@ -270,8 +278,13 @@ public void unInit() { return; } status.set(WindowStatus.DELETED.ordinal()); + LOG.info("[RateLimitWindow] window {} {} is set to DELETED", uniqueKey, this); //从轮询队列中剔除 - windowSet.getRateLimitExtension().stopSyncTask(uniqueKey); + if (configMode == RateLimitConstants.CONFIG_QUOTA_LOCAL_MODE) { + return; + } + LOG.info("[RateLimitWindow] stopSyncTask( uniqueKey {}, window {} ) ", uniqueKey, this); + windowSet.getRateLimitExtension().stopSyncTask(uniqueKey, this); } } @@ -301,16 +314,21 @@ public void 
returnQuota(CommonQuotaRequest request) { /** * 窗口已经过期 + * TSF 设置为不过期 * * @return boolean */ public boolean isExpired() { - long curTimeMs = System.currentTimeMillis(); - boolean expired = curTimeMs - lastAccessTimeMs.get() > expireDurationMs; - if (expired) { - LOG.info("[RateLimit]window has expired, expireDurationMs {}, uniqueKey {}", expireDurationMs, uniqueKey); + if (!isTsfCluster) { + long curTimeMs = System.currentTimeMillis(); + boolean expired = curTimeMs - lastAccessTimeMs.get() > expireDurationMs; + if (expired) { + LOG.info("[RateLimit] window has expired, expireDurationMs {}, uniqueKey {}, window {}", expireDurationMs, + uniqueKey, this); + } + return expired; } - return expired; + return false; } public long getLastInitTimeMs() { @@ -321,6 +339,14 @@ public void setLastInitTimeMs(long lastInitTimeMs) { this.lastInitTimeMs.set(lastInitTimeMs); } + public long getLastSyncTimeMs() { + return lastSyncTimeMs.get(); + } + + public void setLastSyncTimeMs(long lastSyncTimeMs) { + this.lastSyncTimeMs.set(lastSyncTimeMs); + } + /** * 获取当前窗口的状态 * diff --git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindowSet.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindowSet.java index 94a466087..5de4bc5dd 100644 --- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindowSet.java +++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindowSet.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; public class RateLimitWindowSet { @@ -133,6 +134,26 @@ public void deleteRules(Set rules) { } } + /** + * 过期清理单个rule下所有WindowContainer + */ + public void cleanupContainers() { + 
AtomicInteger rulesExpired = new AtomicInteger(0); + windowByRule.entrySet().removeIf(entry -> { + boolean expired = entry.getValue().checkAndCleanExpiredWindows(); + if (expired) { + rulesExpired.incrementAndGet(); + LOG.info("[RateLimitWindowSet] rule {} for service {} has been expired, window container {}", + entry.getKey(), serviceKey, entry.getValue()); + } + return expired; + }); + if (rulesExpired.get() > 0) { + LOG.info("[RateLimitWindowSet] {} rules have been cleaned up due to expiration, service {}", + rulesExpired, serviceKey); + } + } + public AsyncRateLimitConnector getAsyncRateLimitConnector() { return asyncRateLimitConnector; } diff --git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/StreamCounterSet.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/StreamCounterSet.java index 65ad435a5..8253e07ff 100644 --- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/StreamCounterSet.java +++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/StreamCounterSet.java @@ -19,12 +19,11 @@ import com.tencent.polaris.client.pojo.Node; import com.tencent.polaris.logging.LoggerFactory; +import org.slf4j.Logger; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.slf4j.Logger; - /** * 计数器对象 */ @@ -97,11 +96,20 @@ public boolean decreaseReference() { return false; } - public void deleteInitRecord(ServiceIdentifier serviceIdentifier) { + public InitializeRecord deleteInitRecord(ServiceIdentifier serviceIdentifier) { + StreamResource streamResource = currentStreamResource.get(); + if (null != streamResource) { + return streamResource.deleteInitRecord(serviceIdentifier); + } + return null; + } + + public InitializeRecord deleteInitRecord(ServiceIdentifier serviceIdentifier, RateLimitWindow window) { 
StreamResource streamResource = currentStreamResource.get(); if (null != streamResource) { - streamResource.deleteInitRecord(serviceIdentifier); + return streamResource.deleteInitRecord(serviceIdentifier, window); } + return null; } diff --git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/StreamResource.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/StreamResource.java index 6c57ee7ef..a7e6563b3 100644 --- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/StreamResource.java +++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/StreamResource.java @@ -123,6 +123,7 @@ public StreamResource(Node node) { private ManagedChannel createConnection(Node node) { ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress(node.getHost(), node.getPort()) .usePlaintext(); + LOG.info("[ServerConnector]connection {} start to connect", node); return builder.build(); } @@ -188,17 +189,35 @@ public InitializeRecord getInitRecord(ServiceIdentifier serviceIdentifier, RateL if (record == null) { LOG.info("[RateLimit] add init record for {}, stream is {}", serviceIdentifier, this.hostNode); initRecord.putIfAbsent(serviceIdentifier, new InitializeRecord(rateLimitWindow)); + LOG.info("[RateLimit] record is null, write init record for task window is {} {} {}", rateLimitWindow, + rateLimitWindow.getUniqueKey(), rateLimitWindow.getStatus()); } else if (record.getRateLimitWindow() != rateLimitWindow) { // 存在旧窗口映射关系,说明已经淘汰 initRecord.put(serviceIdentifier, new InitializeRecord(rateLimitWindow)); RateLimitWindow oldWindow = record.getRateLimitWindow(); - LOG.info("remove init record for window {} {}", oldWindow.getUniqueKey(), oldWindow.getStatus()); + LOG.warn("[RateLimit] remove init record for window {} {} {}, task window is {} {} {}", oldWindow, + oldWindow.getUniqueKey(), 
oldWindow.getStatus(), rateLimitWindow, + rateLimitWindow.getUniqueKey(), rateLimitWindow.getStatus()); } return initRecord.get(serviceIdentifier); } - public void deleteInitRecord(ServiceIdentifier serviceIdentifier) { + public InitializeRecord deleteInitRecord(ServiceIdentifier serviceIdentifier) { LOG.info("[RateLimit] delete init record for {}, stream is {}", serviceIdentifier, this.hostNode); - initRecord.remove(serviceIdentifier); + return initRecord.remove(serviceIdentifier); + } + + // 淘汰时删除窗口初始化映射信息 + public InitializeRecord deleteInitRecord(ServiceIdentifier serviceIdentifier, RateLimitWindow rateLimitWindow) { + InitializeRecord record = initRecord.get(serviceIdentifier); + if (record != null && record.getRateLimitWindow() == rateLimitWindow) { + record.getDurationRecord().forEach((duration, counterKey) -> counters.remove(counterKey)); + initRecord.remove(serviceIdentifier); + LOG.info("[RateLimit] delete init record for {}, window {}", serviceIdentifier, rateLimitWindow.getUniqueKey()); + } else if (record != null && record.getRateLimitWindow() != rateLimitWindow) { + String recordWindow = record.getRateLimitWindow() != null ? 
record.getRateLimitWindow().getUniqueKey() : null; + LOG.warn("[RateLimit] delete init record for {}, window {} failed with {}", serviceIdentifier, rateLimitWindow.getUniqueKey(), recordWindow); + } + return record; } /** @@ -233,21 +252,35 @@ private void handleRateLimitInitResponse(RateLimitInitResponse rateLimitInitResp return; } //重新初始化后,之前的记录就不要了 + RateLimitWindow rateLimitWindow = initializeRecord.getRateLimitWindow(); + initializeRecord.getDurationRecord().forEach((duration, counterKey) -> counters.remove(counterKey)); initializeRecord.getDurationRecord().clear(); long remoteQuotaTimeMilli = rateLimitInitResponse.getTimestamp(); long localQuotaTimeMilli = getLocalTimeMilli(remoteQuotaTimeMilli); - RateLimitWindow rateLimitWindow = initializeRecord.getRateLimitWindow(); + long currentTimeMs = System.currentTimeMillis(); + rateLimitWindow.setLastSyncTimeMs(currentTimeMs); rateLimitWindow.setLastInitTimeMs(0); // 重置上次初始化时间,从而在metric变更或上报失败时可再次立刻再初始化 countersList.forEach(counter -> { initializeRecord.getDurationRecord().putIfAbsent(counter.getDuration(), counter.getCounterKey()); - counters.putIfAbsent(counter.getCounterKey(), - new DurationBaseCallback(counter.getDuration(), rateLimitWindow)); + DurationBaseCallback callback = new DurationBaseCallback(counter.getDuration(), rateLimitWindow); + DurationBaseCallback pre = counters.putIfAbsent(counter.getCounterKey(), callback); + if (pre != null && pre.getRateLimitWindow() != rateLimitWindow) { + counters.put(counter.getCounterKey(), callback); + LOG.warn("[handleRateLimitInitResponse] remove counter for window {}, new window {} {}", + pre.getRateLimitWindow().getUniqueKey(), rateLimitWindow.getUniqueKey(), + counter.getCounterKey()); + } RemoteQuotaInfo remoteQuotaInfo = new RemoteQuotaInfo(counter.getLeft(), counter.getClientCount(), - localQuotaTimeMilli, counter.getDuration() * 1000); + localQuotaTimeMilli, counter.getDuration() * 1000L); rateLimitWindow.getAllocatingBucket().onRemoteUpdate(remoteQuotaInfo); 
}); - LOG.info("[RateLimit] window {} has turn to initialized", rateLimitWindow.getUniqueKey()); - rateLimitWindow.setStatus(WindowStatus.INITIALIZED.ordinal()); + if (rateLimitWindow.getStatus() == WindowStatus.INITIALIZING) { + LOG.info("[handleRateLimitInitResponse] window {} has turn to initialized", rateLimitWindow.getUniqueKey()); + rateLimitWindow.setStatus(WindowStatus.INITIALIZED.ordinal()); + } else { + LOG.warn("[handleRateLimitInitResponse] failed to set window to INITIALIZED. window {} {}, status {} ", + rateLimitWindow, rateLimitWindow.getUniqueKey(), rateLimitWindow.getStatus()); + } } /** @@ -272,9 +305,15 @@ boolean handleRateLimitReportResponse(RateLimitReportResponse rateLimitReportRes long localQuotaTimeMilli = getLocalTimeMilli(remoteQuotaTimeMilli); quotaLeftsList.forEach(quotaLeft -> { DurationBaseCallback callback = counters.get(quotaLeft.getCounterKey()); + if (callback == null) { + LOG.warn("[handleRateLimitReportResponse] callback not found for counterKey {}, may have been expired", + quotaLeft.getCounterKey()); + return; + } RemoteQuotaInfo remoteQuotaInfo = new RemoteQuotaInfo(quotaLeft.getLeft(), quotaLeft.getClientCount(), - localQuotaTimeMilli, callback.getDuration() * 1000); + localQuotaTimeMilli, callback.getDuration() * 1000L); callback.getRateLimitWindow().getAllocatingBucket().onRemoteUpdate(remoteQuotaInfo); + callback.getRateLimitWindow().setLastSyncTimeMs(System.currentTimeMillis()); }); return true; } @@ -352,8 +391,30 @@ public boolean sendRateLimitRequest(RateLimitRequest rateLimitRequest) { } } - public boolean hasInit(ServiceIdentifier serviceIdentifier) { - return initRecord.containsKey(serviceIdentifier); + public boolean hasInit(ServiceIdentifier serviceIdentifier, RateLimitWindow rateLimitWindow) { + InitializeRecord record = initRecord.get(serviceIdentifier); + if (record == null || record.getDurationRecord().isEmpty()) { + return false; + } + if (record.getRateLimitWindow() != rateLimitWindow) { + 
record.getDurationRecord().forEach((duration, counterKey) -> counters.remove(counterKey)); + initRecord.remove(serviceIdentifier); // 清理索引触发重新初始化 + LOG.warn("[hasInit] init record {} is removed for switched. record window {} {}, param window {} {}", + initRecord, record.getRateLimitWindow(), record.getRateLimitWindow().getUniqueKey(), rateLimitWindow, + rateLimitWindow.getUniqueKey()); + return false; + } + if (System.currentTimeMillis() - rateLimitWindow.getLastSyncTimeMs() + > RateLimitConstants.WINDOW_INDEX_EXPIRE_TIME) { + record.getDurationRecord().forEach((duration, counterKey) -> counters.remove(counterKey)); + initRecord.remove(serviceIdentifier); // 清理索引触发重新初始化 + LOG.warn("[hasInit] init record is removed for expired. last sync time {}. " + + "record window {} {}, param window {} {}. ", rateLimitWindow.getLastSyncTimeMs(), + record.getRateLimitWindow(), record.getRateLimitWindow().getUniqueKey(), rateLimitWindow, + rateLimitWindow.getUniqueKey()); + return false; + } + return true; } public Integer getCounterKey(ServiceIdentifier serviceIdentifier, Integer duration) { diff --git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/WindowContainer.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/WindowContainer.java index dcda1f4db..34ff25f8c 100644 --- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/WindowContainer.java +++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/WindowContainer.java @@ -19,10 +19,12 @@ import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.logging.LoggerFactory; +import org.slf4j.Logger; + +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; -import org.slf4j.Logger; public class WindowContainer { @@ -71,4 +73,37 @@ public 
RateLimitWindow getMainWindow() { return mainWindow; } + /** + * 检查并淘汰窗口 + * + * @return 是否淘汰 + */ + public boolean checkAndCleanExpiredWindows() { + if (null != mainWindow) { + boolean expired = mainWindow.isExpired(); + if (expired) { + LOG.info("[RateLimit] mainWindow have been cleaned up due to expiration, service {}", serviceKey); + mainWindow.unInit(); + } + return expired; + } + int expiredLabels = 0; + Iterator> iterator = windowByLabel.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + String labelKey = entry.getKey(); + RateLimitWindow window = entry.getValue(); + if (window.isExpired()) { + expiredLabels++; + iterator.remove(); // 使用迭代器的 remove 方法删除当前元素 + LOG.info("[WindowContainer] windowByLabel remove label key {} , window {}", labelKey, window); + window.unInit(); + } + } + if (expiredLabels > 0) { + LOG.info("[RateLimit] {} labels have been cleaned up due to expiration, service {}", expiredLabels, + serviceKey); + } + return windowByLabel.isEmpty(); + } } diff --git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/sync/PolarisRemoteSyncTask.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/sync/PolarisRemoteSyncTask.java index 93a0758a7..2a72acd88 100644 --- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/sync/PolarisRemoteSyncTask.java +++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/sync/PolarisRemoteSyncTask.java @@ -181,7 +181,7 @@ private void doRemoteAcquire() { } StreamResource streamResource = streamCounterSet.checkAndCreateResource(serviceIdentifier, window); - if (!streamResource.hasInit(serviceIdentifier)) { + if (!streamResource.hasInit(serviceIdentifier, window)) { doRemoteInit(true); return; } diff --git 
a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimitConstants.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimitConstants.java index 44414016c..e0fb54e3f 100644 --- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimitConstants.java +++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimitConstants.java @@ -70,7 +70,12 @@ public interface RateLimitConstants { /** * 等待服务端返回结果的时间 1000ms */ - int INIT_WAIT_RESPONSE_TIME = 1 * 1000; + int INIT_WAIT_RESPONSE_TIME = 1000; + + /** + * 窗口索引有效期时间 2000ms,服务器2000ms未收到请求则淘汰窗口 + */ + int WINDOW_INDEX_EXPIRE_TIME = 2000; /** * 服务端的返回code diff --git a/polaris-ratelimit/polaris-ratelimit-factory/src/test/java/com/tencent/polaris/ratelimit/test/core/SubmitPolarisRemoteSyncTaskTest.java b/polaris-ratelimit/polaris-ratelimit-factory/src/test/java/com/tencent/polaris/ratelimit/test/core/SubmitPolarisRemoteSyncTaskTest.java new file mode 100644 index 000000000..d0d301dbb --- /dev/null +++ b/polaris-ratelimit/polaris-ratelimit-factory/src/test/java/com/tencent/polaris/ratelimit/test/core/SubmitPolarisRemoteSyncTaskTest.java @@ -0,0 +1,246 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.ratelimit.test.core; + +import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.factory.config.ConfigurationImpl; +import com.tencent.polaris.ratelimit.client.flow.RateLimitExtension; +import com.tencent.polaris.ratelimit.client.flow.RateLimitWindow; +import com.tencent.polaris.ratelimit.client.sync.RemoteSyncTask; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.mockito.Mockito.*; + +/** + * Test for {@link RateLimitExtension}. 
+ *
+ * @author Haotian Zhang
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SubmitPolarisRemoteSyncTaskTest {
+
+    @Mock
+    private ScheduledExecutorService syncExecutor;
+
+    @Mock
+    private ScheduledExecutorService windowExpireExecutor;
+
+    @Mock
+    private ScheduledFuture<?> mockFuture;
+
+    @Mock
+    private RemoteSyncTask remoteSyncTask;
+
+    @Mock
+    private RateLimitWindow rateLimitWindow;
+
+    private SDKContext context;
+
+    private RateLimitExtension rateLimitExtension;
+
+    private Map<String, ScheduledFuture<?>> scheduledTasks;
+
+    @Before
+    public void setUp() throws Exception {
+        ConfigurationImpl config = new ConfigurationImpl();
+        config.setDefault();
+        config.getGlobal().getAPI().setReportEnable(false);
+        config.getGlobal().getStatReporter().setEnable(false);
+        context = SDKContext.initContextByConfig(config);
+        context.init();
+        Extensions extensions = context.getExtensions();
+        rateLimitExtension = new RateLimitExtension(extensions);
+
+        // 使用反射替换 syncExecutor,避免真实线程池执行
+        setPrivateField(rateLimitExtension, "syncExecutor", syncExecutor);
+
+        // 使用反射替换 windowExpireExecutor,避免真实线程池执行
+        setPrivateField(rateLimitExtension, "windowExpireExecutor", windowExpireExecutor);
+
+        // 使用反射获取 scheduledTasks
+        scheduledTasks = getPrivateField(rateLimitExtension, "scheduledTasks");
+
+        // Mock 限流窗口
+        when(remoteSyncTask.getWindow()).thenReturn(rateLimitWindow);
+        when(rateLimitWindow.getUniqueKey()).thenReturn("test-unique-key");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        context.destroy();
+    }
+
+    /**
+     * 测试提交同步任务 — 任务已存在时不重新提交
+     * 测试场景:scheduledTasks 中已存在对应 uniqueKey 的任务
+     * 验证内容:1. syncExecutor.scheduleWithFixedDelay 不被调用
+     * 2. 窗口状态被回退为 CREATED
+     * 3. scheduledTasks 中仍为原始任务
+     */
+    @Test
+    public void testSubmitSyncTask_TaskAlreadyExists() {
+        // Arrange:放入一个已存在的任务
+        scheduledTasks.put("test-unique-key", mockFuture);
+
+        // Act:执行 submitSyncTask
+        rateLimitExtension.submitSyncTask(remoteSyncTask, 0, 30);
+
+        // Assert:确保任务没有被重新提交
+        verify(syncExecutor, never()).scheduleWithFixedDelay(any(), anyLong(), anyLong(), any());
+
+        // Assert:确保任务状态被设置为 CREATED
+        verify(rateLimitWindow, times(1)).setStatus(RateLimitWindow.WindowStatus.CREATED.ordinal());
+
+        // Assert:确保 scheduledTasks 仍然是原来的任务
+        assertThat((Object) scheduledTasks.get("test-unique-key")).isEqualTo(mockFuture);
+    }
+
+    /**
+     * 测试提交同步任务 — 新任务成功提交
+     * 测试场景:scheduledTasks 中不存在对应 uniqueKey 的任务
+     * 验证内容:1. syncExecutor.scheduleWithFixedDelay 被调用且参数精确匹配
+     * 2. 任务被添加到 scheduledTasks
+     */
+    @Test
+    public void testSubmitSyncTask_NewTask() {
+        // Arrange:Mock 任务调度返回
+        Mockito.<ScheduledFuture<?>>when(
+                syncExecutor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))
+        ).thenReturn(mockFuture);
+
+        // Act:执行 submitSyncTask
+        rateLimitExtension.submitSyncTask(remoteSyncTask, 0, 30);
+
+        // Assert:确保任务被正确提交,验证参数精确匹配
+        verify(syncExecutor, times(1)).scheduleWithFixedDelay(
+                eq(remoteSyncTask), eq(0L), eq(30L), eq(TimeUnit.MILLISECONDS));
+
+        // Assert:确保任务被添加到 scheduledTasks
+        assertThat((Object) scheduledTasks.get("test-unique-key")).isEqualTo(mockFuture);
+    }
+
+    /**
+     * 测试停止同步任务 — 任务存在时正确停止
+     * 测试场景:scheduledTasks 中存在对应 uniqueKey 的任务
+     * 验证内容:1. 任务从 scheduledTasks 中被移除
+     * 2. future.cancel(true) 被调用
+     * 3. cleanTask 被通过 syncExecutor.schedule 提交
+     */
+    @Test
+    public void testStopSyncTask_TaskExists() {
+        // Arrange:放入一个已存在的任务
+        scheduledTasks.put("test-unique-key", mockFuture);
+
+        // Act:执行 stopSyncTask
+        rateLimitExtension.stopSyncTask("test-unique-key", rateLimitWindow);
+
+        // Assert:确保任务被移除
+        assertThat((Object) scheduledTasks.get("test-unique-key")).isNull();
+
+        // Assert:确保 future.cancel(true) 被调用
+        verify(mockFuture, times(1)).cancel(true);
+
+        // Assert:确保 cleanTask 被提交到 syncExecutor
+        verify(syncExecutor, times(1)).schedule(any(Runnable.class), eq(10L), eq(TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * 测试停止同步任务 — 任务不存在时不调用 cancel
+     * 测试场景:scheduledTasks 中不存在对应 uniqueKey 的任务
+     * 验证内容:1. future.cancel 不被调用
+     * 2. cleanTask 仍被提交(清理逻辑始终执行)
+     * 3. 不抛出异常
+     */
+    @Test
+    public void testStopSyncTask_TaskNotExists() {
+        // Arrange:scheduledTasks 为空,不放入任何任务
+
+        // Act & Assert:执行 stopSyncTask 不抛异常
+        assertThatCode(() -> rateLimitExtension.stopSyncTask("non-existent-key", rateLimitWindow))
+                .doesNotThrowAnyException();
+
+        // Assert:确保 future.cancel 不被调用(没有 future 可以取消)
+        verify(mockFuture, never()).cancel(anyBoolean());
+
+        // Assert:确保 cleanTask 仍然被提交到 syncExecutor
+        verify(syncExecutor, times(1)).schedule(any(Runnable.class), eq(10L), eq(TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * 测试提交过期检查任务
+     * 测试场景:调用 submitExpireJob 提交一个定时任务
+     * 验证内容:windowExpireExecutor.scheduleWithFixedDelay 被调用且参数为 5 秒间隔
+     */
+    @Test
+    public void testSubmitExpireJob() {
+        // Arrange:准备一个模拟任务
+        Runnable expireTask = mock(Runnable.class);
+
+        // Act:执行 submitExpireJob
+        rateLimitExtension.submitExpireJob(expireTask);
+
+        // Assert:确保过期任务被正确提交到 windowExpireExecutor,间隔 5 秒
+        verify(windowExpireExecutor, times(1)).scheduleWithFixedDelay(
+                eq(expireTask), eq(5L), eq(5L), eq(TimeUnit.SECONDS));
+    }
+
+    /**
+     * 测试 destroy 方法触发线程池关闭
+     * 测试场景:调用 destroy 方法
+     * 验证内容:syncExecutor 和 windowExpireExecutor 被关闭
+     */
+    @Test
+    public void testDestroy() {
+        // Act:执行 destroy
+        rateLimitExtension.destroy();
+
+        // Assert:确保两个线程池都被关闭
+        verify(syncExecutor, times(1)).shutdown();
+        verify(windowExpireExecutor, times(1)).shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T getPrivateField(Object object, String fieldName)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field field = object.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return (T) field.get(object);
+    }
+
+    private static void setPrivateField(Object object, String fieldName, Object value)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field field = object.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(object, value);
+    }
+}
\ No newline at end of file
diff --git a/polaris-ratelimit/polaris-ratelimit-factory/src/test/java/com/tencent/polaris/ratelimit/test/core/WindowExpireAndRecoverTest.java b/polaris-ratelimit/polaris-ratelimit-factory/src/test/java/com/tencent/polaris/ratelimit/test/core/WindowExpireAndRecoverTest.java
new file mode 100644
index 000000000..572737feb
--- /dev/null
+++ b/polaris-ratelimit/polaris-ratelimit-factory/src/test/java/com/tencent/polaris/ratelimit/test/core/WindowExpireAndRecoverTest.java
@@ -0,0 +1,605 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 Tencent. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */ + +package com.tencent.polaris.ratelimit.test.core; + +import com.google.protobuf.Duration; +import com.google.protobuf.StringValue; +import com.google.protobuf.UInt32Value; +import com.tencent.polaris.api.config.Configuration; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.client.util.Utils; +import com.tencent.polaris.ratelimit.api.core.LimitAPI; +import com.tencent.polaris.ratelimit.api.flow.LimitFlow; +import com.tencent.polaris.ratelimit.api.rpc.Argument; +import com.tencent.polaris.ratelimit.api.rpc.QuotaRequest; +import com.tencent.polaris.ratelimit.api.rpc.QuotaResponse; +import com.tencent.polaris.ratelimit.api.rpc.QuotaResultCode; +import com.tencent.polaris.ratelimit.client.api.DefaultLimitAPI; +import com.tencent.polaris.ratelimit.client.flow.DefaultLimitFlow; +import com.tencent.polaris.ratelimit.client.flow.QuotaFlow; +import com.tencent.polaris.ratelimit.client.flow.RateLimitWindow; +import com.tencent.polaris.ratelimit.client.flow.RateLimitWindow.WindowStatus; +import com.tencent.polaris.ratelimit.client.flow.RateLimitWindowSet; +import com.tencent.polaris.ratelimit.client.flow.WindowContainer; +import com.tencent.polaris.ratelimit.factory.LimitAPIFactory; +import com.tencent.polaris.specification.api.v1.model.ModelProto.MatchString; +import com.tencent.polaris.specification.api.v1.model.ModelProto.MatchString.MatchStringType; +import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Amount; +import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.MatchArgument; +import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.RateLimit; +import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Rule; +import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Rule.AmountMode; +import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Rule.Type; +import com.tencent.polaris.test.common.TestUtils; 
+import com.tencent.polaris.test.mock.discovery.NamingServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * 集成测试:验证限流窗口过期和恢复机制 + *

+ * 测试完整链路:QuotaFlow → RateLimitWindowSet → WindowContainer → RateLimitWindow + * 涵盖窗口创建、过期检测、清理以及重新创建恢复的端到端流程 + * + * @author Haotian Zhang + */ +public class WindowExpireAndRecoverTest { + + private static final int PORT = 10097; + + private static final String EXPIRE_TEST_SERVICE = "java_expire_test_service"; + + /** + * 限流规则:1秒内最多2个请求(小的限流值便于快速触发限流) + * validDuration=1s → expireDurationMs = 1*1000 + 1000(EXPIRE_FACTOR_MS) = 2000ms + */ + private static final int MAX_AMOUNT = 2; + + private static final int VALID_DURATION_SECONDS = 1; + + /** + * 窗口过期时间 = MaxDuration(1s) + EXPIRE_FACTOR_MS(1000ms) = 2000ms + */ + private static final long EXPECTED_EXPIRE_DURATION_MS = VALID_DURATION_SECONDS * 1000L + 1000L; + + private NamingServer namingServer; + + @Before + public void setUp() { + try { + namingServer = NamingServer.startNamingServer(PORT); + } catch (IOException e) { + throw new RuntimeException(e); + } + ServiceKey serviceKey = new ServiceKey(Consts.NAMESPACE_TEST, EXPIRE_TEST_SERVICE); + namingServer.getNamingService().addService(serviceKey); + + // 构建限流规则:local 模式,1秒2个请求 + RateLimit.Builder rateLimitBuilder = RateLimit.newBuilder(); + Rule.Builder ruleBuilder = Rule.newBuilder(); + ruleBuilder.setType(Type.LOCAL); + ruleBuilder.setPriority(UInt32Value.newBuilder().setValue(0).build()); + ruleBuilder.setAction(StringValue.newBuilder().setValue("reject").build()); + ruleBuilder.setAmountMode(AmountMode.GLOBAL_TOTAL); + ruleBuilder.addArguments( + MatchArgument.newBuilder() + .setType(MatchArgument.Type.CUSTOM) + .setKey(Consts.LABEL_METHOD) + .setValue(MatchString.newBuilder() + .setType(MatchStringType.EXACT) + .setValue(StringValue.newBuilder().setValue(Consts.METHOD_PAY).build()) + .build())); + ruleBuilder.addAmounts( + Amount.newBuilder() + .setMaxAmount(UInt32Value.newBuilder().setValue(MAX_AMOUNT).build()) + .setValidDuration(Duration.newBuilder().setSeconds(VALID_DURATION_SECONDS).build())); + 
+        ruleBuilder.setRevision(StringValue.newBuilder().setValue("expire-test-rev-001").build());
+        rateLimitBuilder.addRules(ruleBuilder.build());
+        rateLimitBuilder.setRevision(StringValue.newBuilder().setValue("expire-test-global-rev").build());
+        namingServer.getNamingService().setRateLimit(serviceKey, rateLimitBuilder.build());
+    }
+
+    @After
+    public void tearDown() {
+        if (null != namingServer) {
+            namingServer.terminate();
+        }
+    }
+
+    /**
+     * 测试窗口过期后配额恢复
+     * 测试目的:验证限流窗口在长时间不使用后过期被清理,再次请求时创建新窗口并恢复配额
+     * 测试场景:
+     * 1. 发起请求消耗配额直到被限流
+     * 2. 通过反射修改 lastAccessTimeMs 模拟过期
+     * 3. 手动触发过期清理
+     * 4. 再次发起请求,验证配额已恢复
+     * 验证内容:
+     * 1. 首次请求后窗口正常创建,状态为 INITIALIZED
+     * 2. 过期清理后窗口被移除
+     * 3. 再次请求后新窗口被创建,配额正常可用
+     */
+    @Test
+    public void testWindowExpireAndRecover() throws Exception {
+        Configuration configuration = TestUtils.createSimpleConfiguration(PORT);
+        try (LimitAPI limitAPI = LimitAPIFactory.createLimitAPIByConfig(configuration)) {
+            RateLimitUtils.adjustTime();
+
+            // === 阶段1:首次请求,消耗配额并验证限流生效 ===
+            boolean hasLimited = false;
+            boolean hasPassed = false;
+            for (int i = 0; i < MAX_AMOUNT + 2; i++) {
+                QuotaResponse response = acquireQuota(limitAPI, Consts.METHOD_PAY);
+                if (response.getCode() == QuotaResultCode.QuotaResultOk) {
+                    hasPassed = true;
+                } else if (response.getCode() == QuotaResultCode.QuotaResultLimited) {
+                    hasLimited = true;
+                }
+            }
+            assertThat(hasPassed).as("应该有请求通过").isTrue();
+            assertThat(hasLimited).as("应该有请求被限流").isTrue();
+
+            // === 阶段2:获取内部的窗口对象,验证窗口已创建 ===
+            QuotaFlow quotaFlow = extractQuotaFlow(limitAPI);
+            Map<ServiceKey, RateLimitWindowSet> svcToWindowSet = getPrivateField(quotaFlow, "svcToWindowSet");
+            ServiceKey serviceKey = new ServiceKey(Consts.NAMESPACE_TEST, EXPIRE_TEST_SERVICE);
+            RateLimitWindowSet windowSet = svcToWindowSet.get(serviceKey);
+            assertThat(windowSet).as("窗口集合应该存在").isNotNull();
+
+            Map<String, WindowContainer> windowByRule = getPrivateField(windowSet, "windowByRule");
+            assertThat(windowByRule).as("规则窗口容器应该存在").isNotEmpty();
+
+            // 获取第一个窗口容器中的 mainWindow
+            WindowContainer container = windowByRule.values().iterator().next();
+            RateLimitWindow mainWindow = container.getMainWindow();
+            assertThat(mainWindow).as("限流窗口应该已创建").isNotNull();
+            assertThat(mainWindow.getStatus()).as("窗口状态应为 INITIALIZED").isEqualTo(WindowStatus.INITIALIZED);
+
+            // === 阶段3:通过反射修改 lastAccessTimeMs,模拟窗口过期 ===
+            AtomicLong lastAccessTimeMs = getPrivateField(mainWindow, "lastAccessTimeMs");
+            long originalAccessTime = lastAccessTimeMs.get();
+            // 将 lastAccessTimeMs 设置为很久以前,使得 currentTime - lastAccessTimeMs > expireDurationMs
+            lastAccessTimeMs.set(System.currentTimeMillis() - EXPECTED_EXPIRE_DURATION_MS - 5000);
+
+            // 验证 isExpired 返回 true
+            assertThat(mainWindow.isExpired()).as("修改访问时间后窗口应该过期").isTrue();
+
+            // === 阶段4:手动触发过期清理 ===
+            windowSet.cleanupContainers();
+
+            // 验证窗口容器已被清理
+            assertThat(windowByRule).as("过期窗口容器应该被移除").isEmpty();
+            assertThat(mainWindow.getStatus()).as("窗口状态应变为 DELETED").isEqualTo(WindowStatus.DELETED);
+
+            // === 阶段5:再次请求,验证新窗口被创建且配额恢复 ===
+            RateLimitUtils.adjustTime();
+            hasPassed = false;
+            for (int i = 0; i < MAX_AMOUNT; i++) {
+                QuotaResponse response = acquireQuota(limitAPI, Consts.METHOD_PAY);
+                if (response.getCode() == QuotaResultCode.QuotaResultOk) {
+                    hasPassed = true;
+                }
+            }
+            assertThat(hasPassed).as("过期清理后再次请求应该通过(配额恢复)").isTrue();
+
+            // 验证新的窗口已被创建
+            assertThat(windowByRule).as("应该创建了新的窗口容器").isNotEmpty();
+            WindowContainer newContainer = windowByRule.values().iterator().next();
+            RateLimitWindow newWindow = newContainer.getMainWindow();
+            assertThat(newWindow).as("新限流窗口应该存在").isNotNull();
+            assertThat(newWindow).as("新窗口应该是不同的实例").isNotSameAs(mainWindow);
+            assertThat(newWindow.getStatus()).as("新窗口状态应为 INITIALIZED").isEqualTo(WindowStatus.INITIALIZED);
+
+            // 等待内部线程完成
+            Utils.sleepUninterrupted(2000);
+        }
+    }
+
+    /**
+     * 测试窗口未过期时不应被清理
+     * 测试目的:验证最近刚访问过的窗口不会被误清理
+     * 测试场景:发起请求后立即执行过期清理
+     * 验证内容:
+     * 1. 窗口仍然存在于容器中
+     * 2. 窗口状态保持 INITIALIZED
+     * 3. isExpired 返回 false
+     */
+    @Test
+    public void testWindowNotExpiredShouldNotBeCleanedUp() throws Exception {
+        Configuration configuration = TestUtils.createSimpleConfiguration(PORT);
+        try (LimitAPI limitAPI = LimitAPIFactory.createLimitAPIByConfig(configuration)) {
+            RateLimitUtils.adjustTime();
+
+            // 发起请求,创建窗口
+            QuotaResponse response = acquireQuota(limitAPI, Consts.METHOD_PAY);
+            assertThat(response.getCode()).as("首次请求应该通过").isEqualTo(QuotaResultCode.QuotaResultOk);
+
+            // 获取内部窗口对象
+            QuotaFlow quotaFlow = extractQuotaFlow(limitAPI);
+            Map<ServiceKey, RateLimitWindowSet> svcToWindowSet = getPrivateField(quotaFlow, "svcToWindowSet");
+            ServiceKey serviceKey = new ServiceKey(Consts.NAMESPACE_TEST, EXPIRE_TEST_SERVICE);
+            RateLimitWindowSet windowSet = svcToWindowSet.get(serviceKey);
+            Map<String, WindowContainer> windowByRule = getPrivateField(windowSet, "windowByRule");
+            WindowContainer container = windowByRule.values().iterator().next();
+            RateLimitWindow mainWindow = container.getMainWindow();
+
+            // 验证窗口未过期
+            assertThat(mainWindow.isExpired()).as("刚访问过的窗口不应过期").isFalse();
+
+            // 执行过期清理
+            windowSet.cleanupContainers();
+
+            // 验证窗口仍然存在
+            assertThat(windowByRule).as("窗口容器不应被移除").isNotEmpty();
+            assertThat(mainWindow.getStatus()).as("窗口状态应保持 INITIALIZED").isEqualTo(WindowStatus.INITIALIZED);
+
+            Utils.sleepUninterrupted(2000);
+        }
+    }
+
+    /**
+     * 测试窗口过期后再次限流恢复正常
+     * 测试目的:验证过期恢复后新窗口的限流逻辑完整可用
+     * 测试场景:
+     * 1. 消耗全部配额至限流
+     * 2. 模拟过期并清理
+     * 3. 重新请求验证限流恢复并再次触发限流
+     * 验证内容:新窗口的限流规则与原窗口一致(MAX_AMOUNT 个请求通过后触发限流)
+     */
+    @Test
+    public void testExpiredWindowRecoverAndRelimit() throws Exception {
+        Configuration configuration = TestUtils.createSimpleConfiguration(PORT);
+        try (LimitAPI limitAPI = LimitAPIFactory.createLimitAPIByConfig(configuration)) {
+            RateLimitUtils.adjustTime();
+
+            // 阶段1:消耗配额
+            for (int i = 0; i < MAX_AMOUNT + 2; i++) {
+                acquireQuota(limitAPI, Consts.METHOD_PAY);
+            }
+
+            // 阶段2:获取窗口并模拟过期
+            QuotaFlow quotaFlow = extractQuotaFlow(limitAPI);
+            Map<ServiceKey, RateLimitWindowSet> svcToWindowSet = getPrivateField(quotaFlow, "svcToWindowSet");
+            ServiceKey serviceKey = new ServiceKey(Consts.NAMESPACE_TEST, EXPIRE_TEST_SERVICE);
+            RateLimitWindowSet windowSet = svcToWindowSet.get(serviceKey);
+            Map<String, WindowContainer> windowByRule = getPrivateField(windowSet, "windowByRule");
+            WindowContainer container = windowByRule.values().iterator().next();
+            RateLimitWindow mainWindow = container.getMainWindow();
+
+            // 模拟过期
+            AtomicLong lastAccessTimeMs = getPrivateField(mainWindow, "lastAccessTimeMs");
+            lastAccessTimeMs.set(System.currentTimeMillis() - EXPECTED_EXPIRE_DURATION_MS - 5000);
+            windowSet.cleanupContainers();
+            assertThat(windowByRule).as("窗口容器应被清理").isEmpty();
+
+            // 阶段3:等待新的时间窗口开始
+            RateLimitUtils.adjustTime();
+
+            // 阶段4:再次发起请求,验证限流规则恢复
+            boolean hasPassed = false;
+            boolean hasLimited = false;
+            for (int i = 0; i < MAX_AMOUNT + 2; i++) {
+                QuotaResponse response = acquireQuota(limitAPI, Consts.METHOD_PAY);
+                if (response.getCode() == QuotaResultCode.QuotaResultOk) {
+                    hasPassed = true;
+                } else if (response.getCode() == QuotaResultCode.QuotaResultLimited) {
+                    hasLimited = true;
+                }
+            }
+            assertThat(hasPassed).as("恢复后应该有请求通过").isTrue();
+            assertThat(hasLimited).as("恢复后限流规则应该生效,请求应被限流").isTrue();
+
+            Utils.sleepUninterrupted(2000);
+        }
+    }
+
+    /**
+     * 测试多次过期恢复循环
+     * 测试目的:验证窗口可以多次经历过期-恢复循环,每次都能正常工作
+     * 测试场景:执行3轮过期-恢复循环
+     * 验证内容:每轮恢复后配额均正常可用
+     */
+    @Test
+    public void testMultipleExpireRecoverCycles() throws Exception {
+        Configuration configuration = TestUtils.createSimpleConfiguration(PORT);
+        try (LimitAPI limitAPI = LimitAPIFactory.createLimitAPIByConfig(configuration)) {
+            QuotaFlow quotaFlow = extractQuotaFlow(limitAPI);
+            Map<ServiceKey, RateLimitWindowSet> svcToWindowSet = getPrivateField(quotaFlow, "svcToWindowSet");
+            ServiceKey serviceKey = new ServiceKey(Consts.NAMESPACE_TEST, EXPIRE_TEST_SERVICE);
+
+            for (int cycle = 0; cycle < 3; cycle++) {
+                RateLimitUtils.adjustTime();
+
+                // 发起请求创建/使用窗口
+                boolean hasPassed = false;
+                boolean hasLimited = false;
+                for (int i = 0; i < MAX_AMOUNT + 2; i++) {
+                    QuotaResponse response = acquireQuota(limitAPI, Consts.METHOD_PAY);
+                    if (response.getCode() == QuotaResultCode.QuotaResultOk) {
+                        hasPassed = true;
+                    } else if (response.getCode() == QuotaResultCode.QuotaResultLimited) {
+                        hasLimited = true;
+                    }
+                }
+                assertThat(hasPassed).as("第 %d 轮:应该有请求通过", cycle + 1).isTrue();
+                assertThat(hasLimited).as("第 %d 轮:应该有请求被限流", cycle + 1).isTrue();
+
+                // 模拟过期
+                RateLimitWindowSet windowSet = svcToWindowSet.get(serviceKey);
+                assertThat(windowSet).as("第 %d 轮:窗口集合应该存在", cycle + 1).isNotNull();
+                Map<String, WindowContainer> windowByRule = getPrivateField(windowSet, "windowByRule");
+                WindowContainer container = windowByRule.values().iterator().next();
+                RateLimitWindow window = container.getMainWindow();
+                AtomicLong lastAccessTimeMs = getPrivateField(window, "lastAccessTimeMs");
+                lastAccessTimeMs.set(System.currentTimeMillis() - EXPECTED_EXPIRE_DURATION_MS - 5000);
+
+                // 清理
+                windowSet.cleanupContainers();
+                assertThat(windowByRule).as("第 %d 轮:窗口容器应被清理", cycle + 1).isEmpty();
+            }
+
+            Utils.sleepUninterrupted(2000);
+        }
+    }
+
+    /**
+     * 测试并发请求与过期清理同时进行时的线程安全性
+     * 测试目的:验证多个请求线程并发调用 getQuota() 的同时,过期清理线程执行 cleanupContainers(),
+     * 不会抛出 ConcurrentModificationException 或其他并发异常
+     * 测试场景:
+     * 1. 创建窗口并消耗配额
+     * 2. 通过反射模拟窗口过期
+     * 3. 启动多个请求线程并发调用 getQuota(),同时另一个线程执行 cleanupContainers()
+     * 4. 等待所有线程完成
+     * 验证内容:
+     * 1. 没有线程抛出异常
+     * 2. 所有请求线程都成功返回了结果(QuotaResultOk 或 QuotaResultLimited)
+     * 3. 执行完毕后窗口状态一致
+     */
+    @Test
+    public void testConcurrentRequestsWithExpireCleanup() throws Exception {
+        Configuration configuration = TestUtils.createSimpleConfiguration(PORT);
+        try (LimitAPI limitAPI = LimitAPIFactory.createLimitAPIByConfig(configuration)) {
+            RateLimitUtils.adjustTime();
+
+            // 阶段1:创建窗口
+            QuotaResponse initialResponse = acquireQuota(limitAPI, Consts.METHOD_PAY);
+            assertThat(initialResponse.getCode()).as("首次请求应通过").isEqualTo(QuotaResultCode.QuotaResultOk);
+
+            // 获取内部对象
+            QuotaFlow quotaFlow = extractQuotaFlow(limitAPI);
+            Map<ServiceKey, RateLimitWindowSet> svcToWindowSet = getPrivateField(quotaFlow, "svcToWindowSet");
+            ServiceKey serviceKey = new ServiceKey(Consts.NAMESPACE_TEST, EXPIRE_TEST_SERVICE);
+            RateLimitWindowSet windowSet = svcToWindowSet.get(serviceKey);
+            Map<String, WindowContainer> windowByRule = getPrivateField(windowSet, "windowByRule");
+            WindowContainer container = windowByRule.values().iterator().next();
+            RateLimitWindow mainWindow = container.getMainWindow();
+
+            // 阶段2:模拟窗口过期
+            AtomicLong lastAccessTimeMs = getPrivateField(mainWindow, "lastAccessTimeMs");
+            lastAccessTimeMs.set(System.currentTimeMillis() - EXPECTED_EXPIRE_DURATION_MS - 5000);
+
+            // 阶段3:并发执行请求和清理
+            int requestThreadCount = 10;
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch doneLatch = new CountDownLatch(requestThreadCount + 1);
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+            AtomicInteger successResponseCount = new AtomicInteger(0);
+            List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());
+
+            // 启动请求线程
+            for (int i = 0; i < requestThreadCount; i++) {
+                new Thread(() -> {
+                    try {
+                        startLatch.await();
+                        QuotaResponse response = acquireQuota(limitAPI, Consts.METHOD_PAY);
+                        if (response.getCode() == QuotaResultCode.QuotaResultOk
+                                || response.getCode() == QuotaResultCode.QuotaResultLimited) {
+                            successResponseCount.incrementAndGet();
+                        }
+                    } catch (Throwable t) {
+                        exceptionCount.incrementAndGet();
+                        exceptions.add(t);
+                    } finally {
+                        doneLatch.countDown();
+                    }
+                }, "request-thread-" + i).start();
+            }
+
+            // 启动清理线程
+            new Thread(() -> {
+                try {
+                    startLatch.await();
+                    windowSet.cleanupContainers();
+                } catch (Throwable t) {
+                    exceptionCount.incrementAndGet();
+                    exceptions.add(t);
+                } finally {
+                    doneLatch.countDown();
+                }
+            }, "cleanup-thread").start();
+
+            // 同时释放所有线程
+            startLatch.countDown();
+            boolean allDone = doneLatch.await(10, TimeUnit.SECONDS);
+
+            // 验证
+            assertThat(allDone).as("所有线程应在超时前完成").isTrue();
+            assertThat(exceptionCount.get())
+                    .as("不应有线程抛出异常,但发现异常: %s",
+                            exceptions.isEmpty() ? "无" : exceptions.get(0).toString())
+                    .isEqualTo(0);
+            assertThat(successResponseCount.get())
+                    .as("所有请求线程应成功返回结果").isEqualTo(requestThreadCount);
+
+            Utils.sleepUninterrupted(2000);
+        }
+    }
+
+    /**
+     * 测试过期清理后多线程并发恢复窗口
+     * 测试目的:验证窗口过期清理后,多个线程同时发起请求时,通过 ConcurrentHashMap.computeIfAbsent
+     * 只创建一个新窗口实例,且所有线程都能正常获取到配额结果
+     * 测试场景:
+     * 1. 创建窗口并模拟过期,执行清理
+     * 2. 确认窗口已被清理
+     * 3. 多线程同时发起请求,触发新窗口创建
+     * 验证内容:
+     * 1. 所有线程都成功获取到配额结果,无异常
+     * 2. 恢复后只创建了一个窗口容器(computeIfAbsent 的原子性保证)
+     * 3. 所有请求线程中至少有一些通过了限流(配额恢复有效)
+     */
+    @Test
+    public void testConcurrentRecoverAfterExpire() throws Exception {
+        Configuration configuration = TestUtils.createSimpleConfiguration(PORT);
+        try (LimitAPI limitAPI = LimitAPIFactory.createLimitAPIByConfig(configuration)) {
+            RateLimitUtils.adjustTime();
+
+            // 阶段1:创建窗口并消耗配额
+            for (int i = 0; i < MAX_AMOUNT + 1; i++) {
+                acquireQuota(limitAPI, Consts.METHOD_PAY);
+            }
+
+            // 获取内部对象
+            QuotaFlow quotaFlow = extractQuotaFlow(limitAPI);
+            Map<ServiceKey, RateLimitWindowSet> svcToWindowSet = getPrivateField(quotaFlow, "svcToWindowSet");
+            ServiceKey serviceKey = new ServiceKey(Consts.NAMESPACE_TEST, EXPIRE_TEST_SERVICE);
+            RateLimitWindowSet windowSet = svcToWindowSet.get(serviceKey);
+            Map<String, WindowContainer> windowByRule = getPrivateField(windowSet, "windowByRule");
+
+            // 阶段2:模拟过期并清理
+            WindowContainer container = windowByRule.values().iterator().next();
+            RateLimitWindow oldWindow = container.getMainWindow();
+            AtomicLong lastAccessTimeMs = getPrivateField(oldWindow, "lastAccessTimeMs");
+            lastAccessTimeMs.set(System.currentTimeMillis() - EXPECTED_EXPIRE_DURATION_MS - 5000);
+            windowSet.cleanupContainers();
+            assertThat(windowByRule).as("窗口容器应已被清理").isEmpty();
+
+            // 阶段3:等待新的时间窗口开始
+            RateLimitUtils.adjustTime();
+
+            // 阶段4:多线程并发发起请求,触发新窗口创建
+            int threadCount = 10;
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch doneLatch = new CountDownLatch(threadCount);
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+            AtomicInteger passedCount = new AtomicInteger(0);
+            AtomicInteger limitedCount = new AtomicInteger(0);
+            List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());
+
+            for (int i = 0; i < threadCount; i++) {
+                new Thread(() -> {
+                    try {
+                        startLatch.await();
+                        QuotaResponse response = acquireQuota(limitAPI, Consts.METHOD_PAY);
+                        if (response.getCode() == QuotaResultCode.QuotaResultOk) {
+                            passedCount.incrementAndGet();
+                        } else if (response.getCode() == QuotaResultCode.QuotaResultLimited) {
+                            limitedCount.incrementAndGet();
+                        }
+                    } catch (Throwable t) {
+                        exceptionCount.incrementAndGet();
+                        exceptions.add(t);
+                    } finally {
+                        doneLatch.countDown();
+                    }
+                }, "recover-thread-" + i).start();
+            }
+
+            // 同时释放所有线程
+            startLatch.countDown();
+            boolean allDone = doneLatch.await(10, TimeUnit.SECONDS);
+
+            // 验证
+            assertThat(allDone).as("所有线程应在超时前完成").isTrue();
+            assertThat(exceptionCount.get())
+                    .as("不应有线程抛出异常,但发现异常: %s",
+                            exceptions.isEmpty() ? "无" : exceptions.get(0).toString())
+                    .isEqualTo(0);
+            assertThat(passedCount.get() + limitedCount.get())
+                    .as("所有线程应成功获取到配额结果").isEqualTo(threadCount);
+            assertThat(passedCount.get())
+                    .as("恢复后应有请求通过(配额已恢复)").isGreaterThan(0);
+
+            // 验证只创建了一个窗口容器
+            assertThat(windowByRule).as("应只创建一个窗口容器").hasSize(1);
+            WindowContainer newContainer = windowByRule.values().iterator().next();
+            RateLimitWindow newWindow = newContainer.getMainWindow();
+            assertThat(newWindow).as("新限流窗口应存在").isNotNull();
+            assertThat(newWindow).as("新窗口应与旧窗口不同").isNotSameAs(oldWindow);
+
+            Utils.sleepUninterrupted(2000);
+        }
+    }
+
+    /**
+     * 发起配额请求
+     */
+    private QuotaResponse acquireQuota(LimitAPI limitAPI, String method) {
+        QuotaRequest request = new QuotaRequest();
+        request.setNamespace(Consts.NAMESPACE_TEST);
+        request.setService(EXPIRE_TEST_SERVICE);
+        Set<Argument> arguments = new HashSet<>();
+        arguments.add(Argument.buildCustom(Consts.LABEL_METHOD, method));
+        request.setArguments(arguments);
+        return limitAPI.getQuota(request);
+    }
+
+    /**
+     * 通过反射从 LimitAPI 中提取 QuotaFlow 对象
+     * 调用链:DefaultLimitAPI → limitFlow(DefaultLimitFlow) → quotaFlow(QuotaFlow)
+     */
+    private QuotaFlow extractQuotaFlow(LimitAPI limitAPI) throws Exception {
+        DefaultLimitAPI defaultLimitAPI = (DefaultLimitAPI) limitAPI;
+        LimitFlow limitFlow = getPrivateField(defaultLimitAPI, "limitFlow");
+        DefaultLimitFlow defaultLimitFlow = (DefaultLimitFlow) limitFlow;
+        return getPrivateField(defaultLimitFlow, "quotaFlow");
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T getPrivateField(Object object, String fieldName)
+            throws NoSuchFieldException, IllegalAccessException {
+        Class<?> clazz = object.getClass();
+        while (clazz != null) {
+            try {
+                Field field = clazz.getDeclaredField(fieldName);
+                field.setAccessible(true);
+                return (T) field.get(object);
+            } catch (NoSuchFieldException e) {
+                clazz = clazz.getSuperclass();
+            }
+        }
+        throw new NoSuchFieldException(fieldName);
+    }
+}