Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bytepool/bytepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (tp *BytePool) Init(drainPeriod time.Duration, maxSize uint32) {
if drainPeriod > 0 {
tp.drainTicker = time.NewTicker(drainPeriod)
go func() {
for _ = range tp.drainTicker.C {
for range tp.drainTicker.C {
tp.Drain()
}
}()
Expand Down
8 changes: 4 additions & 4 deletions bytepool/bytepool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func TestDrain(t *testing.T) {
func TestLimits(t *testing.T) {
t.Parallel()

var ti int
var p BytePool
var ti int
var p BytePool
p.Init(0, 127)

p.Put(make([]byte, 129))
Expand Down Expand Up @@ -122,12 +122,12 @@ func TestLimits(t *testing.T) {
t.Fatal("expected different pool length")
}

p.Put(make([]byte, math.MaxUint32 + 1))
p.Put(make([]byte, math.MaxUint32+1))
if p.entries() != 1 {
t.Fatal("expected the pool to have a single item")
}

p.Put(make([]byte, math.MaxInt32 + 1))
p.Put(make([]byte, math.MaxInt32+1))
ti = (1 << log2Ceil(math.MaxUint32)) - 1
if ti <= 0 {
// 32-bit systems: Put() slice-size math.MaxInt32 + 1 fails
Expand Down
7 changes: 4 additions & 3 deletions circularbuffer/circularbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
// the stack.
//
// This package exports three things:
// StackPusher interface
// StackGetter interface
// CircularBuffer structure
//
// StackPusher interface
// StackGetter interface
// CircularBuffer structure
package circularbuffer

import (
Expand Down
23 changes: 14 additions & 9 deletions ewma/ewma.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2014 CloudFlare, Inc.
//
// Tickless implementation of exponentially decaying moving average
// # Tickless implementation of exponentially decaying moving average
//
// Most of EWMA implementations update values every X seconds. This is
// suboptimal. Instead of having a ticker goroutine it is possible to
Expand Down Expand Up @@ -47,6 +47,14 @@ func (e *Ewma) count(next float64, timeDelta time.Duration) float64 {
return e.Current*weight + next*(1-weight)
}

// Set current value of moving average
//
// Useful for reading saved value on restart (for long running averages) or resetting internal state
func (e *Ewma) Set(value float64, timestamp time.Time) {
e.Current = value
e.lastTimestamp = timestamp
}

// Update moving average with the value.
//
// Uses system clock to determine current time to count wight. Returns
Expand All @@ -63,15 +71,12 @@ func (e *Ewma) Update(next float64, timestamp time.Time) float64 {
return e.Current
}

if e.lastTimestamp.IsZero() {
// Ignore the first sample
e.lastTimestamp = timestamp
return e.Current
}

timeDelta := timestamp.Sub(e.lastTimestamp)
e.lastTimestamp = timestamp

e.Current = e.count(next, timeDelta)
if e.lastTimestamp.IsZero() {
e.Current = next
} else {
e.Current = e.count(next, timeDelta)
}
return e.Current
}
16 changes: 9 additions & 7 deletions ewma/ewma_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,20 @@ var testVectorEwma = [][]testTupleEwma{
},
}

func TestEwmaInit(t *testing.T) {
e := NewEwma(time.Duration(1 * time.Minute))
ts := time.Now()
e.Update(100, ts)

if e.Current != 100 {
t.Errorf("Rate after init should be same as first sample")
}
}
func TestEwma(t *testing.T) {
for testNo, test := range testVectorEwma {
e := NewEwma(time.Duration(1 * time.Minute))

// Feed the 0th timestamp
ts := time.Now()
e.Update(0, ts)

if e.Current != 0 {
t.Errorf("Rate after init should be zero")
}

for lineNo, l := range test {
ts = ts.Add(time.Duration(l.delay * float64(time.Second.Nanoseconds())))
e.Update(l.v, ts)
Expand Down
10 changes: 9 additions & 1 deletion ewma/rate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2014 CloudFlare, Inc.
//
// Facilities for tickless measurment of rates
// # Facilities for tickless measurment of rates
//
// Apply exponentially decaying moving average to count rates of
// things per second. Useful for various metrics.
Expand Down Expand Up @@ -32,6 +32,14 @@ func (r *EwmaRate) Init(halfLife time.Duration) *EwmaRate {
return r
}

// Set current value of rate
//
// Useful for reading saved value on restart (for long running averages) or resetting internal state

func (r *EwmaRate) Set(value float64, timestamp time.Time) {
r.Ewma.Set(value, timestamp)
}

// Notify of an event happening.
//
// Uses system clock to determine current time. Returns current rate.
Expand Down
6 changes: 1 addition & 5 deletions ewma/rate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ var testVectorRate = [][]testTupleRate{
// Sanity check (half life is 1 second)
{
// Feeding packets every second gets to 1 pps eventually
{false, 1, 0},
{true, 1, 0},
{true, 1, 0.5},
{true, 1, 0.75},
{true, 1, 0.875},
Expand All @@ -42,7 +40,6 @@ var testVectorRate = [][]testTupleRate{

// Burst of 10, 1ms apart, gets us to ~7 pps
{
{true, 1, 0},
{true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1},
{true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1}, {true, 0.001, -1},
{false, 0, 6.9075045629642595},
Expand All @@ -52,7 +49,6 @@ var testVectorRate = [][]testTupleRate{

// 10 packets 100ms apart, get 5 pps
{
{true, 1, 0},
{true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1},
{true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1}, {true, 0.1, -1},
{false, 0, 5.000000000000002},
Expand All @@ -65,7 +61,7 @@ func TestRate(t *testing.T) {
for testNo, test := range testVectorRate {
ts := time.Now()
e := NewEwmaRate(time.Duration(1 * time.Second))

e.Ewma.Set(0, ts)
for lineNo, l := range test {
ts = ts.Add(time.Duration(l.delay * float64(time.Second.Nanoseconds())))
if l.packet {
Expand Down
6 changes: 3 additions & 3 deletions kt/kt.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func expiryCertMetric(certFile string) error {

func certPaths(dir string) (cert, key, ca string, err error) {
certSets := [][]string{
[]string{"service.pem", "service-key.pem", "ca.pem"}, // certmgr
[]string{"tls.crt", "tls.key", "ca.crt"}, // kubernetes pki
{"service.pem", "service-key.pem", "ca.pem"}, // certmgr
{"tls.crt", "tls.key", "ca.crt"}, // kubernetes pki
}

for _, set := range certSets {
Expand Down Expand Up @@ -498,7 +498,7 @@ func (c *Conn) doGetBulkBytes(ctx context.Context, keys map[string][]byte) error
// TSV value for each key with a _ as a prefix.
// KT then returns the value as a TSV set with _ in front of the keys
keystransmit := make([]KV, 0, len(keys))
for k, _ := range keys {
for k := range keys {
// we set the value to nil because we want a sentinel value
// for when no data was found. This is important for
// when we remove the not found keys from the map
Expand Down
2 changes: 1 addition & 1 deletion kt/kt_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestSetGetRemoveBulk(t *testing.T) {
}
removeKeys := make([]string, len(baseKeys))

for k, _ := range baseKeys {
for k := range baseKeys {
testKeys[k] = ""
removeKeys = append(removeKeys, k)
}
Expand Down
17 changes: 9 additions & 8 deletions lrucache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
// otherwise.
//
// This package exports three things:
// LRUCache: is the main implementation. It supports multithreading by
// using guarding mutex lock.
//
// MultiLRUCache: is a sharded implementation. It supports the same
// API as LRUCache and uses it internally, but is not limited to
// a single CPU as every shard is separately locked. Use this
// data structure instead of LRUCache if you have have lock
// contention issues.
// LRUCache: is the main implementation. It supports multithreading by
// using guarding mutex lock.
//
// Cache interface: Both implementations fulfill it.
// MultiLRUCache: is a sharded implementation. It supports the same
// API as LRUCache and uses it internally, but is not limited to
// a single CPU as every shard is separately locked. Use this
// data structure instead of LRUCache if you have have lock
// contention issues.
//
// Cache interface: Both implementations fulfill it.
package lrucache

import (
Expand Down
2 changes: 1 addition & 1 deletion pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (p *Pool) Put(x interface{}) {
if p.drainTicker == nil && p.DrainPeriod != 0 {
p.drainTicker = time.NewTicker(p.DrainPeriod)
go func() {
for _ = range p.drainTicker.C {
for range p.drainTicker.C {
p.Drain()
}
}()
Expand Down
2 changes: 1 addition & 1 deletion spacesaving/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (ss *Count) GetAll() []Element {

func (ss *Count) Reset() {
empty := countBucket{}
for i, _ := range ss.olist {
for i := range ss.olist {
delete(ss.hash, ss.olist[i].key)
ss.olist[i] = empty
}
Expand Down
22 changes: 10 additions & 12 deletions spacesaving/rate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ func TestRateGetAll(t *testing.T) {

ss := (&Rate{}).Init(2, 1*time.Second)

ss.Touch("a",time.Now())
ss.Touch("a",time.Now())
ss.Touch("b",time.Now())
ss.Touch("b",time.Now())
ss.Touch("c",time.Now())
ss.Touch("c",time.Now())
ss.Touch("a", time.Now())
ss.Touch("a", time.Now())
ss.Touch("b", time.Now())
ss.Touch("b", time.Now())
ss.Touch("c", time.Now())
ss.Touch("c", time.Now())

el := ss.GetAll(time.Now())
if el[0].Key != "c" {
Expand All @@ -124,10 +124,10 @@ func TestRateGetAll(t *testing.T) {
t.Error("expecting lenght = 2")
}

ss.Touch("b",time.Now())
ss.Touch("b",time.Now())
ss.Touch("b",time.Now())
ss.Touch("b",time.Now())
ss.Touch("b", time.Now())
ss.Touch("b", time.Now())
ss.Touch("b", time.Now())
ss.Touch("b", time.Now())

el = ss.GetAll(time.Now())
if el[0].Key != "b" {
Expand All @@ -148,10 +148,8 @@ func TestRateGetAllCover(t *testing.T) {
t.Error("expecting lenght = 0")
}


}


// Benchmark updating times with 10% hit rate.
func BenchmarkTouch16384_ten(bb *testing.B) {
benchmark(bb, 16384, 0.1)
Expand Down
8 changes: 4 additions & 4 deletions spacesaving/srate.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ func (ss *SimpleRate) recount(rate float64, lastTs, now int64) float64 {

func (ss *SimpleRate) Touch(key string, nowTs time.Time) {
var (
found bool
bucket *srateBucket
now = nowTs.UnixNano()
found bool
bucket *srateBucket
now = nowTs.UnixNano()
)
bucket, found = ss.hash[key];
bucket, found = ss.hash[key]
if found {
// we already have the correct bucket
} else if len(ss.heap) < ss.size {
Expand Down
2 changes: 1 addition & 1 deletion spacesaving/tools/topdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ devloop:
func Poller(lock *sync.Mutex, ss *spacesaving.Rate, pc *pcap.Pcap) {
w := bufio.NewWriter(os.Stdout)

for _ = range time.Tick(3 * time.Second) {
for range time.Tick(3 * time.Second) {
stat, _ := pc.Getstats()

lock.Lock()
Expand Down