Skip to content
82 changes: 47 additions & 35 deletions circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,12 @@ type Breaker struct {

_ [4]byte // pad to fix golang issue #599
consecFailures int64
lastFailure int64 // stored as nanoseconds since the Unix epoch
halfOpens int64
counts *window
nextBackOff time.Duration
tripped int32
broken int32
eventReceivers []chan BreakerEvent
listeners []chan ListenerEvent
nextBackOff time.Time
backoffLock sync.Mutex
}

Expand All @@ -124,6 +122,10 @@ type Options struct {
ShouldTrip TripFunc
WindowTime time.Duration
WindowBuckets int
// SmoothLimited avoid the breaker is opened by a suddenly fault spike,
// e.g. network is interrupted quick flashing and recovery immediately.
// Keep this field zore is default disable this feature.
SmoothLimited int
}

// NewBreakerWithOptions creates a base breaker with a specified backoff, clock and TripFunc
Expand Down Expand Up @@ -154,11 +156,11 @@ func NewBreakerWithOptions(options *Options) *Breaker {
}

return &Breaker{
BackOff: options.BackOff,
Clock: options.Clock,
ShouldTrip: options.ShouldTrip,
nextBackOff: options.BackOff.NextBackOff(),
counts: newWindow(options.WindowTime, options.WindowBuckets),
BackOff: options.BackOff,
Clock: options.Clock,
ShouldTrip: options.ShouldTrip,
counts: newWindow(options.WindowTime, options.WindowBuckets,
options.Clock, options.SmoothLimited),
}
}

Expand Down Expand Up @@ -234,22 +236,34 @@ func (cb *Breaker) RemoveListener(listener chan ListenerEvent) bool {
return false
}

func (cb *Breaker) nextBackOffLocked() {
if o := cb.BackOff.NextBackOff(); o != backoff.Stop {
cb.nextBackOff = cb.Clock.Now().Add(o)
} else {
cb.nextBackOff = time.Time{}
}
}

// Trip will trip the circuit breaker. After Trip() is called, Tripped() will
// return true.
func (cb *Breaker) Trip() {
// should happen before Tripped()
if !cb.Tripped() {
cb.backoffLock.Lock()
cb.BackOff.Reset()
cb.nextBackOffLocked()
cb.backoffLock.Unlock()
}
atomic.StoreInt32(&cb.tripped, 1)
now := cb.Clock.Now()
atomic.StoreInt64(&cb.lastFailure, now.UnixNano())
cb.sendEvent(BreakerTripped)
}

// Reset will reset the circuit breaker. After Reset() is called, Tripped() will
// return false.
func (cb *Breaker) Reset() {
cb.ResetCounters()
atomic.StoreInt32(&cb.broken, 0)
atomic.StoreInt32(&cb.tripped, 0)
atomic.StoreInt64(&cb.halfOpens, 0)
cb.ResetCounters()
cb.sendEvent(BreakerReset)
}

Expand Down Expand Up @@ -286,30 +300,27 @@ func (cb *Breaker) Successes() int64 {
return cb.counts.Successes()
}

// Total returns the number of total records for this circuit breaker.
func (cb *Breaker) Total() int64 {
return cb.counts.Total()
}

// Fail is used to indicate a failure condition the Breaker should record. It will
// increment the failure counters and store the time of the last failure. If the
// breaker has a TripFunc it will be called, tripping the breaker if necessary.
func (cb *Breaker) Fail() {
cb.counts.Fail()
atomic.AddInt64(&cb.consecFailures, 1)
now := cb.Clock.Now()
atomic.StoreInt64(&cb.lastFailure, now.UnixNano())
cb.sendEvent(BreakerFail)
if cb.ShouldTrip != nil && cb.ShouldTrip(cb) {
if !cb.Tripped() && cb.ShouldTrip != nil && cb.ShouldTrip(cb) {
cb.Trip()
}
}

// Success is used to indicate a success condition the Breaker should record. If
// the success was triggered by a retry attempt, the breaker will be Reset().
func (cb *Breaker) Success() {
cb.backoffLock.Lock()
cb.BackOff.Reset()
cb.nextBackOff = cb.BackOff.NextBackOff()
cb.backoffLock.Unlock()

state := cb.state()
if state != closed {
if cb.Tripped() && atomic.LoadInt32(&cb.broken) != 1 {
cb.Reset()
}
atomic.StoreInt64(&cb.consecFailures, 0)
Expand All @@ -326,12 +337,15 @@ func (cb *Breaker) ErrorRate() float64 {
// It will be ready if the breaker is in a reset state, or if it is time to retry
// the call for auto resetting.
func (cb *Breaker) Ready() bool {
state := cb.state()
return cb.state(false) != open
}

func (cb *Breaker) ready() bool {
state := cb.state(true)
if state == halfopen {
atomic.StoreInt64(&cb.halfOpens, 0)
cb.sendEvent(BreakerReady)
}
return state == closed || state == halfopen
return state != open
}

// Call wraps a function the Breaker will protect. A failure is recorded
Expand All @@ -348,7 +362,7 @@ func (cb *Breaker) CallContext(
) error {
var err error

if !cb.Ready() {
if !cb.ready() {
return ErrBreakerOpen
}

Expand Down Expand Up @@ -384,25 +398,23 @@ func (cb *Breaker) CallContext(
// closed - the circuit is in a reset state and is operational
// open - the circuit is in a tripped state
// halfopen - the circuit is in a tripped state but the reset timeout has passed
func (cb *Breaker) state() state {
func (cb *Breaker) state(once bool) state {
tripped := cb.Tripped()
if tripped {
if atomic.LoadInt32(&cb.broken) == 1 {
return open
}

last := atomic.LoadInt64(&cb.lastFailure)
since := cb.Clock.Now().Sub(time.Unix(0, last))
now := cb.Clock.Now()

cb.backoffLock.Lock()
defer cb.backoffLock.Unlock()

if cb.nextBackOff != backoff.Stop && since > cb.nextBackOff {
if atomic.CompareAndSwapInt64(&cb.halfOpens, 0, 1) {
cb.nextBackOff = cb.BackOff.NextBackOff()
return halfopen
if !cb.nextBackOff.IsZero() && now.After(cb.nextBackOff) {
if once {
cb.nextBackOffLocked()
}
return open
return halfopen
}
return open
}
Expand Down Expand Up @@ -454,7 +466,7 @@ func ConsecutiveTripFunc(threshold int64) TripFunc {
// This TripFunc will not trip until there have been at least minSamples events.
func RateTripFunc(rate float64, minSamples int64) TripFunc {
return func(cb *Breaker) bool {
samples := cb.Failures() + cb.Successes()
samples := cb.Total()
return samples >= minSamples && cb.ErrorRate() >= rate
}
}
112 changes: 100 additions & 12 deletions circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,14 @@ func TestBreakerEvents(t *testing.T) {
t.Fatalf("expected to receive a trip event, got %d", e)
}

c.Add(cb.nextBackOff + 1)
cb.Ready()
c.Add(cb.nextBackOff.Sub(c.Now()) + 1)
cb.Call(func() error { return context.Canceled }, 0)
if e := <-events; e != BreakerReady {
t.Fatalf("expected to receive a breaker ready event, got %d", e)
}
if e := <-events; e != BreakerFail {
t.Fatalf("expected to receive a fail event, got %d", e)
}

cb.Reset()
if e := <-events; e != BreakerReset {
Expand All @@ -114,11 +117,14 @@ func TestAddRemoveListener(t *testing.T) {
t.Fatalf("expected to receive a trip event, got %v", e)
}

c.Add(cb.nextBackOff + 1)
cb.Ready()
c.Add(cb.nextBackOff.Sub(c.Now()) + 1)
cb.Call(func() error { return context.Canceled }, 0)
if e := <-events; e.Event != BreakerReady {
t.Fatalf("expected to receive a breaker ready event, got %v", e)
}
if e := <-events; e.Event != BreakerFail {
t.Fatalf("expected to receive a fail event, got %v", e)
}

cb.Reset()
if e := <-events; e.Event != BreakerReset {
Expand Down Expand Up @@ -153,13 +159,13 @@ func TestTrippableBreakerState(t *testing.T) {
if cb.Ready() {
t.Fatal("expected breaker to not be ready")
}
c.Add(cb.nextBackOff + 1)
c.Add(cb.nextBackOff.Sub(c.Now()) + 1)
if !cb.Ready() {
t.Fatal("expected breaker to be ready after reset timeout")
}

cb.Fail()
c.Add(cb.nextBackOff + 1)
c.Add(cb.nextBackOff.Sub(c.Now()) + 1)
if !cb.Ready() {
t.Fatal("expected breaker to be ready after reset timeout, post failure")
}
Expand All @@ -170,15 +176,15 @@ func TestTrippableBreakerManualBreak(t *testing.T) {
cb := NewBreaker()
cb.Clock = c
cb.Break()
c.Add(cb.nextBackOff + 1)
c.Add(cb.nextBackOff.Sub(c.Now()) + 1)

if cb.Ready() {
t.Fatal("expected breaker to still be tripped")
}

cb.Reset()
cb.Trip()
c.Add(cb.nextBackOff + 1)
c.Add(cb.nextBackOff.Sub(c.Now()) + 1)
if !cb.Ready() {
t.Fatal("expected breaker to be ready")
}
Expand Down Expand Up @@ -310,7 +316,7 @@ func TestThresholdBreakerResets(t *testing.T) {
t.Fatal("Expected cb to return an error")
}

c.Add(cb.nextBackOff + 1)
c.Add(cb.nextBackOff.Sub(c.Now()) + 1)
for i := 0; i < 4; i++ {
err = cb.Call(circuit, 0)
if err != nil {
Expand Down Expand Up @@ -420,7 +426,7 @@ func TestRateBreakerResets(t *testing.T) {
t.Fatal("Expected cb to return open open breaker error (open breaker)")
}

c.Add(cb.nextBackOff + 1)
c.Add(cb.nextBackOff.Sub(c.Now()) + 1)
err = cb.Call(circuit, 0)
if err != nil {
t.Fatal("Expected cb to be successful")
Expand Down Expand Up @@ -495,11 +501,49 @@ func TestNeverRetryAfterBackoffStops(t *testing.T) {
}
}

// TestPartialSecondBackoff ensures that the breaker event less than nextBackoff value
// time after tripping the breaker isn't allowed.
func TestHalfOpen(t *testing.T) {
c := clock.NewMock()
cb := NewBreakerWithOptions(&Options{
BackOff: backoff.NewConstantBackOff(500 * time.Millisecond),
Clock: c,
})

// Set the time to 0.5 seconds after the epoch, then trip the breaker.
c.Add(500 * time.Millisecond)
cb.Trip()

var (
wg sync.WaitGroup
halfopened int64
)

c.Add(600 * time.Millisecond)
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
cb.Call(func() error {
atomic.AddInt64(&halfopened, 1)
return context.Canceled
}, 0)
wg.Done()
}()
}

wg.Wait()
if n := atomic.LoadInt64(&halfopened); n != 1 {
t.Fatalf("only allow one call passed when it enter halfopen, got %v", n)
}
}

// TestPartialSecondBackoff ensures that the breaker event less than nextBackoff value
// time after tripping the breaker isn't allowed.
func TestPartialSecondBackoff(t *testing.T) {
c := clock.NewMock()
cb := NewBreaker()
cb := NewBreakerWithOptions(&Options{
BackOff: backoff.NewConstantBackOff(500 * time.Millisecond),
})
cb.Clock = c

// Set the time to 0.5 seconds after the epoch, then trip the breaker.
Expand All @@ -509,8 +553,8 @@ func TestPartialSecondBackoff(t *testing.T) {
// Move forward 100 milliseconds in time and ensure that the backoff time
// is set to a larger number than the clock advanced.
c.Add(100 * time.Millisecond)
cb.nextBackOff = 500 * time.Millisecond
if cb.Ready() {
fmt.Println(cb.nextBackOff, c.Now())
t.Fatalf("expected breaker not to be ready after less time than nextBackoff had passed")
}

Expand Down Expand Up @@ -584,3 +628,47 @@ func TestNoDeadlockOnChannelSends(t *testing.T) {
}
wg.Wait()
}

func BenchmarkParallelCall(b *testing.B) {
cb := NewRateBreaker(0.5, 1000)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
cb.Call(func() error { return nil }, 0)
}
})
}

func BenchmarkParallelCallFailed(b *testing.B) {
cb := NewRateBreaker(0.5, 1<<62)
err := fmt.Errorf("BenchmarkParallelCallFailed:%v", b.N)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
cb.Call(func() error { return err }, 0)
}
})
}

func BenchmarkParallelCallSmooth(b *testing.B) {
cb := NewBreakerWithOptions(&Options{
ShouldTrip: RateTripFunc(0.5, 1000),
SmoothLimited: 256,
})
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
cb.Call(func() error { return nil }, 0)
}
})
}

func BenchmarkParallelCallFailedSmooth(b *testing.B) {
cb := NewBreakerWithOptions(&Options{
ShouldTrip: RateTripFunc(0.5, 1<<62),
SmoothLimited: 256,
})
err := fmt.Errorf("BenchmarkParallelCallFailed:%v", b.N)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
cb.Call(func() error { return err }, 0)
}
})
}
Loading