From 936078aca9c3600927e1434708d8075fde6bd2a6 Mon Sep 17 00:00:00 2001 From: zhangenyao Date: Fri, 24 Oct 2025 17:26:11 +0800 Subject: [PATCH] rate: fix limiter becoming invalid under high rate and concurrency The rate limiter could become ineffective when the request rate or concurrency were high. This is because we allowed time reverse jump. This change modifies the the time calculation of the limiter. If input an old time, we don't change time of limiter. Fixes golang/go#65508 --- rate/rate.go | 4 +++- rate/rate_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/rate/rate.go b/rate/rate.go index 563270c..254f67b 100644 --- a/rate/rate.go +++ b/rate/rate.go @@ -372,7 +372,9 @@ func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) r.timeToAct = t.Add(waitDuration) // Update state - lim.last = t + if t.After(lim.last) { + lim.last = t + } lim.tokens = tokens lim.lastEvent = r.timeToAct } diff --git a/rate/rate_test.go b/rate/rate_test.go index 8b93903..2302c18 100644 --- a/rate/rate_test.go +++ b/rate/rate_test.go @@ -61,6 +61,7 @@ var ( t3 = t0.Add(time.Duration(3) * d) t4 = t0.Add(time.Duration(4) * d) t5 = t0.Add(time.Duration(5) * d) + t6 = t0.Add(time.Duration(6) * d) t9 = t0.Add(time.Duration(9) * d) ) @@ -125,7 +126,7 @@ func TestLimiterJumpBackwards(t *testing.T) { {t0, 1, 1, true}, {t0, 0, 1, false}, {t0, 0, 1, false}, - {t1, 1, 1, true}, // got a token + // {t1, 1, 1, true}, // got a token {t1, 0, 1, false}, {t1, 0, 1, false}, {t2, 1, 1, true}, // got another token @@ -423,13 +424,13 @@ func TestReserveJumpBack(t *testing.T) { runReserve(t, lim, request{t1, 2, t1, true}) // start at t1 runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst - runReserve(t, lim, request{t2, 2, t3, true}) + runReserve(t, lim, request{t2, 2, t4, true}) // burst size is 2, so n=3 always fails, and the state of lim should not be changed runReserve(t, lim, request{t0, 3, time.Time{}, false}) - runReserve(t, lim, request{t2, 1, t4, true}) + runReserve(t, lim, request{t2, 1, t5, true}) // the maxReserve is not enough so it fails, and the state of lim should not be changed runReserveMax(t, lim, request{t0, 2, time.Time{}, false}, d) - runReserve(t, lim, request{t2, 1, t5, true}) + runReserve(t, lim, request{t2, 1, t6, true}) } func TestReserveJumpBackCancel(t *testing.T) { @@ -655,3 +656,41 @@ func TestTinyLimit(t *testing.T) { t.Errorf("Limit(1e-10, 1) want false when already used") } } + +func TestLimiter(t *testing.T) { + runTestLimiter := func(concurrency int, totalRate float64, burst int, duration time.Duration) { + limiter := NewLimiter(Limit(totalRate), burst) + var allowed int64 + var wg sync.WaitGroup + ctx, _ := context.WithTimeout(context.Background(), duration) + + for range concurrency { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + err := limiter.Wait(ctx) + if err == nil { + atomic.AddInt64(&allowed, 1) + } else { + return + } + } + } + }() + } + + wg.Wait() + most := int64(totalRate*duration.Seconds()) + int64(burst) + if allowed > most { + t.Errorf("limit failed.At most, it should be:%d,but in fact:%d", most, allowed) + } + } + runTestLimiter(1000, 100000, 100000, 5*time.Second) + runTestLimiter(1000, 100000, 10000, 5*time.Second) + runTestLimiter(100, 100000, 10000, 5*time.Second) +}