Skip to content

x/time/rate: Limiter allows more configured with multiple goroutines #65508

@ianzhang1988

Description

@ianzhang1988

Go version

go version go1.18.5 linux/amd64

Output of go env in your module/workspace:

GO111MODULE="on"
GOARCH="amd64"
GOBIN=""
GOCACHE="/root/.cache/go-build"
GOENV="/root/.config/go/env"
GOEXE=""
GOEXPERIMENT=""
GOFLAGS=""
GOHOSTARCH="amd64"
GOHOSTOS="linux"
GOINSECURE=""
GOMODCACHE="/root/.gvm/pkgsets/go1.18.5/global/pkg/mod"
GONOPROXY=""
GONOSUMDB=""
GOOS="linux"
GOPATH="/root/.gvm/pkgsets/go1.18.5/global"
GOPRIVATE=""
GOPROXY="https://goproxy.cn,direct"
GOROOT="/root/.gvm/gos/go1.18.5"
GOSUMDB="sum.golang.org"
GOTMPDIR=""
GOTOOLDIR="/root/.gvm/gos/go1.18.5/pkg/tool/linux_amd64"
GOVCS=""
GOVERSION="go1.18.5"
GCCGO="gccgo"
GOAMD64="v1"
AR="ar"
CC="gcc"
CXX="g++"
CGO_ENABLED="0"
GOMOD="/data/zhangyang/goproject/go.mod"
GOWORK=""
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -m64 -fmessage-length=0 -fdebug-prefix-map=/tmp/go-build3572669853=/tmp/go-build -gno-record-gcc-switches"

What did you do?

Here is my code

package main

import (
	"fmt"
	"time"
	. "golang.org/x/time/rate"
)

func Produce(ch chan uint64) {
	var counter uint64 = 0
	for {
		ch <- counter
		counter += 1
	}
}

func ConsumWithLimitDelay(ch chan uint64, chout chan uint64, lim *Limiter) {
	for {
		n := lim.Reserve()
		if !n.OK() {
			continue
		}

		time.Sleep(n.Delay())
		chout <- <-ch
	}
}

func Count(ch chan uint64) {
	var counter uint64 = 0

	lastTime := time.Now()

	for {
		_ = <-ch
		counter += 1
		du := time.Since(lastTime)
		if du > 1*time.Second {
			fmt.Printf("%d %v\n", counter, du)
			counter = 0
			lastTime = lastTime.Add(du)
		}
	}
}

func main() {
	ch := make(chan uint64, 1000)
	ch2 := make(chan uint64, 100)
	go Produce(ch)
	lim := NewLimiter(200000.0, 1000)
	for i := 0; i < 100; i++ {
		go ConsumWithLimitDelay(ch, ch2, lim)
	}
	Count(ch2)
}

limit is set to 200k, but actually is 300k on my machine.

here is what i think is going wrong in rate.go:
sometimes lim.advance(t) would return a time t which is in the past, then update to lim.last and causing problem.
(see // !!! comment in code below)

func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	defer lim.mu.Unlock()

	if lim.limit == Inf {
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: t,
		}
	} else if lim.limit == 0 {
		var ok bool
		if lim.burst >= n {
			ok = true
			lim.burst -= n
		}
		return Reservation{
			ok:        ok,
			lim:       lim,
			tokens:    lim.burst,
			timeToAct: t,
		}
	}

        // !!! with this block of code would set rate.go work correctly
	// if t.Before(lim.last) {
	// 	return Reservation{
	// 		ok:    false,
	// 		lim:   lim,
	// 		limit: lim.limit,
	// 	}
	// }

        // !!! t could be in the past in multiple goroutine
	t, tokens := lim.advance(t)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = t.Add(waitDuration)

                // !!! some time here would update last to a past time, causing the problem
		// Update state
		lim.last = t
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	}

	return r
}

func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
	last := lim.last
	if t.Before(last) {
		last = t
	}

	// Calculate the new number of tokens, due to time that passed.
	elapsed := t.Sub(last)
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return t, tokens
}

What did you see happen?

limiter can't limit properly in multiple goroutine

What did you expect to see?

limiter limit properly in multiple goroutine

Metadata

Metadata

Assignees

No one assigned

    Labels

    NeedsInvestigationSomeone must examine and confirm this is a valid issue and not a duplicate of an existing one.

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions