From 197b088f7af83fc089f6d5b14bcf4bf0637b6aff Mon Sep 17 00:00:00 2001 From: rubyist Date: Fri, 29 Jul 2016 11:46:26 -0400 Subject: [PATCH 1/4] use a goroutine to remove window's bucket lock Replace the ring buffer with a slice of buckets. Use a goroutine and a ticker to perodically increment the current bucket index. This allows bucket access without requiring a mutex and removes the reset bucket math. --- circuitbreaker.go | 5 +- window.go | 132 +++++++++++++++++++--------------------------- window_test.go | 23 ++++---- 3 files changed, 69 insertions(+), 91 deletions(-) diff --git a/circuitbreaker.go b/circuitbreaker.go index 7df5667..5a600f9 100644 --- a/circuitbreaker.go +++ b/circuitbreaker.go @@ -150,12 +150,15 @@ func NewBreakerWithOptions(options *Options) *Breaker { options.WindowBuckets = DefaultWindowBuckets } + win := newWindow(options.WindowTime, options.WindowBuckets, options.Clock) + win.Run() + return &Breaker{ BackOff: options.BackOff, Clock: options.Clock, ShouldTrip: options.ShouldTrip, nextBackOff: options.BackOff.NextBackOff(), - counts: newWindow(options.WindowTime, options.WindowBuckets), + counts: win, } } diff --git a/window.go b/window.go index ab83187..5b24d94 100644 --- a/window.go +++ b/window.go @@ -1,8 +1,7 @@ package circuit import ( - "container/ring" - "sync" + "sync/atomic" "time" "github.com/facebookgo/clock" @@ -43,75 +42,82 @@ func (b *bucket) Success() { // advance to the next bucket, reseting its counts. This allows the keeping of // rolling statistics on the counts. type window struct { - buckets *ring.Ring - bucketTime time.Duration - bucketLock sync.RWMutex - lastAccess time.Time + buckets []bucket + bucketIdx int64 clock clock.Clock + stop chan struct{} + bucketTime time.Duration } // newWindow creates a new window. windowTime is the time covering the entire // window. windowBuckets is the number of buckets the window is divided into. // An example: a 10 second window with 10 buckets will have 10 buckets covering // 1 second each. -func newWindow(windowTime time.Duration, windowBuckets int) *window { - buckets := ring.New(windowBuckets) - for i := 0; i < buckets.Len(); i++ { - buckets.Value = &bucket{} - buckets = buckets.Next() +func newWindow(windowTime time.Duration, windowBuckets int, clock clock.Clock) *window { + w := &window{ + buckets: make([]bucket, windowBuckets), + bucketTime: time.Duration(windowTime.Nanoseconds() / int64(windowBuckets)), + clock: clock, + stop: make(chan struct{}), } - clock := clock.New() + return w +} - bucketTime := time.Duration(windowTime.Nanoseconds() / int64(windowBuckets)) - return &window{ - buckets: buckets, - bucketTime: bucketTime, - clock: clock, - lastAccess: clock.Now(), - } +// Run starts the goroutine that increments the bucket index and sets up the +// next bucket. +func (w *window) Run() { + c := make(chan struct{}) + go func() { + close(c) + ticker := w.clock.Ticker(w.bucketTime) + for { + select { + case <-ticker.C: + idx := atomic.LoadInt64(&w.bucketIdx) + idx = (idx + 1) % int64(len(w.buckets)) + w.buckets[idx].Reset() + atomic.StoreInt64(&w.bucketIdx, idx) + case <-w.stop: + return + } + } + }() + <-c +} + +// Stop stops the index incrementing goroutine. +func (w *window) Stop() { + w.stop <- struct{}{} } // Fail records a failure in the current bucket. func (w *window) Fail() { - w.bucketLock.Lock() - b := w.getLatestBucket() - b.Fail() - w.bucketLock.Unlock() + idx := atomic.LoadInt64(&w.bucketIdx) + w.buckets[idx].Fail() } // Success records a success in the current bucket. func (w *window) Success() { - w.bucketLock.Lock() - b := w.getLatestBucket() - b.Success() - w.bucketLock.Unlock() + idx := atomic.LoadInt64(&w.bucketIdx) + w.buckets[idx].Success() } // Failures returns the total number of failures recorded in all buckets. func (w *window) Failures() int64 { - w.bucketLock.RLock() - var failures int64 - w.buckets.Do(func(x interface{}) { - b := x.(*bucket) - failures += b.failure - }) - - w.bucketLock.RUnlock() + for i := 0; i < len(w.buckets); i++ { + failures += w.buckets[i].failure + } return failures } // Successes returns the total number of successes recorded in all buckets. func (w *window) Successes() int64 { - w.bucketLock.RLock() - var successes int64 - w.buckets.Do(func(x interface{}) { - b := x.(*bucket) - successes += b.success - }) - w.bucketLock.RUnlock() + for i := 0; i < len(w.buckets); i++ { + successes += w.buckets[i].success + } return successes } @@ -121,13 +127,11 @@ func (w *window) ErrorRate() float64 { var total int64 var failures int64 - w.bucketLock.RLock() - w.buckets.Do(func(x interface{}) { - b := x.(*bucket) + for i := 0; i < len(w.buckets); i++ { + b := w.buckets[i] total += b.failure + b.success failures += b.failure - }) - w.bucketLock.RUnlock() + } if total == 0 { return 0.0 @@ -138,37 +142,7 @@ func (w *window) ErrorRate() float64 { // Reset resets the count of all buckets. func (w *window) Reset() { - w.bucketLock.Lock() - - w.buckets.Do(func(x interface{}) { - x.(*bucket).Reset() - }) - w.bucketLock.Unlock() -} - -// getLatestBucket returns the current bucket. If the bucket time has elapsed -// it will move to the next bucket, resetting its counts and updating the last -// access time before returning it. getLatestBucket assumes that the caller has -// locked the bucketLock -func (w *window) getLatestBucket() *bucket { - var b *bucket - b = w.buckets.Value.(*bucket) - elapsed := w.clock.Now().Sub(w.lastAccess) - - if elapsed > w.bucketTime { - // Reset the buckets between now and number of buckets ago. If - // that is more that the existing buckets, reset all. - for i := 0; i < w.buckets.Len(); i++ { - w.buckets = w.buckets.Next() - b = w.buckets.Value.(*bucket) - b.Reset() - elapsed = time.Duration(int64(elapsed) - int64(w.bucketTime)) - if elapsed < w.bucketTime { - // Done resetting buckets. - break - } - } - w.lastAccess = w.clock.Now() + for i := 0; i < len(w.buckets); i++ { + w.buckets[i].Reset() } - return b } diff --git a/window_test.go b/window_test.go index 7b60fbf..25904f1 100644 --- a/window_test.go +++ b/window_test.go @@ -8,7 +8,7 @@ import ( ) func TestWindowCounts(t *testing.T) { - w := newWindow(time.Millisecond*10, 2) + w := newWindow(time.Millisecond*10, 2, clock.NewMock()) w.Fail() w.Fail() w.Success() @@ -38,35 +38,36 @@ func TestWindowCounts(t *testing.T) { func TestWindowSlides(t *testing.T) { c := clock.NewMock() - w := newWindow(time.Millisecond*10, 2) - w.clock = c - w.lastAccess = c.Now() + w := newWindow(time.Millisecond*10, 2, c) + w.Run() w.Fail() - c.Add(time.Millisecond * 6) + c.Add(time.Millisecond * 5) w.Fail() + w.Stop() counts := 0 - w.buckets.Do(func(x interface{}) { - b := x.(*bucket) + for _, b := range w.buckets { if b.failure > 0 { counts++ } - }) + } if counts != 2 { t.Fatalf("expected 2 buckets to have failures, got %d", counts) } + w.Run() c.Add(time.Millisecond * 15) w.Success() + w.Stop() + counts = 0 - w.buckets.Do(func(x interface{}) { - b := x.(*bucket) + for _, b := range w.buckets { if b.failure > 0 { counts++ } - }) + } if counts != 0 { t.Fatalf("expected 0 buckets to have failures, got %d", counts) From 57c8d8759046966c662d2aa44f005e0b257e5905 Mon Sep 17 00:00:00 2001 From: rubyist Date: Fri, 29 Jul 2016 12:08:07 -0400 Subject: [PATCH 2/4] make buckets atomic --- window.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/window.go b/window.go index 5b24d94..4a102e9 100644 --- a/window.go +++ b/window.go @@ -23,18 +23,26 @@ type bucket struct { // Reset resets the counts to 0 func (b *bucket) Reset() { - b.failure = 0 - b.success = 0 + atomic.StoreInt64(&b.failure, 0) + atomic.StoreInt64(&b.success, 0) } // Fail increments the failure count func (b *bucket) Fail() { - b.failure++ + atomic.AddInt64(&b.failure, 1) } // Sucecss increments the success count func (b *bucket) Success() { - b.success++ + atomic.AddInt64(&b.success, 1) +} + +func (b *bucket) Failures() int64 { + return atomic.LoadInt64(&b.failure) +} + +func (b *bucket) Successes() int64 { + return atomic.LoadInt64(&b.success) } // window maintains a ring of buckets and increments the failure and success @@ -107,7 +115,7 @@ func (w *window) Success() { func (w *window) Failures() int64 { var failures int64 for i := 0; i < len(w.buckets); i++ { - failures += w.buckets[i].failure + failures += w.buckets[i].Failures() } return failures } @@ -116,7 +124,7 @@ func (w *window) Failures() int64 { func (w *window) Successes() int64 { var successes int64 for i := 0; i < len(w.buckets); i++ { - successes += w.buckets[i].success + successes += w.buckets[i].Successes() } return successes } @@ -128,9 +136,9 @@ func (w *window) ErrorRate() float64 { var failures int64 for i := 0; i < len(w.buckets); i++ { - b := w.buckets[i] - total += b.failure + b.success - failures += b.failure + b := &w.buckets[i] + total += b.Failures() + b.Successes() + failures += b.Failures() } if total == 0 { From 376fd417966c5f430717e3d3f566a08c6a25f1ae Mon Sep 17 00:00:00 2001 From: rubyist Date: Fri, 29 Jul 2016 12:13:53 -0400 Subject: [PATCH 3/4] bucketIdx at the top of the struct Pretty sure there will be problems on 32 bit archs if it isn't --- window.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/window.go b/window.go index 4a102e9..20c333b 100644 --- a/window.go +++ b/window.go @@ -50,8 +50,8 @@ func (b *bucket) Successes() int64 { // advance to the next bucket, reseting its counts. This allows the keeping of // rolling statistics on the counts. type window struct { - buckets []bucket bucketIdx int64 + buckets []bucket clock clock.Clock stop chan struct{} bucketTime time.Duration @@ -62,14 +62,12 @@ type window struct { // An example: a 10 second window with 10 buckets will have 10 buckets covering // 1 second each. func newWindow(windowTime time.Duration, windowBuckets int, clock clock.Clock) *window { - w := &window{ + return &window{ buckets: make([]bucket, windowBuckets), bucketTime: time.Duration(windowTime.Nanoseconds() / int64(windowBuckets)), clock: clock, stop: make(chan struct{}), } - - return w } // Run starts the goroutine that increments the bucket index and sets up the From de32d96eb3b1a4dccd42141acee9765707ebf94d Mon Sep 17 00:00:00 2001 From: rubyist Date: Fri, 29 Jul 2016 12:16:52 -0400 Subject: [PATCH 4/4] don't run/stop unless we're stopped/running This channel was a hack for the tests, let the tests do cheap hacks --- window.go | 12 +++++++++--- window_test.go | 3 +++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/window.go b/window.go index 20c333b..2f7b519 100644 --- a/window.go +++ b/window.go @@ -55,6 +55,7 @@ type window struct { clock clock.Clock stop chan struct{} bucketTime time.Duration + isRunning bool } // newWindow creates a new window. windowTime is the time covering the entire @@ -73,9 +74,11 @@ func newWindow(windowTime time.Duration, windowBuckets int, clock clock.Clock) * // Run starts the goroutine that increments the bucket index and sets up the // next bucket. func (w *window) Run() { - c := make(chan struct{}) + if w.isRunning { + return + } + go func() { - close(c) ticker := w.clock.Ticker(w.bucketTime) for { select { @@ -89,11 +92,14 @@ func (w *window) Run() { } } }() - <-c } // Stop stops the index incrementing goroutine. func (w *window) Stop() { + if !w.isRunning { + return + } + w.stop <- struct{}{} } diff --git a/window_test.go b/window_test.go index 25904f1..ef861cc 100644 --- a/window_test.go +++ b/window_test.go @@ -1,6 +1,7 @@ package circuit import ( + "runtime" "testing" "time" @@ -40,6 +41,7 @@ func TestWindowSlides(t *testing.T) { w := newWindow(time.Millisecond*10, 2, c) w.Run() + runtime.Gosched() w.Fail() c.Add(time.Millisecond * 5) @@ -58,6 +60,7 @@ func TestWindowSlides(t *testing.T) { } w.Run() + runtime.Gosched() c.Add(time.Millisecond * 15) w.Success() w.Stop()