From 439a8243526ae2b3d675b1c3e11ae33716a13c3c Mon Sep 17 00:00:00 2001 From: Mariusz Gronczewski Date: Sun, 27 Nov 2016 15:49:16 +0100 Subject: [PATCH 1/2] Use first data point as starting value instead of zero; allow setting up internal state for save/load purpose for long-running averages Current behaviour of just using zero as starting value is incorrect in many cases, for example when trying to track average of non-zero-by-default values like temperature or voltage --- ewma/ewma.go | 21 +++++++++++++-------- ewma/ewma_test.go | 16 +++++++++------- ewma/rate.go | 8 ++++++++ ewma/rate_test.go | 6 +----- 4 files changed, 31 insertions(+), 20 deletions(-) diff --git a/ewma/ewma.go b/ewma/ewma.go index 7034abc..635587a 100644 --- a/ewma/ewma.go +++ b/ewma/ewma.go @@ -47,6 +47,14 @@ func (e *Ewma) count(next float64, timeDelta time.Duration) float64 { return e.Current*weight + next*(1-weight) } +// Set current value of moving average +// +// Useful for reading saved value on restart (for long running averages) or resetting internal state +func (e *Ewma) Set(value float64, timestamp time.Time) { + e.Current = value + e.lastTimestamp = timestamp +} + // Update moving average with the value. // // Uses system clock to determine current time to count wight. Returns @@ -63,15 +71,12 @@ func (e *Ewma) Update(next float64, timestamp time.Time) float64 { return e.Current } - if e.lastTimestamp.IsZero() { - // Ignore the first sample - e.lastTimestamp = timestamp - return e.Current - } - timeDelta := timestamp.Sub(e.lastTimestamp) e.lastTimestamp = timestamp - - e.Current = e.count(next, timeDelta) + if e.lastTimestamp.IsZero() { + e.Current = next + } else { + e.Current = e.count(next, timeDelta) + } return e.Current } diff --git a/ewma/ewma_test.go b/ewma/ewma_test.go index 3bc856a..835abf1 100644 --- a/ewma/ewma_test.go +++ b/ewma/ewma_test.go @@ -56,18 +56,20 @@ var testVectorEwma = [][]testTupleEwma{ }, } +func TestEwmaInit(t *testing.T) { + e := NewEwma(time.Duration(1 * time.Minute)) + ts := time.Now() + e.Update(100, ts) + + if e.Current != 100 { + t.Errorf("Rate after init should be same as first sample") + } +} func TestEwma(t *testing.T) { for testNo, test := range testVectorEwma { e := NewEwma(time.Duration(1 * time.Minute)) - - // Feed the 0th timestamp ts := time.Now() e.Update(0, ts) - - if e.Current != 0 { - t.Errorf("Rate after init should be zero") - } - for lineNo, l := range test { ts = ts.Add(time.Duration(l.delay * float64(time.Second.Nanoseconds()))) e.Update(l.v, ts) diff --git a/ewma/rate.go b/ewma/rate.go index 4b67aa7..9e4a3ad 100644 --- a/ewma/rate.go +++ b/ewma/rate.go @@ -32,6 +32,14 @@ func (r *EwmaRate) Init(halfLife time.Duration) *EwmaRate { return r } +// Set current value of rate +// +// Useful for reading saved value on restart (for long running averages) or resetting internal state + +func (r *EwmaRate) Set(value float64, timestamp time.Time) { + r.Ewma.Set(value, timestamp) +} + // Notify of an event happening. // // Uses system clock to determine current time. Returns current rate. diff --git a/ewma/rate_test.go b/ewma/rate_test.go index 0044be6..1075842 100644 --- a/ewma/rate_test.go +++ b/ewma/rate_test.go @@ -17,8 +17,6 @@ var testVectorRate = [][]testTupleRate{ // Sanity check (half life is 1 second) { // Feeding packets every second gets to 1 pps eventually - {false, 1, 0}, - {true, 1, 0}, {true, 1, 0.5}, {true, 1, 0.75}, {true, 1, 0.875}, @@ -42,7 +40,6 @@ var testVectorRate = [][]testTupleRate{ // Burst of 10, 1ms apart, gets us to ~7 pps { - {true, 1, 0}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {false, 0, 6.9075045629642595}, @@ -52,7 +49,6 @@ var testVectorRate = [][]testTupleRate{ // 10 packets 100ms apart, get 5 pps { - {true, 1, 0}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {false, 0, 5.000000000000002}, @@ -65,7 +61,7 @@ func TestRate(t *testing.T) { for testNo, test := range testVectorRate { ts := time.Now() e := NewEwmaRate(time.Duration(1 * time.Second)) - + e.Ewma.Set(0, ts) for lineNo, l := range test { ts = ts.Add(time.Duration(l.delay * float64(time.Second.Nanoseconds()))) if l.packet { From 6282a7c5ec9faf0168822be0087d5b2ec0a33010 Mon Sep 17 00:00:00 2001 From: Mariusz Gronczewski Date: Tue, 26 Nov 2024 19:51:06 +0100 Subject: [PATCH 2/2] gofmt --- bytepool/bytepool.go | 2 +- bytepool/bytepool_test.go | 8 ++++---- circularbuffer/circularbuffer.go | 7 ++++--- ewma/ewma.go | 2 +- ewma/rate.go | 2 +- kt/kt.go | 6 +++--- kt/kt_base_test.go | 2 +- lrucache/cache.go | 17 +++++++++-------- pool/pool.go | 2 +- spacesaving/count.go | 2 +- spacesaving/rate_test.go | 22 ++++++++++------------ spacesaving/srate.go | 8 ++++---- spacesaving/tools/topdns.go | 2 +- 13 files changed, 41 insertions(+), 41 deletions(-) diff --git a/bytepool/bytepool.go b/bytepool/bytepool.go index 8b73f0e..2d6d58b 100644 --- a/bytepool/bytepool.go +++ b/bytepool/bytepool.go @@ -37,7 +37,7 @@ func (tp *BytePool) Init(drainPeriod time.Duration, maxSize uint32) { if drainPeriod > 0 { tp.drainTicker = time.NewTicker(drainPeriod) go func() { - for _ = range tp.drainTicker.C { + for range tp.drainTicker.C { tp.Drain() } }() diff --git a/bytepool/bytepool_test.go b/bytepool/bytepool_test.go index 7102f8f..4c7ae76 100644 --- a/bytepool/bytepool_test.go +++ b/bytepool/bytepool_test.go @@ -90,8 +90,8 @@ func TestDrain(t *testing.T) { func TestLimits(t *testing.T) { t.Parallel() - var ti int - var p BytePool + var ti int + var p BytePool p.Init(0, 127) p.Put(make([]byte, 129)) @@ -122,12 +122,12 @@ func TestLimits(t *testing.T) { t.Fatal("expected different pool length") } - p.Put(make([]byte, math.MaxUint32 + 1)) + p.Put(make([]byte, math.MaxUint32+1)) if p.entries() != 1 { t.Fatal("expected the pool to have a single item") } - p.Put(make([]byte, math.MaxInt32 + 1)) + p.Put(make([]byte, math.MaxInt32+1)) ti = (1 << log2Ceil(math.MaxUint32)) - 1 if ti <= 0 { // 32-bit systems: Put() slice-size math.MaxInt32 + 1 fails diff --git a/circularbuffer/circularbuffer.go b/circularbuffer/circularbuffer.go index 79cca59..3b37c53 100644 --- a/circularbuffer/circularbuffer.go +++ b/circularbuffer/circularbuffer.go @@ -14,9 +14,10 @@ // the stack. // // This package exports three things: -// StackPusher interface -// StackGetter interface -// CircularBuffer structure +// +// StackPusher interface +// StackGetter interface +// CircularBuffer structure package circularbuffer import ( diff --git a/ewma/ewma.go b/ewma/ewma.go index 635587a..9f539d0 100644 --- a/ewma/ewma.go +++ b/ewma/ewma.go @@ -1,6 +1,6 @@ // Copyright (c) 2014 CloudFlare, Inc. // -// Tickless implementation of exponentially decaying moving average +// # Tickless implementation of exponentially decaying moving average // // Most of EWMA implementations update values every X seconds. This is // suboptimal. Instead of having a ticker goroutine it is possible to diff --git a/ewma/rate.go b/ewma/rate.go index 1f99b0a..dcfbcbc 100644 --- a/ewma/rate.go +++ b/ewma/rate.go @@ -1,6 +1,6 @@ // Copyright (c) 2014 CloudFlare, Inc. // -// Facilities for tickless measurment of rates +// # Facilities for tickless measurment of rates // // Apply exponentially decaying moving average to count rates of // things per second. Useful for various metrics. diff --git a/kt/kt.go b/kt/kt.go index 7f61f02..afd117a 100644 --- a/kt/kt.go +++ b/kt/kt.go @@ -110,8 +110,8 @@ func expiryCertMetric(certFile string) error { func certPaths(dir string) (cert, key, ca string, err error) { certSets := [][]string{ - []string{"service.pem", "service-key.pem", "ca.pem"}, // certmgr - []string{"tls.crt", "tls.key", "ca.crt"}, // kubernetes pki + {"service.pem", "service-key.pem", "ca.pem"}, // certmgr + {"tls.crt", "tls.key", "ca.crt"}, // kubernetes pki } for _, set := range certSets { @@ -498,7 +498,7 @@ func (c *Conn) doGetBulkBytes(ctx context.Context, keys map[string][]byte) error // TSV value for each key with a _ as a prefix. // KT then returns the value as a TSV set with _ in front of the keys keystransmit := make([]KV, 0, len(keys)) - for k, _ := range keys { + for k := range keys { // we set the value to nil because we want a sentinel value // for when no data was found. This is important for // when we remove the not found keys from the map diff --git a/kt/kt_base_test.go b/kt/kt_base_test.go index 70f647e..b91f7ee 100644 --- a/kt/kt_base_test.go +++ b/kt/kt_base_test.go @@ -272,7 +272,7 @@ func TestSetGetRemoveBulk(t *testing.T) { } removeKeys := make([]string, len(baseKeys)) - for k, _ := range baseKeys { + for k := range baseKeys { testKeys[k] = "" removeKeys = append(removeKeys, k) } diff --git a/lrucache/cache.go b/lrucache/cache.go index 606574e..33b22a7 100644 --- a/lrucache/cache.go +++ b/lrucache/cache.go @@ -8,16 +8,17 @@ // otherwise. // // This package exports three things: -// LRUCache: is the main implementation. It supports multithreading by -// using guarding mutex lock. // -// MultiLRUCache: is a sharded implementation. It supports the same -// API as LRUCache and uses it internally, but is not limited to -// a single CPU as every shard is separately locked. Use this -// data structure instead of LRUCache if you have have lock -// contention issues. +// LRUCache: is the main implementation. It supports multithreading by +// using guarding mutex lock. // -// Cache interface: Both implementations fulfill it. +// MultiLRUCache: is a sharded implementation. It supports the same +// API as LRUCache and uses it internally, but is not limited to +// a single CPU as every shard is separately locked. Use this +// data structure instead of LRUCache if you have have lock +// contention issues. +// +// Cache interface: Both implementations fulfill it. package lrucache import ( diff --git a/pool/pool.go b/pool/pool.go index c4e345e..40c3786 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -59,7 +59,7 @@ func (p *Pool) Put(x interface{}) { if p.drainTicker == nil && p.DrainPeriod != 0 { p.drainTicker = time.NewTicker(p.DrainPeriod) go func() { - for _ = range p.drainTicker.C { + for range p.drainTicker.C { p.Drain() } }() diff --git a/spacesaving/count.go b/spacesaving/count.go index 3d53904..dffd08a 100644 --- a/spacesaving/count.go +++ b/spacesaving/count.go @@ -83,7 +83,7 @@ func (ss *Count) GetAll() []Element { func (ss *Count) Reset() { empty := countBucket{} - for i, _ := range ss.olist { + for i := range ss.olist { delete(ss.hash, ss.olist[i].key) ss.olist[i] = empty } diff --git a/spacesaving/rate_test.go b/spacesaving/rate_test.go index beae834..ddd0015 100644 --- a/spacesaving/rate_test.go +++ b/spacesaving/rate_test.go @@ -106,12 +106,12 @@ func TestRateGetAll(t *testing.T) { ss := (&Rate{}).Init(2, 1*time.Second) - ss.Touch("a",time.Now()) - ss.Touch("a",time.Now()) - ss.Touch("b",time.Now()) - ss.Touch("b",time.Now()) - ss.Touch("c",time.Now()) - ss.Touch("c",time.Now()) + ss.Touch("a", time.Now()) + ss.Touch("a", time.Now()) + ss.Touch("b", time.Now()) + ss.Touch("b", time.Now()) + ss.Touch("c", time.Now()) + ss.Touch("c", time.Now()) el := ss.GetAll(time.Now()) if el[0].Key != "c" { @@ -124,10 +124,10 @@ func TestRateGetAll(t *testing.T) { t.Error("expecting lenght = 2") } - ss.Touch("b",time.Now()) - ss.Touch("b",time.Now()) - ss.Touch("b",time.Now()) - ss.Touch("b",time.Now()) + ss.Touch("b", time.Now()) + ss.Touch("b", time.Now()) + ss.Touch("b", time.Now()) + ss.Touch("b", time.Now()) el = ss.GetAll(time.Now()) if el[0].Key != "b" { @@ -148,10 +148,8 @@ func TestRateGetAllCover(t *testing.T) { t.Error("expecting lenght = 0") } - } - // Benchmark updating times with 10% hit rate. func BenchmarkTouch16384_ten(bb *testing.B) { benchmark(bb, 16384, 0.1) diff --git a/spacesaving/srate.go b/spacesaving/srate.go index c25f0da..dcd813f 100644 --- a/spacesaving/srate.go +++ b/spacesaving/srate.go @@ -84,11 +84,11 @@ func (ss *SimpleRate) recount(rate float64, lastTs, now int64) float64 { func (ss *SimpleRate) Touch(key string, nowTs time.Time) { var ( - found bool - bucket *srateBucket - now = nowTs.UnixNano() + found bool + bucket *srateBucket + now = nowTs.UnixNano() ) - bucket, found = ss.hash[key]; + bucket, found = ss.hash[key] if found { // we already have the correct bucket } else if len(ss.heap) < ss.size { diff --git a/spacesaving/tools/topdns.go b/spacesaving/tools/topdns.go index 59ed5ea..274d1f5 100644 --- a/spacesaving/tools/topdns.go +++ b/spacesaving/tools/topdns.go @@ -88,7 +88,7 @@ devloop: func Poller(lock *sync.Mutex, ss *spacesaving.Rate, pc *pcap.Pcap) { w := bufio.NewWriter(os.Stdout) - for _ = range time.Tick(3 * time.Second) { + for range time.Tick(3 * time.Second) { stat, _ := pc.Getstats() lock.Lock()