diff --git a/rate/rate.go b/rate/rate.go index 563270c..254f67b 100644 --- a/rate/rate.go +++ b/rate/rate.go @@ -372,7 +372,9 @@ func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) r.timeToAct = t.Add(waitDuration) // Update state - lim.last = t + if t.After(lim.last) { + lim.last = t + } lim.tokens = tokens lim.lastEvent = r.timeToAct } diff --git a/rate/rate_test.go b/rate/rate_test.go index 8b93903..2302c18 100644 --- a/rate/rate_test.go +++ b/rate/rate_test.go @@ -61,6 +61,7 @@ var ( t3 = t0.Add(time.Duration(3) * d) t4 = t0.Add(time.Duration(4) * d) t5 = t0.Add(time.Duration(5) * d) + t6 = t0.Add(time.Duration(6) * d) t9 = t0.Add(time.Duration(9) * d) ) @@ -125,7 +126,7 @@ func TestLimiterJumpBackwards(t *testing.T) { {t0, 1, 1, true}, {t0, 0, 1, false}, {t0, 0, 1, false}, - {t1, 1, 1, true}, // got a token + // {t1, 1, 1, true}, // got a token {t1, 0, 1, false}, {t1, 0, 1, false}, {t2, 1, 1, true}, // got another token @@ -423,13 +424,13 @@ func TestReserveJumpBack(t *testing.T) { runReserve(t, lim, request{t1, 2, t1, true}) // start at t1 runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst - runReserve(t, lim, request{t2, 2, t3, true}) + runReserve(t, lim, request{t2, 2, t4, true}) // burst size is 2, so n=3 always fails, and the state of lim should not be changed runReserve(t, lim, request{t0, 3, time.Time{}, false}) - runReserve(t, lim, request{t2, 1, t4, true}) + runReserve(t, lim, request{t2, 1, t5, true}) // the maxReserve is not enough so it fails, and the state of lim should not be changed runReserveMax(t, lim, request{t0, 2, time.Time{}, false}, d) - runReserve(t, lim, request{t2, 1, t5, true}) + runReserve(t, lim, request{t2, 1, t6, true}) } func TestReserveJumpBackCancel(t *testing.T) { @@ -655,3 +656,41 @@ func TestTinyLimit(t *testing.T) { t.Errorf("Limit(1e-10, 1) want false when already used") } } + +func TestLimiter(t *testing.T) { + runTestLimiter := func(concurrency int, totalRate float64, burst int, duration time.Duration) { + limiter := NewLimiter(Limit(totalRate), burst) + var allowed int64 + var wg sync.WaitGroup + ctx, _ := context.WithTimeout(context.Background(), duration) + + for range concurrency { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + err := limiter.Wait(ctx) + if err == nil { + atomic.AddInt64(&allowed, 1) + } else { + return + } + } + } + }() + } + + wg.Wait() + most := int64(totalRate*duration.Seconds()) + int64(burst) + if allowed > most { + t.Errorf("limit failed.At most, it should be:%d,but in fact:%d", most, allowed) + } + } + runTestLimiter(1000, 100000, 100000, 5*time.Second) + runTestLimiter(1000, 100000, 10000, 5*time.Second) + runTestLimiter(100, 100000, 10000, 5*time.Second) +}