Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 96 additions & 29 deletions adaptive.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type AdaptiveThrottle struct {
requests []windowedCounter
accepts []windowedCounter
validate func(p Priority, priorities int) (Priority, error)

isRejectedError func(error) bool
clientSideRejectionError error
now func() time.Time
}

// NewAdaptiveThrottle returns an AdaptiveThrottle.
Expand All @@ -66,7 +70,12 @@ func NewAdaptiveThrottle(priorities int, options ...AdaptiveThrottleOption) *Ada
option.f(&opts)
}

now := Now()
nowFn := opts.now
if nowFn == nil {
nowFn = Now
}

now := nowFn()
requests := make([]windowedCounter, priorities)
accepts := make([]windowedCounter, priorities)
for i := range requests {
Expand All @@ -80,12 +89,15 @@ func NewAdaptiveThrottle(priorities int, options ...AdaptiveThrottleOption) *Ada
}

return &AdaptiveThrottle{
k: opts.k,
priorities: priorities,
requests: requests,
accepts: accepts,
minPerWindow: opts.minRate * opts.d.Seconds(),
validate: validate,
k: opts.k,
priorities: priorities,
requests: requests,
accepts: accepts,
minPerWindow: opts.minRate * opts.d.Seconds(),
validate: validate,
isRejectedError: opts.isRejectedError,
clientSideRejectionError: opts.clientSideRejectionError,
now: opts.now,
}
}

Expand All @@ -111,7 +123,7 @@ func (t *AdaptiveThrottle) Throttle(
return err
}

now := Now()
now := t.nowTime()
rejectionProbability := t.rejectionProbability(priority, now)
if rand.Float64() < rejectionProbability {
// As Bulwark starts rejecting requests, requests will continue to exceed
Expand All @@ -122,16 +134,17 @@ func (t *AdaptiveThrottle) Throttle(
// increase the probability of dropping new requests.
t.reject(priority, now)

rejErr := t.rejectionError()
if len(fallbackFn) > 0 {
return fallbackFn[0](ctx, ClientSideRejectionError, true)
return fallbackFn[0](ctx, rejErr, true)
}

return ClientSideRejectionError
return rejErr
}

err = fn(ctx)

now = Now()
now = t.nowTime()
switch {
case err == nil:
t.accept(priority, now)
Expand All @@ -140,7 +153,7 @@ func (t *AdaptiveThrottle) Throttle(
err = err.(errRejected).inner

fallthrough
case IsRejectedError(err):
case t.checkIsRejected(err):
t.reject(priority, now)
default:
t.accept(priority, now)
Expand Down Expand Up @@ -197,17 +210,41 @@ func (t *AdaptiveThrottle) reject(p Priority, now time.Time) {
t.m.Unlock()
}

func (t *AdaptiveThrottle) nowTime() time.Time {
if t.now != nil {
return t.now()
}
return Now()
}

func (t *AdaptiveThrottle) checkIsRejected(err error) bool {
if t.isRejectedError != nil {
return t.isRejectedError(err)
}
return IsRejectedError(err)
}

func (t *AdaptiveThrottle) rejectionError() error {
if t.clientSideRejectionError != nil {
return t.clientSideRejectionError
}
return ClientSideRejectionError
}

// Additional options for the AdaptiveThrottle type. These options do not frequently need to be
// tuned as the defaults work in a majority of cases.
type AdaptiveThrottleOption struct {
f func(*adaptiveThrottleOptions)
}

type adaptiveThrottleOptions struct {
k float64
minRate float64
d time.Duration
validate func(p Priority, priorities int) (Priority, error)
k float64
minRate float64
d time.Duration
validate func(p Priority, priorities int) (Priority, error)
isRejectedError func(error) bool
clientSideRejectionError error
now func() time.Time
}

// WithAdaptiveThrottleRatio sets the ratio of the measured success rate and the rate that the throttle
Expand Down Expand Up @@ -266,6 +303,33 @@ func WithPriorityValidator(fn func(p Priority, priorities int) (Priority, error)
}}
}

// WithRejectedErrorFunc sets the per-instance function that determines whether
// an error returned by the throttled function should be counted as a rejection.
// When set, this takes precedence over the global IsRejectedError.
func WithRejectedErrorFunc(fn func(error) bool) AdaptiveThrottleOption {
return AdaptiveThrottleOption{func(opts *adaptiveThrottleOptions) {
opts.isRejectedError = fn
}}
}

// WithClientSideRejectionError sets the per-instance error returned when the
// throttle rejects a request on the client side without forwarding it to the
// backend. When set, this takes precedence over the global ClientSideRejectionError.
func WithClientSideRejectionError(err error) AdaptiveThrottleOption {
return AdaptiveThrottleOption{func(opts *adaptiveThrottleOptions) {
opts.clientSideRejectionError = err
}}
}

// WithNow sets the per-instance time source. This is primarily useful in tests
// to control the clock without affecting other AdaptiveThrottle instances.
// When set, this takes precedence over the global Now.
func WithNow(fn func() time.Time) AdaptiveThrottleOption {
return AdaptiveThrottleOption{func(opts *adaptiveThrottleOptions) {
opts.now = fn
}}
}

func Throttle[T any](
ctx context.Context,
at *AdaptiveThrottle,
Expand All @@ -278,7 +342,7 @@ func Throttle[T any](
return res, err
}

now := Now()
now := at.nowTime()
rejectionProbability := at.rejectionProbability(priority, now)
if rand.Float64() < rejectionProbability {
// As Bulwark starts rejecting requests, requests will continue to exceed
Expand All @@ -290,16 +354,17 @@ func Throttle[T any](
at.reject(priority, now)
var zero T

rejErr := at.rejectionError()
if len(fallbackFn) > 0 {
return fallbackFn[0](ctx, ClientSideRejectionError, true)
return fallbackFn[0](ctx, rejErr, true)
}

return zero, ClientSideRejectionError
return zero, rejErr
}

res, err = throttledFn(ctx)

now = Now()
now = at.nowTime()
switch {
case err == nil:
at.accept(priority, now)
Expand All @@ -308,7 +373,7 @@ func Throttle[T any](
err = err.(errRejected).inner

fallthrough
case IsRejectedError(err):
case at.checkIsRejected(err):
at.reject(priority, now)
default:
at.accept(priority, now)
Expand Down Expand Up @@ -340,7 +405,7 @@ func WithAdaptiveThrottle[T any](
return res, err
}

now := Now()
now := at.nowTime()
rejectionProbability := at.rejectionProbability(priority, now)
if rand.Float64() < rejectionProbability {
// As Bulwark starts rejecting requests, requests will continue to exceed
Expand All @@ -352,12 +417,12 @@ func WithAdaptiveThrottle[T any](
at.reject(priority, now)
var zero T

return zero, ClientSideRejectionError
return zero, at.rejectionError()
}

res, err = throttledFn()

now = Now()
now = at.nowTime()
switch {
case err == nil:
at.accept(priority, now)
Expand All @@ -366,7 +431,7 @@ func WithAdaptiveThrottle[T any](
err = err.(errRejected).inner

fallthrough
case IsRejectedError(err):
case at.checkIsRejected(err):
at.reject(priority, now)
default:
at.accept(priority, now)
Expand Down Expand Up @@ -438,17 +503,19 @@ var (
// DefaultClientSideRejectionError is the default error returned when the
// client rejects the request due to the adaptive throttle.
DefaultClientSideRejectionError = faults.Unavailable(time.Second)
// Deprecated: Use WithClientSideRejectionError to configure this per instance.
//
// ClientSideRejectionError is the error returned when the client rejects the
// request due to the adaptive throttle.
ClientSideRejectionError = DefaultClientSideRejectionError
// Deprecated: Use WithRejectedErrorFunc to configure this per instance.
//
// IsRejectedError is a global function that determines whether an error
// should be considered for the throttling. Any error that indicates that the
// backend is unhealthy should be considered for the throttling.
//
// This function can be overridden to customise the behaviour of Bulwark.
// For example, it is possible to use a whitelist of errors that should be
// accepted and reject the rest.
IsRejectedError = DefaultRejectedError
// Deprecated: Use WithNow to configure this per instance.
//
// Now returns the current time. It is a variable to allow tests to override
// the current time.
Now = time.Now
Expand Down
Loading