Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions adaptive.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package bulwark
import (
"context"
"errors"
"math/rand"
"math/rand/v2"
"sync"
"time"

Expand Down Expand Up @@ -50,6 +50,7 @@ type AdaptiveThrottle struct {
isRejectedError func(error) bool
clientSideRejectionError error
now func() time.Time
rng *rand.Rand
}

// NewAdaptiveThrottle returns an AdaptiveThrottle.
Expand Down Expand Up @@ -88,6 +89,11 @@ func NewAdaptiveThrottle(priorities int, options ...AdaptiveThrottleOption) *Ada
validate = ClampInvalidPriority
}

var rng *rand.Rand
if opts.randSource != nil {
rng = rand.New(opts.randSource)
}

return &AdaptiveThrottle{
k: opts.k,
priorities: priorities,
Expand All @@ -98,6 +104,7 @@ func NewAdaptiveThrottle(priorities int, options ...AdaptiveThrottleOption) *Ada
isRejectedError: opts.isRejectedError,
clientSideRejectionError: opts.clientSideRejectionError,
now: opts.now,
rng: rng,
}
}

Expand Down Expand Up @@ -125,7 +132,7 @@ func (t *AdaptiveThrottle) Throttle(

now := t.nowTime()
rejectionProbability := t.rejectionProbability(priority, now)
if rand.Float64() < rejectionProbability {
if t.randFloat64() < rejectionProbability {
// As Bulwark starts rejecting requests, requests will continue to exceed
// accepts. While it may seem counterintuitive, given that locally rejected
// requests aren't actually propagated, this is the preferred behavior. As the
Expand Down Expand Up @@ -231,6 +238,13 @@ func (t *AdaptiveThrottle) rejectionError() error {
return ClientSideRejectionError
}

func (t *AdaptiveThrottle) randFloat64() float64 {
if t.rng != nil {
return t.rng.Float64()
}
return rand.Float64()
}

// Additional options for the AdaptiveThrottle type. These options do not frequently need to be
// tuned as the defaults work in a majority of cases.
type AdaptiveThrottleOption struct {
Expand All @@ -245,6 +259,7 @@ type adaptiveThrottleOptions struct {
isRejectedError func(error) bool
clientSideRejectionError error
now func() time.Time
randSource rand.Source
}

// WithAdaptiveThrottleRatio sets the ratio of the measured success rate and the rate that the throttle
Expand Down Expand Up @@ -330,6 +345,21 @@ func WithNow(fn func() time.Time) AdaptiveThrottleOption {
}}
}

// WithRandomSource sets the per-instance random source used to sample the
// rejection probability. This is primarily useful in tests to produce
// deterministic behaviour: a source that always returns 0 will shed every
// request whose rejection probability is greater than zero, and a source that
// always returns math.MaxUint64 will never shed.
//
// The provided source must be safe for concurrent use if the AdaptiveThrottle
// is used concurrently. rand.NewPCG and rand.NewChaCha8 are not concurrent-safe
// by default.
func WithRandomSource(src rand.Source) AdaptiveThrottleOption {
return AdaptiveThrottleOption{func(opts *adaptiveThrottleOptions) {
opts.randSource = src
}}
}

func Throttle[T any](
ctx context.Context,
at *AdaptiveThrottle,
Expand All @@ -344,7 +374,7 @@ func Throttle[T any](

now := at.nowTime()
rejectionProbability := at.rejectionProbability(priority, now)
if rand.Float64() < rejectionProbability {
if at.randFloat64() < rejectionProbability {
// As Bulwark starts rejecting requests, requests will continue to exceed
// accepts. While it may seem counterintuitive, given that locally rejected
// requests aren't actually propagated, this is the preferred behavior. As the
Expand Down Expand Up @@ -407,7 +437,7 @@ func WithAdaptiveThrottle[T any](

now := at.nowTime()
rejectionProbability := at.rejectionProbability(priority, now)
if rand.Float64() < rejectionProbability {
if at.randFloat64() < rejectionProbability {
// As Bulwark starts rejecting requests, requests will continue to exceed
// accepts. While it may seem counterintuitive, given that locally rejected
// requests aren't actually propagated, this is the preferred behavior. As the
Expand Down
Loading
Loading