From 5d8e843f9cf52e51554d6693c37a1fac636a3c0f Mon Sep 17 00:00:00 2001 From: basgys Date: Sun, 15 Mar 2026 15:51:59 +0100 Subject: [PATCH] feat: make AdaptiveThrottle instances independently configurable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add three new AdaptiveThrottleOption constructors that let each instance override the behaviour previously only configurable via globals: WithRejectedErrorFunc — replaces global IsRejectedError WithClientSideRejectionError — replaces global ClientSideRejectionError WithNow — replaces global Now Each falls back to the corresponding global when not set, preserving full backward compatibility. The globals are now deprecated in favour of the per-instance options. The constructor also uses WithNow when seeding the initial windowed counters, so clock control is complete from the moment of construction. --- adaptive.go | 125 ++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 96 insertions(+), 29 deletions(-) diff --git a/adaptive.go b/adaptive.go index 39f1292..36bd421 100644 --- a/adaptive.go +++ b/adaptive.go @@ -46,6 +46,10 @@ type AdaptiveThrottle struct { requests []windowedCounter accepts []windowedCounter validate func(p Priority, priorities int) (Priority, error) + + isRejectedError func(error) bool + clientSideRejectionError error + now func() time.Time } // NewAdaptiveThrottle returns an AdaptiveThrottle. @@ -66,7 +70,12 @@ func NewAdaptiveThrottle(priorities int, options ...AdaptiveThrottleOption) *Ada option.f(&opts) } - now := Now() + nowFn := opts.now + if nowFn == nil { + nowFn = Now + } + + now := nowFn() requests := make([]windowedCounter, priorities) accepts := make([]windowedCounter, priorities) for i := range requests { @@ -80,12 +89,15 @@ func NewAdaptiveThrottle(priorities int, options ...AdaptiveThrottleOption) *Ada } return &AdaptiveThrottle{ - k: opts.k, - priorities: priorities, - requests: requests, - accepts: accepts, - minPerWindow: opts.minRate * opts.d.Seconds(), - validate: validate, + k: opts.k, + priorities: priorities, + requests: requests, + accepts: accepts, + minPerWindow: opts.minRate * opts.d.Seconds(), + validate: validate, + isRejectedError: opts.isRejectedError, + clientSideRejectionError: opts.clientSideRejectionError, + now: opts.now, } } @@ -111,7 +123,7 @@ func (t *AdaptiveThrottle) Throttle( return err } - now := Now() + now := t.nowTime() rejectionProbability := t.rejectionProbability(priority, now) if rand.Float64() < rejectionProbability { // As Bulwark starts rejecting requests, requests will continue to exceed @@ -122,16 +134,17 @@ func (t *AdaptiveThrottle) Throttle( // increase the probability of dropping new requests. t.reject(priority, now) + rejErr := t.rejectionError() if len(fallbackFn) > 0 { - return fallbackFn[0](ctx, ClientSideRejectionError, true) + return fallbackFn[0](ctx, rejErr, true) } - return ClientSideRejectionError + return rejErr } err = fn(ctx) - now = Now() + now = t.nowTime() switch { case err == nil: t.accept(priority, now) @@ -140,7 +153,7 @@ func (t *AdaptiveThrottle) Throttle( err = err.(errRejected).inner fallthrough - case IsRejectedError(err): + case t.checkIsRejected(err): t.reject(priority, now) default: t.accept(priority, now) @@ -197,6 +210,27 @@ func (t *AdaptiveThrottle) reject(p Priority, now time.Time) { t.m.Unlock() } +func (t *AdaptiveThrottle) nowTime() time.Time { + if t.now != nil { + return t.now() + } + return Now() +} + +func (t *AdaptiveThrottle) checkIsRejected(err error) bool { + if t.isRejectedError != nil { + return t.isRejectedError(err) + } + return IsRejectedError(err) +} + +func (t *AdaptiveThrottle) rejectionError() error { + if t.clientSideRejectionError != nil { + return t.clientSideRejectionError + } + return ClientSideRejectionError +} + // Additional options for the AdaptiveThrottle type. These options do not frequently need to be // tuned as the defaults work in a majority of cases. type AdaptiveThrottleOption struct { @@ -204,10 +238,13 @@ type AdaptiveThrottleOption struct { } type adaptiveThrottleOptions struct { - k float64 - minRate float64 - d time.Duration - validate func(p Priority, priorities int) (Priority, error) + k float64 + minRate float64 + d time.Duration + validate func(p Priority, priorities int) (Priority, error) + isRejectedError func(error) bool + clientSideRejectionError error + now func() time.Time } // WithAdaptiveThrottleRatio sets the ratio of the measured success rate and the rate that the throttle @@ -266,6 +303,33 @@ func WithPriorityValidator(fn func(p Priority, priorities int) (Priority, error) }} } +// WithRejectedErrorFunc sets the per-instance function that determines whether +// an error returned by the throttled function should be counted as a rejection. +// When set, this takes precedence over the global IsRejectedError. +func WithRejectedErrorFunc(fn func(error) bool) AdaptiveThrottleOption { + return AdaptiveThrottleOption{func(opts *adaptiveThrottleOptions) { + opts.isRejectedError = fn + }} +} + +// WithClientSideRejectionError sets the per-instance error returned when the +// throttle rejects a request on the client side without forwarding it to the +// backend. When set, this takes precedence over the global ClientSideRejectionError. +func WithClientSideRejectionError(err error) AdaptiveThrottleOption { + return AdaptiveThrottleOption{func(opts *adaptiveThrottleOptions) { + opts.clientSideRejectionError = err + }} +} + +// WithNow sets the per-instance time source. This is primarily useful in tests +// to control the clock without affecting other AdaptiveThrottle instances. +// When set, this takes precedence over the global Now. +func WithNow(fn func() time.Time) AdaptiveThrottleOption { + return AdaptiveThrottleOption{func(opts *adaptiveThrottleOptions) { + opts.now = fn + }} +} + func Throttle[T any]( ctx context.Context, at *AdaptiveThrottle, @@ -278,7 +342,7 @@ func Throttle[T any]( return res, err } - now := Now() + now := at.nowTime() rejectionProbability := at.rejectionProbability(priority, now) if rand.Float64() < rejectionProbability { // As Bulwark starts rejecting requests, requests will continue to exceed @@ -290,16 +354,17 @@ func Throttle[T any]( at.reject(priority, now) var zero T + rejErr := at.rejectionError() if len(fallbackFn) > 0 { - return fallbackFn[0](ctx, ClientSideRejectionError, true) + return fallbackFn[0](ctx, rejErr, true) } - return zero, ClientSideRejectionError + return zero, rejErr } res, err = throttledFn(ctx) - now = Now() + now = at.nowTime() switch { case err == nil: at.accept(priority, now) @@ -308,7 +373,7 @@ func Throttle[T any]( err = err.(errRejected).inner fallthrough - case IsRejectedError(err): + case at.checkIsRejected(err): at.reject(priority, now) default: at.accept(priority, now) @@ -340,7 +405,7 @@ func WithAdaptiveThrottle[T any]( return res, err } - now := Now() + now := at.nowTime() rejectionProbability := at.rejectionProbability(priority, now) if rand.Float64() < rejectionProbability { // As Bulwark starts rejecting requests, requests will continue to exceed @@ -352,12 +417,12 @@ func WithAdaptiveThrottle[T any]( at.reject(priority, now) var zero T - return zero, ClientSideRejectionError + return zero, at.rejectionError() } res, err = throttledFn() - now = Now() + now = at.nowTime() switch { case err == nil: at.accept(priority, now) @@ -366,7 +431,7 @@ func WithAdaptiveThrottle[T any]( err = err.(errRejected).inner fallthrough - case IsRejectedError(err): + case at.checkIsRejected(err): at.reject(priority, now) default: at.accept(priority, now) @@ -438,17 +503,19 @@ var ( // DefaultClientSideRejectionError is the default error returned when the // client rejects the request due to the adaptive throttle. DefaultClientSideRejectionError = faults.Unavailable(time.Second) + // Deprecated: Use WithClientSideRejectionError to configure this per instance. + // // ClientSideRejectionError is the error returned when the client rejects the // request due to the adaptive throttle. ClientSideRejectionError = DefaultClientSideRejectionError + // Deprecated: Use WithRejectedErrorFunc to configure this per instance. + // // IsRejectedError is a global function that determines whether an error // should be considered for the throttling. Any error that indicates that the // backend is unhealthy should be considered for the throttling. - // - // This function can be overridden to customise the behaviour of Bulwark. - // For example, it is possible to use a whitelist of errors that should be - // accepted and reject the rest. IsRejectedError = DefaultRejectedError + // Deprecated: Use WithNow to configure this per instance. + // // Now returns the current time. It is a variable to allow tests to override // the current time. Now = time.Now