diff --git a/circuitbreaker.go b/circuitbreaker.go index 9299a07..8346d36 100644 --- a/circuitbreaker.go +++ b/circuitbreaker.go @@ -105,6 +105,7 @@ type Breaker struct { consecFailures int64 counts *window + flapper *Flapper lastFailure int64 halfOpens int64 nextBackOff time.Duration @@ -122,6 +123,7 @@ type Options struct { ShouldTrip TripFunc WindowTime time.Duration WindowBuckets int + FlapRate float64 } // NewBreakerWithOptions creates a base breaker with a specified backoff, clock and TripFunc @@ -157,6 +159,7 @@ func NewBreakerWithOptions(options *Options) *Breaker { ShouldTrip: options.ShouldTrip, nextBackOff: options.BackOff.NextBackOff(), counts: newWindow(options.WindowTime, options.WindowBuckets), + flapper: NewFlapper(options.FlapRate), } } @@ -228,6 +231,7 @@ func (cb *Breaker) RemoveListener(listener chan ListenerEvent) bool { // Trip will trip the circuit breaker. After Trip() is called, Tripped() will // return true. func (cb *Breaker) Trip() { + cb.flapper.Record(1) atomic.StoreInt32(&cb.tripped, 1) now := cb.Clock.Now() atomic.StoreInt64(&cb.lastFailure, now.Unix()) @@ -237,6 +241,15 @@ func (cb *Breaker) Trip() { // Reset will reset the circuit breaker. After Reset() is called, Tripped() will // return false. func (cb *Breaker) Reset() { + cb.flapper.Record(0) + + if cb.flapper.Flapping() { + // Remain in half open and increase the backoff + atomic.StoreInt64(&cb.halfOpens, 0) + cb.nextBackOff = cb.BackOff.NextBackOff() + return + } + atomic.StoreInt32(&cb.broken, 0) atomic.StoreInt32(&cb.tripped, 0) atomic.StoreInt64(&cb.halfOpens, 0) diff --git a/flapper.go b/flapper.go new file mode 100644 index 0000000..88c0285 --- /dev/null +++ b/flapper.go @@ -0,0 +1,100 @@ +package circuit + +import ( + "sync" + "sync/atomic" +) + +type Flapper struct { + lock sync.Mutex + changes int32 + prevState int32 + flapRate float64 +} + +func NewFlapper(flapRate float64) *Flapper { + return &Flapper{ + flapRate: flapRate, + } +} + +func (f *Flapper) Record(state int32) { + if f.flapRate == 0.0 { + return + } + + f.lock.Lock() + f.changes <<= 1 + + if f.prevState != state { + f.changes |= 1 + } + + f.prevState = state + f.lock.Unlock() +} + +func (f *Flapper) Rate() float64 { + if f.flapRate == 0.0 { + return 0.0 + } + + changes := atomic.LoadInt32(&f.changes) + changes &= changeMask + + var firstBit, lastBit int32 + var firstIdx, lastIdx int + var count int + + for i := startBit; i > 0; i >>= 1 { + if i&changes > 0 { + if firstBit == 0 { + firstBit = i + firstIdx = count + } + + lastBit = i + lastIdx = count + } + + count++ + } + + if firstIdx == 0 && lastIdx == 0 { + return 0.0 + } + + if firstIdx == lastIdx { + return 1.0 / samples + } + + spread := lastIdx - firstIdx + add := valueBase / float64(spread) + + value := valueStart + multiplier := 1 + for i := firstBit >> 1; i >= lastBit; i >>= 1 { + if i&changes > 0 { + value += (0.8 + (add * float64(multiplier))) + } + multiplier++ + } + + return value / samples +} + +func (f *Flapper) Flapping() bool { + if f.flapRate == 0.0 { + return false + } + + return f.Rate() >= f.flapRate +} + +const ( + changeMask = 0xFFFFF + startBit = int32(0x80000) + valueBase = 0.4 + valueStart = 0.8 + samples = 20.0 +) diff --git a/flapper_test.go b/flapper_test.go new file mode 100644 index 0000000..86897da --- /dev/null +++ b/flapper_test.go @@ -0,0 +1,37 @@ +package circuit + +import "testing" + +func TestFlapRate(t *testing.T) { + f := NewFlapper(0.2) + + f.Record(0) // 0 + f.Record(0) // 1 + f.Record(1) // 2 + f.Record(0) // 3 + f.Record(1) // 4 + f.Record(1) // 5 + f.Record(1) // 6 + f.Record(1) // 7 + f.Record(0) // 8 + f.Record(0) // 9 + f.Record(0) // 10 + f.Record(1) // 11 + f.Record(1) // 12 + f.Record(1) // 13 + f.Record(1) // 14 + f.Record(0) // 15 + f.Record(0) // 16 + f.Record(0) // 17 + f.Record(1) // 18 + f.Record(1) // 19 + + rate := f.Rate() + if rate != 0.33875 { + t.Fatalf("expected 0.33875, got %f", rate) + } + + if !f.Flapping() { + t.Fatal("expected flapper to be flapping but it wasn't") + } +}