From 5f348335d20d7e9cb32befb8a34f8178e74efe55 Mon Sep 17 00:00:00 2001 From: rubyist Date: Wed, 2 Dec 2015 20:27:51 -0500 Subject: [PATCH 1/4] elementary flapping detector --- flapper.go | 69 +++++++++++++++++++++++++++++++++++++++++++++++++ flapper_test.go | 37 ++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 flapper.go create mode 100644 flapper_test.go diff --git a/flapper.go b/flapper.go new file mode 100644 index 0000000..5cd34f9 --- /dev/null +++ b/flapper.go @@ -0,0 +1,69 @@ +package circuit + +type Flapper struct { + stateChanges []int32 + prevState int32 + samples int + flapRate float64 +} + +func NewFlapper(samples int, rate float64) *Flapper { + return &Flapper{ + stateChanges: make([]int32, samples), + samples: samples, + flapRate: rate, + } +} + +func (f *Flapper) Record(state int32) { + s := int32(0) + if f.prevState != state { + s = 1 + } + + f.prevState = state + + copy(f.stateChanges[0:f.samples-1], f.stateChanges[1:f.samples]) + f.stateChanges[f.samples-1] = s +} + +func (f *Flapper) Rate() float64 { + first := 0 + last := 0 + + for i := 0; i < f.samples; i++ { + if f.stateChanges[i] == 1 { + first = i + break + } + } + + for i := f.samples - 1; i > 0; i-- { + if f.stateChanges[i] == 1 { + last = i + break + } + } + + spread := last - first + add := 0.4 / float64(spread) + + if first == 0 && last == 0 { + return 0.0 + } + + value := 0.8 + multiplier := 1 + for i := first + 1; i <= last; i++ { + if f.stateChanges[i] == 1 { + value += (0.8 + (add * float64(multiplier))) + } + multiplier++ + } + + return value / float64(f.samples) +} + +func (f *Flapper) Flapping() bool { + return f.Rate() >= f.flapRate +} diff --git a/flapper_test.go b/flapper_test.go new file mode 100644 index 0000000..d66465f --- /dev/null +++ b/flapper_test.go @@ -0,0 +1,37 @@ +package circuit + +import "testing" + +func TestFlapRate(t *testing.T) { + f := NewFlapper(20, 0.2) + + f.Record(0) // 0 + f.Record(0) // 1 + f.Record(1) // 2 + f.Record(0) // 3 + f.Record(1) // 4 + f.Record(1) // 5 + f.Record(1) // 6 + f.Record(1) // 7 + f.Record(0) // 8 + f.Record(0) // 9 + f.Record(0) // 10 + f.Record(1) // 11 + f.Record(1) // 12 + f.Record(1) // 13 + f.Record(1) // 14 + f.Record(0) // 15 + f.Record(0) // 16 + f.Record(0) // 17 + f.Record(1) // 18 + f.Record(1) // 19 + + rate := f.Rate() + if rate != 0.33875 { + t.Fatalf("expected 0.33875, got %f", rate) + } + + if !f.Flapping() { + t.Fatal("expected flapper to be flapping but it wasn't") + } +} From 36789fc3b7e4d5497f57530fed78217c621afb08 Mon Sep 17 00:00:00 2001 From: rubyist Date: Thu, 3 Dec 2015 20:24:51 -0500 Subject: [PATCH 2/4] faster, concurrent safe flapper --- flapper.go | 85 ++++++++++++++++++++++++++++++------------------- flapper_test.go | 2 +- 2 files changed, 53 insertions(+), 34 deletions(-) diff --git a/flapper.go b/flapper.go index 5cd34f9..5737b84 100644 --- a/flapper.go +++ b/flapper.go @@ -1,69 +1,88 @@ package circuit +import ( + "sync" + "sync/atomic" +) + type Flapper struct { - stateChanges []int32 - prevState int32 - samples int - flapRate float64 + lock sync.Mutex + changes int32 + prevState int32 + flapRate float64 } -func NewFlapper(samples int, rate float64) *Flapper { +func NewFlapper(flapRate float64) *Flapper { return &Flapper{ - stateChanges: make([]int32, samples), - samples: samples, - flapRate: rate, + flapRate: flapRate, } } func (f *Flapper) Record(state int32) { - s := int32(0) + f.lock.Lock() + f.changes <<= 1 + if f.prevState != state { - s = 1 + f.changes |= 1 } f.prevState = state - - copy(f.stateChanges[0:f.samples-1], f.stateChanges[1:f.samples]) - f.stateChanges[f.samples-1] = s + f.lock.Unlock() } func (f *Flapper) Rate() float64 { - first := 0 - last := 0 + changes := atomic.LoadInt32(&f.changes) + changes &= changeMask - for i := 0; i < f.samples; i++ { - if f.stateChanges[i] == 1 { - first = i - break - } - } + var firstBit, lastBit int32 + var firstIdx, lastIdx int + var count int + + for i := startBit; i > 0; i >>= 1 { + if i&changes > 0 { + if firstBit == 0 { + firstBit = i + firstIdx = count + } - for i := f.samples - 1; i > 0; i-- { - if f.stateChanges[i] == 1 { - last = i - break + lastBit = i + lastIdx = count } - } - spread := last - first - add := 0.4 / float64(spread) + count++ + } - if first == 0 && last == 0 { + if firstIdx == 0 && lastIdx == 0 { return 0.0 } - value := 0.8 + if firstIdx == lastIdx { + return 1.0 / samples + } + + spread := lastIdx - firstIdx + add := valueBase / float64(spread) + + value := valueStart multiplier := 1 - for i := first + 1; i <= last; i++ { - if f.stateChanges[i] == 1 { + for i := firstBit >> 1; i >= lastBit; i >>= 1 { + if i&changes > 0 { value += (0.8 + (add * float64(multiplier))) } multiplier++ } - return value / float64(f.samples) + return value / samples } func (f *Flapper) Flapping() bool { return f.Rate() >= f.flapRate } + +const ( + changeMask = 0xFFFFF + startBit = int32(0x80000) + valueBase = 0.4 + valueStart = 0.8 + samples = 20.0 +) diff --git a/flapper_test.go b/flapper_test.go index d66465f..86897da 100644 --- a/flapper_test.go +++ b/flapper_test.go @@ -3,7 +3,7 @@ package circuit import "testing" func TestFlapRate(t *testing.T) { - f := NewFlapper(20, 0.2) + f := NewFlapper(0.2) f.Record(0) // 0 f.Record(0) // 1 From dc7fa46537371f4bde33851a00d6077b266f3b8a Mon Sep 17 00:00:00 2001 From: rubyist Date: Sun, 6 Dec 2015 11:13:48 -0500 Subject: [PATCH 3/4] flap rate of 0 is essentialy a noop flapper --- flapper.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/flapper.go b/flapper.go index 5737b84..88c0285 100644 --- a/flapper.go +++ b/flapper.go @@ -19,6 +19,10 @@ func NewFlapper(flapRate float64) *Flapper { } func (f *Flapper) Record(state int32) { + if f.flapRate == 0.0 { + return + } + f.lock.Lock() f.changes <<= 1 @@ -31,6 +35,10 @@ func (f *Flapper) Record(state int32) { } func (f *Flapper) Rate() float64 { + if f.flapRate == 0.0 { + return 0.0 + } + changes := atomic.LoadInt32(&f.changes) changes &= changeMask @@ -76,6 +84,10 @@ func (f *Flapper) Rate() float64 { } func (f *Flapper) Flapping() bool { + if f.flapRate == 0.0 { + return false + } + return f.Rate() >= f.flapRate } From 9c76848cd45ea1c5d058f68c5ff0d3181c5b5ca9 Mon Sep 17 00:00:00 2001 From: rubyist Date: Sun, 6 Dec 2015 11:15:02 -0500 Subject: [PATCH 4/4] give breakers a Flapper --- circuitbreaker.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/circuitbreaker.go b/circuitbreaker.go index 68b3d2a..6bd8103 100644 --- a/circuitbreaker.go +++ b/circuitbreaker.go @@ -96,6 +96,7 @@ type Breaker struct { consecFailures int64 counts *window + flapper *Flapper _lastFailure unsafe.Pointer halfOpens int64 nextBackOff time.Duration @@ -110,6 +111,7 @@ type Options struct { ShouldTrip TripFunc WindowTime time.Duration WindowBuckets int + FlapRate float64 } // NewBreakerWithOptions creates a base breaker with a specified backoff, clock and TripFunc @@ -144,6 +146,7 @@ func NewBreakerWithOptions(options *Options) *Breaker { ShouldTrip: options.ShouldTrip, nextBackOff: options.BackOff.NextBackOff(), counts: newWindow(options.WindowTime, options.WindowBuckets), + flapper: NewFlapper(options.FlapRate), } } @@ -196,6 +199,7 @@ func (cb *Breaker) Subscribe() <-chan BreakerEvent { // Trip will trip the circuit breaker. After Trip() is called, Tripped() will // return true. func (cb *Breaker) Trip() { + cb.flapper.Record(1) atomic.StoreInt32(&cb.tripped, 1) now := cb.Clock.Now() atomic.StorePointer(&cb._lastFailure, unsafe.Pointer(&now)) @@ -205,6 +209,15 @@ func (cb *Breaker) Trip() { // Reset will reset the circuit breaker. After Reset() is called, Tripped() will // return false. func (cb *Breaker) Reset() { + cb.flapper.Record(0) + + if cb.flapper.Flapping() { + // Remain in half open and increase the backoff + atomic.StoreInt64(&cb.halfOpens, 0) + cb.nextBackOff = cb.BackOff.NextBackOff() + return + } + atomic.StoreInt32(&cb.broken, 0) atomic.StoreInt32(&cb.tripped, 0) atomic.StoreInt64(&cb.halfOpens, 0)