Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion rate/rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,9 @@ func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration)
r.timeToAct = t.Add(waitDuration)

// Update state
lim.last = t
if t.After(lim.last) {
lim.last = t
}
lim.tokens = tokens
lim.lastEvent = r.timeToAct
}
Expand Down
47 changes: 43 additions & 4 deletions rate/rate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (
t3 = t0.Add(time.Duration(3) * d)
t4 = t0.Add(time.Duration(4) * d)
t5 = t0.Add(time.Duration(5) * d)
t6 = t0.Add(time.Duration(6) * d)
t9 = t0.Add(time.Duration(9) * d)
)

Expand Down Expand Up @@ -125,7 +126,7 @@ func TestLimiterJumpBackwards(t *testing.T) {
{t0, 1, 1, true},
{t0, 0, 1, false},
{t0, 0, 1, false},
{t1, 1, 1, true}, // got a token
// {t1, 1, 1, true}, // got a token
{t1, 0, 1, false},
{t1, 0, 1, false},
{t2, 1, 1, true}, // got another token
Expand Down Expand Up @@ -423,13 +424,13 @@ func TestReserveJumpBack(t *testing.T) {

runReserve(t, lim, request{t1, 2, t1, true}) // start at t1
runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst
runReserve(t, lim, request{t2, 2, t3, true})
runReserve(t, lim, request{t2, 2, t4, true})
// burst size is 2, so n=3 always fails, and the state of lim should not be changed
runReserve(t, lim, request{t0, 3, time.Time{}, false})
runReserve(t, lim, request{t2, 1, t4, true})
runReserve(t, lim, request{t2, 1, t5, true})
// the maxReserve is not enough so it fails, and the state of lim should not be changed
runReserveMax(t, lim, request{t0, 2, time.Time{}, false}, d)
runReserve(t, lim, request{t2, 1, t5, true})
runReserve(t, lim, request{t2, 1, t6, true})
}

func TestReserveJumpBackCancel(t *testing.T) {
Expand Down Expand Up @@ -655,3 +656,41 @@ func TestTinyLimit(t *testing.T) {
t.Errorf("Limit(1e-10, 1) want false when already used")
}
}

func TestLimiter(t *testing.T) {
runTestLimiter := func(concurrency int, totalRate float64, burst int, duration time.Duration) {
limiter := NewLimiter(Limit(totalRate), burst)
var allowed int64
var wg sync.WaitGroup
ctx, _ := context.WithTimeout(context.Background(), duration)

for range concurrency {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
err := limiter.Wait(ctx)
if err == nil {
atomic.AddInt64(&allowed, 1)
} else {
return
}
}
}
}()
}

wg.Wait()
most := int64(totalRate*duration.Seconds()) + int64(burst)
if allowed > most {
t.Errorf("limit failed.At most, it should be:%d,but in fact:%d", most, allowed)
}
}
runTestLimiter(1000, 100000, 100000, 5*time.Second)
runTestLimiter(1000, 100000, 10000, 5*time.Second)
runTestLimiter(100, 100000, 10000, 5*time.Second)
}