From 62249645b41a07c40d506365759b6db5e658e329 Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Fri, 10 Jun 2022 13:46:14 -0400 Subject: [PATCH 01/10] Introduce new test routine based on rate controlled requests --- cmd/ratectrl/main.go | 299 +++++++++++++++++++++++++++++++++++++++++ cmd/ratectrl/report.go | 43 ++++++ 2 files changed, 342 insertions(+) create mode 100644 cmd/ratectrl/main.go create mode 100644 cmd/ratectrl/report.go diff --git a/cmd/ratectrl/main.go b/cmd/ratectrl/main.go new file mode 100644 index 0000000..f39ae10 --- /dev/null +++ b/cmd/ratectrl/main.go @@ -0,0 +1,299 @@ +package main + +import ( + "bytes" + "context" + "flag" + "fmt" + "math/big" + "math/rand" + "time" + + "github.com/dgryski/go-pcgr" + "github.com/jamiealquiza/tachymeter" + mct "github.com/memcached/mctester" + + "golang.org/x/sync/errgroup" + + "go.uber.org/ratelimit" +) + +func main() { + fmt.Println("starting") + + clientFlags := flag.Uint("clientflags", 0, "(32bit unsigned) client flag bits to set on miss") + connCount := flag.Int("conncount", 1, "number of client connections to establish") + duration := flag.Duration("duration", 0, "length of time that the test will run (0 for unlimited)") + keyLength := flag.Int("keylength", 10, "number of random characters to append to key") + keyPrefix := flag.String("keyprefix", "mctester:", "prefix to append to all generated keys") + keySpace := flag.Int("keyspace", 1000, "number of unique keys to generate") + pipelines := flag.Uint("pipelines", 1, "(32bit unsigned) number of GET requests to stack within the same syscall") + delRatio := flag.Int("ratiodel", 0, "proportion of requests that should be sent as `deletes`") + getRatio := flag.Int("ratioget", 90, "proportion of requests that should be sent as `gets`") + setRatio := flag.Int("ratioset", 10, "proportion of requests that should be sent as `sets`") + rngSeed := flag.Int64("rngseed", time.Now().UnixNano(), "seed value used when initializing RNG") + rps := flag.Int("rps", 0, "target number of requests per second (0 for unlimited)") + server := flag.String("server", "127.0.0.1:11211", "`ip:port` for Memcached instance under test") + socket := flag.String("socket", "", "domain socket used for connections") + stripKeyPrefix := flag.Bool("stripkeyprefix", false, "remove key prefix before comparing with response") + keyTTL := flag.Uint("ttl", 180, "TTL to set with new items") + validateGets := flag.Bool("validate", false, "compare the value returned from a `get` to what was initially `set`") + valueSize := flag.Uint("valuesize", 1000, "size of value (in bytes) to store on miss") + warmPercent := flag.Int("warm", 90, "percent of keys to `set` in Memcached before testing begins") + useZipf := flag.Bool("zipf", false, "use Zipf instead of uniform randomness (slow)") + zipfS := flag.Float64("zipfS", 1.01, "zipf S value (general pull toward zero) must be > 1.0") + zipfV := flag.Float64("zipfV", float64(*keySpace/2), "zipf V value (pull below this number") + + flag.Parse() + + testConfig := &Config{ + ClientFlags: *clientFlags, + ConnCount: *connCount, + DelRatio: *delRatio, + Duration: *duration, + GetRatio: *getRatio, + KeyLength: *keyLength, + KeyPrefix: *keyPrefix, + KeySpace: *keySpace, + KeyTTL: *keyTTL, + Pipelines: *pipelines, + RngSeed: *rngSeed, + RPS: *rps, + Servers: []string{*server}, + SetRatio: *setRatio, + Socket: *socket, + StripKeyPrefix: *stripKeyPrefix, + UseZipf: *useZipf, + ValidateGets: *validateGets, + ValueSize: *valueSize, + WarmPercent: *warmPercent, + ZipfS: *zipfS, + ZipfV: *zipfV, + } + + testConfig.Run() +} + +type Config struct { + ClientFlags uint + ConnCount int + DelRatio int + Duration time.Duration + GetRatio int + KeyLength int + KeyPrefix string + KeySpace int + KeyTTL uint + Pipelines uint + RngSeed int64 + RPS int + Servers []string + SetRatio int + Socket string + StripKeyPrefix bool + UseZipf bool + ValidateGets bool + ValueSize uint + WarmPercent int + ZipfS float64 // (> 1, generally 1.01-2) pulls the power curve toward 0) + ZipfV float64 // v (< keySpace) puts the main part of the curve before this number + + cacheEntries []CacheEntry + tachymeter *tachymeter.Tachymeter +} + +type CacheEntry struct { + key string + value []byte +} + +func (conf *Config) GenerateEntries() (entries []CacheEntry) { + entries = make([]CacheEntry, conf.KeySpace) + subRS := pcgr.New(1, 0) + + for i := 0; i < conf.KeySpace; i++ { + subRS.Seed(conf.RngSeed + int64(i)) + key := mct.RandString(&subRS, conf.KeyLength, conf.KeyPrefix) + + valSeed := new(big.Int).SetBytes([]byte(key)).Int64() + subRS.Seed(valSeed) + value := mct.RandBytes(&subRS, int(conf.ValueSize)) + + entries[i] = CacheEntry{key, value} + } + + return +} + +func (conf *Config) Run() (err error) { + g, _ := errgroup.WithContext(context.Background()) + + samples := conf.RPS * conf.ConnCount + if samples < 1000 { + samples = 1000 + } + + conf.cacheEntries = conf.GenerateEntries() + + if conf.WarmPercent > 0 { + err = conf.WarmCache() + if err != nil { + return + } + } + + threadStats := make(chan Stats, conf.ConnCount) + conf.tachymeter = tachymeter.New(&tachymeter.Config{Size: samples}) + startTime := time.Now() + + for worker := 0; worker < conf.ConnCount; worker++ { + index := worker + g.Go(func() error { + return conf.Worker(index, threadStats) + }) + } + + err = g.Wait() + endTime := time.Now() + if err != nil { + return + } + + conf.tachymeter.SetWallTime(time.Since(startTime)) + close(threadStats) + testStats := &Stats{} + for stats := range threadStats { + testStats.Add(&stats) + } + if !conf.ValidateGets { + testStats.KeyCollisions = -1 + } + + report := &Report{ + StartTime: startTime, + EndTime: endTime, + Config: conf, + Metrics: conf.tachymeter.Calc(), + Stats: testStats, + } + err = report.PrettyPrint() + + return +} + +func (conf *Config) WarmCache() error { + mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) + rs := pcgr.New(conf.RngSeed, 0) + randR := rand.New(&rs) + + for keyIndex := 0; keyIndex < conf.KeySpace; keyIndex++ { + if randR.Intn(100) < conf.WarmPercent { + entry := conf.cacheEntries[keyIndex] + key := entry.key + value := entry.value + + _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) + if err != nil { + fmt.Println(err) + return err + } + } + } + + return nil +} + +func (conf *Config) Worker(index int, results chan Stats) error { + stats := Stats{} + mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) + + workerSeed := conf.RngSeed + int64(index) + int64(conf.KeySpace) + rs := pcgr.New(workerSeed, 0) + randR := rand.New(&rs) + + var zipRS *rand.Zipf + if conf.UseZipf { + zipRS = rand.NewZipf(randR, conf.ZipfS, conf.ZipfV, uint64(conf.KeySpace)) + if zipRS == nil { + fmt.Printf("bad arguments to zipf: S: %f V: %f\n", conf.ZipfS, conf.ZipfV) + return nil + } + } + + var rl ratelimit.Limiter + if conf.RPS > 0 { + rl = ratelimit.New(conf.RPS) + } else { + rl = ratelimit.NewUnlimited() + } + + for start := time.Now(); ; { + iterStart := time.Now() + if iterStart.Sub(start) > conf.Duration { + break + } + + var index int + if conf.UseZipf { + index = int(zipRS.Uint64()) + } else { + index = randR.Intn(conf.KeySpace) + } + + entry := conf.cacheEntries[index] + key := entry.key + + switch rng := randR.Intn(conf.DelRatio + conf.SetRatio + conf.GetRatio); { + case rng < conf.DelRatio: + rl.Take() + code, err := mc.Delete(key) + if err != nil { + fmt.Println(err) + return err + } + + switch code { + case mct.McDELETED: + stats.DeleteHits++ + case mct.McNOT_FOUND: + stats.DeleteMisses++ + } + case rng < (conf.DelRatio + conf.SetRatio): + value := entry.value + rl.Take() + _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) + if err != nil { + fmt.Println(err) + return err + } + + stats.SetsTotal++ + default: + rl.Take() + _, value, code, err := mc.Get(key) + if err != nil { + fmt.Println(err, value) + return err + } + + switch code { + case mct.McHIT: + stats.GetHits++ + + expectedValue := entry.value + if conf.ValidateGets && !bytes.Equal(value, expectedValue) { + stats.KeyCollisions++ + fmt.Printf("Unexpected value found for key `%s`\n\tExpected Value: %s\n\tActual Value: %s\n", key, expectedValue, value) + } + + case mct.McMISS: + stats.GetMisses++ + } + } + + conf.tachymeter.AddTime(time.Since(iterStart)) + } + + results <- stats + return nil +} diff --git a/cmd/ratectrl/report.go b/cmd/ratectrl/report.go new file mode 100644 index 0000000..0669956 --- /dev/null +++ b/cmd/ratectrl/report.go @@ -0,0 +1,43 @@ +package main + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/jamiealquiza/tachymeter" +) + +type Report struct { + StartTime time.Time + EndTime time.Time + Config *Config + Metrics *tachymeter.Metrics + Stats *Stats +} + +func (report *Report) PrettyPrint() (err error) { + jsonReport, err := json.MarshalIndent(report, "", "\t") + if err == nil { + fmt.Println(string(jsonReport)) + } + return +} + +type Stats struct { + DeleteHits int + DeleteMisses int + GetHits int + GetMisses int + KeyCollisions int + SetsTotal int +} + +func (stats *Stats) Add(other *Stats) { + stats.DeleteHits += other.DeleteHits + stats.DeleteMisses += other.DeleteMisses + stats.GetHits += other.GetHits + stats.GetMisses += other.GetMisses + stats.KeyCollisions += other.KeyCollisions + stats.SetsTotal += other.SetsTotal +} From 828c941fd41c883337c8c347a684164fc84b08ab Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Tue, 14 Jun 2022 11:56:17 -0400 Subject: [PATCH 02/10] Implement modules and move common code to `internal` package --- cmd/basic/main.go | 3 ++- cmd/ratectrl/main.go | 3 ++- cmd/server/loader_basic.go | 3 ++- go.mod | 14 +++++++++++ go.sum | 24 +++++++++++++++++++ protocol.go => internal/protocol.go | 2 +- protocol_test.go => internal/protocol_test.go | 2 +- support.go => internal/support.go | 2 +- 8 files changed, 47 insertions(+), 6 deletions(-) create mode 100644 go.mod create mode 100644 go.sum rename protocol.go => internal/protocol.go (99%) rename protocol_test.go => internal/protocol_test.go (99%) rename support.go => internal/support.go (99%) diff --git a/cmd/basic/main.go b/cmd/basic/main.go index ebacd52..1de2c7b 100644 --- a/cmd/basic/main.go +++ b/cmd/basic/main.go @@ -8,8 +8,9 @@ import ( "runtime/pprof" "time" + mct "mctester/internal" + "github.com/dgryski/go-pcgr" - mct "github.com/memcached/mctester" ) var cpuprofile = flag.String("cpuprofile", "", "dump cpu profile to file") diff --git a/cmd/ratectrl/main.go b/cmd/ratectrl/main.go index f39ae10..f3fc594 100644 --- a/cmd/ratectrl/main.go +++ b/cmd/ratectrl/main.go @@ -9,9 +9,10 @@ import ( "math/rand" "time" + mct "mctester/internal" + "github.com/dgryski/go-pcgr" "github.com/jamiealquiza/tachymeter" - mct "github.com/memcached/mctester" "golang.org/x/sync/errgroup" diff --git a/cmd/server/loader_basic.go b/cmd/server/loader_basic.go index e923401..90a8ec1 100644 --- a/cmd/server/loader_basic.go +++ b/cmd/server/loader_basic.go @@ -5,8 +5,9 @@ import ( "math/rand" "time" + mct "mctester/internal" + "github.com/dgryski/go-pcgr" - mct "github.com/memcached/mctester" ) // Basic persistent load test, using text protocol: diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d888042 --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module mctester + +go 1.18 + +require ( + github.com/jamiealquiza/tachymeter v2.0.0+incompatible + go.uber.org/ratelimit v0.2.0 + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f +) + +require ( + github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect + github.com/dgryski/go-pcgr v0.0.0-20211101192959-4b34ab9ccb8c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6e9ae11 --- /dev/null +++ b/go.sum @@ -0,0 +1,24 @@ +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI= +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-pcgr v0.0.0-20211101192959-4b34ab9ccb8c h1:f26exq4Heidug80gN340GXU86doiFKGUYFluBO+mbDk= +github.com/dgryski/go-pcgr v0.0.0-20211101192959-4b34ab9ccb8c/go.mod h1:ztV/u9hqJRBCT0P03v0Ueol7unBefCKL+paOoIZkR88= +github.com/jamiealquiza/tachymeter v2.0.0+incompatible h1:mGiF1DGo8l6vnGT8FXNNcIXht/YmjzfraiUprXYwJ6g= +github.com/jamiealquiza/tachymeter v2.0.0+incompatible/go.mod h1:Ayf6zPZKEnLsc3winWEXJRkTBhdHo58HODAu1oFJkYU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA= +go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/protocol.go b/internal/protocol.go similarity index 99% rename from protocol.go rename to internal/protocol.go index 7ed0ac0..42c2cc2 100644 --- a/protocol.go +++ b/internal/protocol.go @@ -6,7 +6,7 @@ // https://github.com/stathat/consistent MIT licensed. // maybe just an example that uses it separately? -package mctester +package internal import ( "bufio" diff --git a/protocol_test.go b/internal/protocol_test.go similarity index 99% rename from protocol_test.go rename to internal/protocol_test.go index 357558c..c72e2cc 100644 --- a/protocol_test.go +++ b/internal/protocol_test.go @@ -1,4 +1,4 @@ -package mctester +package internal import ( "bytes" diff --git a/support.go b/internal/support.go similarity index 99% rename from support.go rename to internal/support.go index 2c9ae65..d2263c4 100644 --- a/support.go +++ b/internal/support.go @@ -1,4 +1,4 @@ -package mctester +package internal // "github.com/cespare/xxhash" import ( From c159a697825654e279caca2ff94b961b0bd8577a Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Tue, 14 Jun 2022 13:36:36 -0400 Subject: [PATCH 03/10] Use regular single quotes in flag descriptions --- cmd/ratectrl/main.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/ratectrl/main.go b/cmd/ratectrl/main.go index f3fc594..2317a99 100644 --- a/cmd/ratectrl/main.go +++ b/cmd/ratectrl/main.go @@ -29,18 +29,18 @@ func main() { keyPrefix := flag.String("keyprefix", "mctester:", "prefix to append to all generated keys") keySpace := flag.Int("keyspace", 1000, "number of unique keys to generate") pipelines := flag.Uint("pipelines", 1, "(32bit unsigned) number of GET requests to stack within the same syscall") - delRatio := flag.Int("ratiodel", 0, "proportion of requests that should be sent as `deletes`") - getRatio := flag.Int("ratioget", 90, "proportion of requests that should be sent as `gets`") - setRatio := flag.Int("ratioset", 10, "proportion of requests that should be sent as `sets`") + delRatio := flag.Int("ratiodel", 0, "proportion of requests that should be sent as 'deletes'") + getRatio := flag.Int("ratioget", 90, "proportion of requests that should be sent as 'gets'") + setRatio := flag.Int("ratioset", 10, "proportion of requests that should be sent as 'sets'") rngSeed := flag.Int64("rngseed", time.Now().UnixNano(), "seed value used when initializing RNG") rps := flag.Int("rps", 0, "target number of requests per second (0 for unlimited)") server := flag.String("server", "127.0.0.1:11211", "`ip:port` for Memcached instance under test") socket := flag.String("socket", "", "domain socket used for connections") stripKeyPrefix := flag.Bool("stripkeyprefix", false, "remove key prefix before comparing with response") keyTTL := flag.Uint("ttl", 180, "TTL to set with new items") - validateGets := flag.Bool("validate", false, "compare the value returned from a `get` to what was initially `set`") + validateGets := flag.Bool("validate", false, "compare the value returned from a 'get' to what was initially 'set'") valueSize := flag.Uint("valuesize", 1000, "size of value (in bytes) to store on miss") - warmPercent := flag.Int("warm", 90, "percent of keys to `set` in Memcached before testing begins") + warmPercent := flag.Int("warm", 90, "percent of keys to 'set' in Memcached before testing begins") useZipf := flag.Bool("zipf", false, "use Zipf instead of uniform randomness (slow)") zipfS := flag.Float64("zipfS", 1.01, "zipf S value (general pull toward zero) must be > 1.0") zipfV := flag.Float64("zipfV", float64(*keySpace/2), "zipf V value (pull below this number") From be656c14ef71b1fa521a531dee9069839b487179 Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Wed, 15 Jun 2022 13:05:49 -0400 Subject: [PATCH 04/10] Enforce rate limiting globally across all workers instead of on a per-worker level --- cmd/ratectrl/main.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/cmd/ratectrl/main.go b/cmd/ratectrl/main.go index 2317a99..fc41e1a 100644 --- a/cmd/ratectrl/main.go +++ b/cmd/ratectrl/main.go @@ -100,6 +100,7 @@ type Config struct { ZipfV float64 // v (< keySpace) puts the main part of the curve before this number cacheEntries []CacheEntry + rateLimiter ratelimit.Limiter tachymeter *tachymeter.Tachymeter } @@ -143,6 +144,12 @@ func (conf *Config) Run() (err error) { } } + if conf.RPS > 0 { + conf.rateLimiter = ratelimit.New(conf.RPS) + } else { + conf.rateLimiter = ratelimit.NewUnlimited() + } + threadStats := make(chan Stats, conf.ConnCount) conf.tachymeter = tachymeter.New(&tachymeter.Config{Size: samples}) startTime := time.Now() @@ -205,8 +212,9 @@ func (conf *Config) WarmCache() error { } func (conf *Config) Worker(index int, results chan Stats) error { - stats := Stats{} mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) + stats := Stats{} + rl := conf.rateLimiter workerSeed := conf.RngSeed + int64(index) + int64(conf.KeySpace) rs := pcgr.New(workerSeed, 0) @@ -221,13 +229,6 @@ func (conf *Config) Worker(index int, results chan Stats) error { } } - var rl ratelimit.Limiter - if conf.RPS > 0 { - rl = ratelimit.New(conf.RPS) - } else { - rl = ratelimit.NewUnlimited() - } - for start := time.Now(); ; { iterStart := time.Now() if iterStart.Sub(start) > conf.Duration { From e2c745197c52d4ef7f5b1fa808577d259137df69 Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Wed, 15 Jun 2022 15:39:32 -0400 Subject: [PATCH 05/10] Expand module name to include full repo path --- cmd/basic/main.go | 2 +- cmd/ratectrl/main.go | 2 +- cmd/server/loader_basic.go | 2 +- go.mod | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/basic/main.go b/cmd/basic/main.go index 1de2c7b..a83a098 100644 --- a/cmd/basic/main.go +++ b/cmd/basic/main.go @@ -8,7 +8,7 @@ import ( "runtime/pprof" "time" - mct "mctester/internal" + mct "github.com/memcached/mctester/internal" "github.com/dgryski/go-pcgr" ) diff --git a/cmd/ratectrl/main.go b/cmd/ratectrl/main.go index fc41e1a..359b643 100644 --- a/cmd/ratectrl/main.go +++ b/cmd/ratectrl/main.go @@ -9,7 +9,7 @@ import ( "math/rand" "time" - mct "mctester/internal" + mct "github.com/memcached/mctester/internal" "github.com/dgryski/go-pcgr" "github.com/jamiealquiza/tachymeter" diff --git a/cmd/server/loader_basic.go b/cmd/server/loader_basic.go index 90a8ec1..c89e3db 100644 --- a/cmd/server/loader_basic.go +++ b/cmd/server/loader_basic.go @@ -5,7 +5,7 @@ import ( "math/rand" "time" - mct "mctester/internal" + mct "github.com/memcached/mctester/internal" "github.com/dgryski/go-pcgr" ) diff --git a/go.mod b/go.mod index d888042..7c8b03a 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module mctester +module github.com/memcached/mctester go 1.18 From d8fd8b2f74e2f3d8f83aa7dceb7adc9ecdc9dc45 Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Wed, 15 Jun 2022 16:31:30 -0400 Subject: [PATCH 06/10] Move `ratectrl` test logic out of main package --- cmd/ratectrl/main.go | 240 +------------------------------ pkg/ratectrl/config.go | 241 ++++++++++++++++++++++++++++++++ {cmd => pkg}/ratectrl/report.go | 2 +- 3 files changed, 244 insertions(+), 239 deletions(-) create mode 100644 pkg/ratectrl/config.go rename {cmd => pkg}/ratectrl/report.go (97%) diff --git a/cmd/ratectrl/main.go b/cmd/ratectrl/main.go index 359b643..3327eb6 100644 --- a/cmd/ratectrl/main.go +++ b/cmd/ratectrl/main.go @@ -1,22 +1,11 @@ package main import ( - "bytes" - "context" "flag" "fmt" - "math/big" - "math/rand" "time" - mct "github.com/memcached/mctester/internal" - - "github.com/dgryski/go-pcgr" - "github.com/jamiealquiza/tachymeter" - - "golang.org/x/sync/errgroup" - - "go.uber.org/ratelimit" + "github.com/memcached/mctester/pkg/ratectrl" ) func main() { @@ -47,7 +36,7 @@ func main() { flag.Parse() - testConfig := &Config{ + testConfig := &ratectrl.Config{ ClientFlags: *clientFlags, ConnCount: *connCount, DelRatio: *delRatio, @@ -74,228 +63,3 @@ func main() { testConfig.Run() } - -type Config struct { - ClientFlags uint - ConnCount int - DelRatio int - Duration time.Duration - GetRatio int - KeyLength int - KeyPrefix string - KeySpace int - KeyTTL uint - Pipelines uint - RngSeed int64 - RPS int - Servers []string - SetRatio int - Socket string - StripKeyPrefix bool - UseZipf bool - ValidateGets bool - ValueSize uint - WarmPercent int - ZipfS float64 // (> 1, generally 1.01-2) pulls the power curve toward 0) - ZipfV float64 // v (< keySpace) puts the main part of the curve before this number - - cacheEntries []CacheEntry - rateLimiter ratelimit.Limiter - tachymeter *tachymeter.Tachymeter -} - -type CacheEntry struct { - key string - value []byte -} - -func (conf *Config) GenerateEntries() (entries []CacheEntry) { - entries = make([]CacheEntry, conf.KeySpace) - subRS := pcgr.New(1, 0) - - for i := 0; i < conf.KeySpace; i++ { - subRS.Seed(conf.RngSeed + int64(i)) - key := mct.RandString(&subRS, conf.KeyLength, conf.KeyPrefix) - - valSeed := new(big.Int).SetBytes([]byte(key)).Int64() - subRS.Seed(valSeed) - value := mct.RandBytes(&subRS, int(conf.ValueSize)) - - entries[i] = CacheEntry{key, value} - } - - return -} - -func (conf *Config) Run() (err error) { - g, _ := errgroup.WithContext(context.Background()) - - samples := conf.RPS * conf.ConnCount - if samples < 1000 { - samples = 1000 - } - - conf.cacheEntries = conf.GenerateEntries() - - if conf.WarmPercent > 0 { - err = conf.WarmCache() - if err != nil { - return - } - } - - if conf.RPS > 0 { - conf.rateLimiter = ratelimit.New(conf.RPS) - } else { - conf.rateLimiter = ratelimit.NewUnlimited() - } - - threadStats := make(chan Stats, conf.ConnCount) - conf.tachymeter = tachymeter.New(&tachymeter.Config{Size: samples}) - startTime := time.Now() - - for worker := 0; worker < conf.ConnCount; worker++ { - index := worker - g.Go(func() error { - return conf.Worker(index, threadStats) - }) - } - - err = g.Wait() - endTime := time.Now() - if err != nil { - return - } - - conf.tachymeter.SetWallTime(time.Since(startTime)) - close(threadStats) - testStats := &Stats{} - for stats := range threadStats { - testStats.Add(&stats) - } - if !conf.ValidateGets { - testStats.KeyCollisions = -1 - } - - report := &Report{ - StartTime: startTime, - EndTime: endTime, - Config: conf, - Metrics: conf.tachymeter.Calc(), - Stats: testStats, - } - err = report.PrettyPrint() - - return -} - -func (conf *Config) WarmCache() error { - mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) - rs := pcgr.New(conf.RngSeed, 0) - randR := rand.New(&rs) - - for keyIndex := 0; keyIndex < conf.KeySpace; keyIndex++ { - if randR.Intn(100) < conf.WarmPercent { - entry := conf.cacheEntries[keyIndex] - key := entry.key - value := entry.value - - _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) - if err != nil { - fmt.Println(err) - return err - } - } - } - - return nil -} - -func (conf *Config) Worker(index int, results chan Stats) error { - mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) - stats := Stats{} - rl := conf.rateLimiter - - workerSeed := conf.RngSeed + int64(index) + int64(conf.KeySpace) - rs := pcgr.New(workerSeed, 0) - randR := rand.New(&rs) - - var zipRS *rand.Zipf - if conf.UseZipf { - zipRS = rand.NewZipf(randR, conf.ZipfS, conf.ZipfV, uint64(conf.KeySpace)) - if zipRS == nil { - fmt.Printf("bad arguments to zipf: S: %f V: %f\n", conf.ZipfS, conf.ZipfV) - return nil - } - } - - for start := time.Now(); ; { - iterStart := time.Now() - if iterStart.Sub(start) > conf.Duration { - break - } - - var index int - if conf.UseZipf { - index = int(zipRS.Uint64()) - } else { - index = randR.Intn(conf.KeySpace) - } - - entry := conf.cacheEntries[index] - key := entry.key - - switch rng := randR.Intn(conf.DelRatio + conf.SetRatio + conf.GetRatio); { - case rng < conf.DelRatio: - rl.Take() - code, err := mc.Delete(key) - if err != nil { - fmt.Println(err) - return err - } - - switch code { - case mct.McDELETED: - stats.DeleteHits++ - case mct.McNOT_FOUND: - stats.DeleteMisses++ - } - case rng < (conf.DelRatio + conf.SetRatio): - value := entry.value - rl.Take() - _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) - if err != nil { - fmt.Println(err) - return err - } - - stats.SetsTotal++ - default: - rl.Take() - _, value, code, err := mc.Get(key) - if err != nil { - fmt.Println(err, value) - return err - } - - switch code { - case mct.McHIT: - stats.GetHits++ - - expectedValue := entry.value - if conf.ValidateGets && !bytes.Equal(value, expectedValue) { - stats.KeyCollisions++ - fmt.Printf("Unexpected value found for key `%s`\n\tExpected Value: %s\n\tActual Value: %s\n", key, expectedValue, value) - } - - case mct.McMISS: - stats.GetMisses++ - } - } - - conf.tachymeter.AddTime(time.Since(iterStart)) - } - - results <- stats - return nil -} diff --git a/pkg/ratectrl/config.go b/pkg/ratectrl/config.go new file mode 100644 index 0000000..8b5a4b9 --- /dev/null +++ b/pkg/ratectrl/config.go @@ -0,0 +1,241 @@ +package ratectrl + +import ( + "bytes" + "context" + "fmt" + "math/big" + "math/rand" + "time" + + "github.com/dgryski/go-pcgr" + "github.com/jamiealquiza/tachymeter" + mct "github.com/memcached/mctester/internal" + "go.uber.org/ratelimit" + "golang.org/x/sync/errgroup" +) + +type Config struct { + ClientFlags uint + ConnCount int + DelRatio int + Duration time.Duration + GetRatio int + KeyLength int + KeyPrefix string + KeySpace int + KeyTTL uint + Pipelines uint + RngSeed int64 + RPS int + Servers []string + SetRatio int + Socket string + StripKeyPrefix bool + UseZipf bool + ValidateGets bool + ValueSize uint + WarmPercent int + ZipfS float64 // (> 1, generally 1.01-2) pulls the power curve toward 0) + ZipfV float64 // v (< keySpace) puts the main part of the curve before this number + + cacheEntries []CacheEntry + rateLimiter ratelimit.Limiter + tachymeter *tachymeter.Tachymeter +} + +type CacheEntry struct { + key string + value []byte +} + +func (conf *Config) GenerateEntries() (entries []CacheEntry) { + entries = make([]CacheEntry, conf.KeySpace) + subRS := pcgr.New(1, 0) + + for i := 0; i < conf.KeySpace; i++ { + subRS.Seed(conf.RngSeed + int64(i)) + key := mct.RandString(&subRS, conf.KeyLength, conf.KeyPrefix) + + valSeed := new(big.Int).SetBytes([]byte(key)).Int64() + subRS.Seed(valSeed) + value := mct.RandBytes(&subRS, int(conf.ValueSize)) + + entries[i] = CacheEntry{key, value} + } + + return +} + +func (conf *Config) Run() (err error) { + g, _ := errgroup.WithContext(context.Background()) + + samples := conf.RPS * conf.ConnCount + if samples < 1000 { + samples = 1000 + } + + conf.cacheEntries = conf.GenerateEntries() + + if conf.WarmPercent > 0 { + err = conf.WarmCache() + if err != nil { + return + } + } + + if conf.RPS > 0 { + conf.rateLimiter = ratelimit.New(conf.RPS) + } else { + conf.rateLimiter = ratelimit.NewUnlimited() + } + + threadStats := make(chan Stats, conf.ConnCount) + conf.tachymeter = tachymeter.New(&tachymeter.Config{Size: samples}) + startTime := time.Now() + + for worker := 0; worker < conf.ConnCount; worker++ { + index := worker + g.Go(func() error { + return conf.Worker(index, threadStats) + }) + } + + err = g.Wait() + endTime := time.Now() + if err != nil { + return + } + + conf.tachymeter.SetWallTime(time.Since(startTime)) + close(threadStats) + testStats := &Stats{} + for stats := range threadStats { + testStats.Add(&stats) + } + if !conf.ValidateGets { + testStats.KeyCollisions = -1 + } + + report := &Report{ + StartTime: startTime, + EndTime: endTime, + Config: conf, + Metrics: conf.tachymeter.Calc(), + Stats: testStats, + } + err = report.PrettyPrint() + + return +} + +func (conf *Config) WarmCache() error { + mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) + rs := pcgr.New(conf.RngSeed, 0) + randR := rand.New(&rs) + + for keyIndex := 0; keyIndex < conf.KeySpace; keyIndex++ { + if randR.Intn(100) < conf.WarmPercent { + entry := conf.cacheEntries[keyIndex] + key := entry.key + value := entry.value + + _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) + if err != nil { + fmt.Println(err) + return err + } + } + } + + return nil +} + +func (conf *Config) Worker(index int, results chan Stats) error { + mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) + stats := Stats{} + rl := conf.rateLimiter + + workerSeed := conf.RngSeed + int64(index) + int64(conf.KeySpace) + rs := pcgr.New(workerSeed, 0) + randR := rand.New(&rs) + + var zipRS *rand.Zipf + if conf.UseZipf { + zipRS = rand.NewZipf(randR, conf.ZipfS, conf.ZipfV, uint64(conf.KeySpace)) + if zipRS == nil { + fmt.Printf("bad arguments to zipf: S: %f V: %f\n", conf.ZipfS, conf.ZipfV) + return nil + } + } + + for start := time.Now(); ; { + iterStart := time.Now() + if iterStart.Sub(start) > conf.Duration { + break + } + + var index int + if conf.UseZipf { + index = int(zipRS.Uint64()) + } else { + index = randR.Intn(conf.KeySpace) + } + + entry := conf.cacheEntries[index] + key := entry.key + + switch rng := randR.Intn(conf.DelRatio + conf.SetRatio + conf.GetRatio); { + case rng < conf.DelRatio: + rl.Take() + code, err := mc.Delete(key) + if err != nil { + fmt.Println(err) + return err + } + + switch code { + case mct.McDELETED: + stats.DeleteHits++ + case mct.McNOT_FOUND: + stats.DeleteMisses++ + } + case rng < (conf.DelRatio + conf.SetRatio): + value := entry.value + rl.Take() + _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) + if err != nil { + fmt.Println(err) + return err + } + + stats.SetsTotal++ + default: + rl.Take() + _, value, code, err := mc.Get(key) + if err != nil { + fmt.Println(err, value) + return err + } + + switch code { + case mct.McHIT: + stats.GetHits++ + + expectedValue := entry.value + if conf.ValidateGets && !bytes.Equal(value, expectedValue) { + stats.KeyCollisions++ + fmt.Printf("Unexpected value found for key `%s`\n\tExpected Value: %s\n\tActual Value: %s\n", key, expectedValue, value) + } + + case mct.McMISS: + stats.GetMisses++ + } + } + + conf.tachymeter.AddTime(time.Since(iterStart)) + } + + results <- stats + return nil +} diff --git a/cmd/ratectrl/report.go b/pkg/ratectrl/report.go similarity index 97% rename from cmd/ratectrl/report.go rename to pkg/ratectrl/report.go index 0669956..47c6e78 100644 --- a/cmd/ratectrl/report.go +++ b/pkg/ratectrl/report.go @@ -1,4 +1,4 @@ -package main +package ratectrl import ( "encoding/json" From f0d933e0cdf988a555a54f66a5e0870bd711bed3 Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Thu, 16 Jun 2022 10:14:57 -0400 Subject: [PATCH 07/10] Fix import statement positions --- cmd/basic/main.go | 3 +-- cmd/server/loader_basic.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/cmd/basic/main.go b/cmd/basic/main.go index a83a098..9d53f97 100644 --- a/cmd/basic/main.go +++ b/cmd/basic/main.go @@ -8,9 +8,8 @@ import ( "runtime/pprof" "time" - mct "github.com/memcached/mctester/internal" - "github.com/dgryski/go-pcgr" + mct "github.com/memcached/mctester/internal" ) var cpuprofile = flag.String("cpuprofile", "", "dump cpu profile to file") diff --git a/cmd/server/loader_basic.go b/cmd/server/loader_basic.go index c89e3db..ad0e8b4 100644 --- a/cmd/server/loader_basic.go +++ b/cmd/server/loader_basic.go @@ -5,9 +5,8 @@ import ( "math/rand" "time" - mct "github.com/memcached/mctester/internal" - "github.com/dgryski/go-pcgr" + mct "github.com/memcached/mctester/internal" ) // Basic persistent load test, using text protocol: From 891b864e9b24ebc473c33f966b6b7759f0797a77 Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Mon, 27 Jun 2022 12:38:18 -0400 Subject: [PATCH 08/10] Move rate limiting to beginning of test loop --- pkg/ratectrl/config.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/ratectrl/config.go b/pkg/ratectrl/config.go index 8b5a4b9..02a5011 100644 --- a/pkg/ratectrl/config.go +++ b/pkg/ratectrl/config.go @@ -185,9 +185,9 @@ func (conf *Config) Worker(index int, results chan Stats) error { entry := conf.cacheEntries[index] key := entry.key + rl.Take() switch rng := randR.Intn(conf.DelRatio + conf.SetRatio + conf.GetRatio); { case rng < conf.DelRatio: - rl.Take() code, err := mc.Delete(key) if err != nil { fmt.Println(err) @@ -202,7 +202,6 @@ func (conf *Config) Worker(index int, results chan Stats) error { } case rng < (conf.DelRatio + conf.SetRatio): value := entry.value - rl.Take() _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) if err != nil { fmt.Println(err) @@ -211,7 +210,6 @@ func (conf *Config) Worker(index int, results chan Stats) error { stats.SetsTotal++ default: - rl.Take() _, value, code, err := mc.Get(key) if err != nil { fmt.Println(err, value) From ac422cda1db4cc9031ec9f09834de27b285a8b59 Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Tue, 28 Jun 2022 09:56:26 -0400 Subject: [PATCH 09/10] Move communication protocol to `pkg` from `internal` --- cmd/basic/main.go | 5 +++-- cmd/server/loader_basic.go | 5 +++-- {internal => pkg/client}/protocol.go | 2 +- {internal => pkg/client}/protocol_test.go | 2 +- pkg/ratectrl/config.go | 13 +++++++------ 5 files changed, 15 insertions(+), 12 deletions(-) rename {internal => pkg/client}/protocol.go (99%) rename {internal => pkg/client}/protocol_test.go (99%) diff --git a/cmd/basic/main.go b/cmd/basic/main.go index 9d53f97..d1566bb 100644 --- a/cmd/basic/main.go +++ b/cmd/basic/main.go @@ -10,6 +10,7 @@ import ( "github.com/dgryski/go-pcgr" mct "github.com/memcached/mctester/internal" + "github.com/memcached/mctester/pkg/client" ) var cpuprofile = flag.String("cpuprofile", "", "dump cpu profile to file") @@ -163,7 +164,7 @@ func (l *BasicLoader) Timer(tag string, start time.Time) { func (l *BasicLoader) Worker(doneChan chan<- int) { // FIXME: selector. host := l.servers[0] - mc := mct.NewClient(host, l.socket, l.pipelines, l.keyPrefix, l.stripKeyPrefix) + mc := client.NewClient(host, l.socket, l.pipelines, l.keyPrefix, l.stripKeyPrefix) bundles := l.requestBundlesPerConn rs := pcgr.New(time.Now().UnixNano(), 0) @@ -224,7 +225,7 @@ func (l *BasicLoader) Worker(doneChan chan<- int) { return } // set missing values - if code == mct.McMISS { + if code == client.McMISS { // TODO: random sizing value := mct.RandBytes(&rs, int(l.valueSize)) start := time.Now() diff --git a/cmd/server/loader_basic.go b/cmd/server/loader_basic.go index ad0e8b4..a80e68d 100644 --- a/cmd/server/loader_basic.go +++ b/cmd/server/loader_basic.go @@ -7,6 +7,7 @@ import ( "github.com/dgryski/go-pcgr" mct "github.com/memcached/mctester/internal" + "github.com/memcached/mctester/pkg/client" ) // Basic persistent load test, using text protocol: @@ -123,7 +124,7 @@ func runBasicLoader(Update <-chan interface{}, worker interface{}) { func basicWorker(id int, doneChan chan<- int, updateChan <-chan *BasicLoader, l *BasicLoader) { // TODO: server selector. host := l.Servers[0] - mc := mct.NewClient(host, l.Socket, l.Pipelines, l.KeyPrefix, l.StripKeyPrefix) + mc := client.NewClient(host, l.Socket, l.Pipelines, l.KeyPrefix, l.StripKeyPrefix) bundles := l.RequestBundlesPerConn rs := pcgr.New(time.Now().UnixNano(), 0) @@ -179,7 +180,7 @@ func basicWorker(id int, doneChan chan<- int, updateChan <-chan *BasicLoader, l return } // set missing values - if code == mct.McMISS { + if code == client.McMISS { // TODO: random sizing value := mct.RandBytes(&rs, int(l.ValueSize)) mc.Set(key, uint32(l.ClientFlags), uint32(l.KeyTTL), value) diff --git a/internal/protocol.go b/pkg/client/protocol.go similarity index 99% rename from internal/protocol.go rename to pkg/client/protocol.go index 42c2cc2..03c9826 100644 --- a/internal/protocol.go +++ b/pkg/client/protocol.go @@ -6,7 +6,7 @@ // https://github.com/stathat/consistent MIT licensed. // maybe just an example that uses it separately? -package internal +package client import ( "bufio" diff --git a/internal/protocol_test.go b/pkg/client/protocol_test.go similarity index 99% rename from internal/protocol_test.go rename to pkg/client/protocol_test.go index c72e2cc..d4f64d4 100644 --- a/internal/protocol_test.go +++ b/pkg/client/protocol_test.go @@ -1,4 +1,4 @@ -package internal +package client import ( "bytes" diff --git a/pkg/ratectrl/config.go b/pkg/ratectrl/config.go index 02a5011..113f186 100644 --- a/pkg/ratectrl/config.go +++ b/pkg/ratectrl/config.go @@ -11,6 +11,7 @@ import ( "github.com/dgryski/go-pcgr" "github.com/jamiealquiza/tachymeter" mct "github.com/memcached/mctester/internal" + "github.com/memcached/mctester/pkg/client" "go.uber.org/ratelimit" "golang.org/x/sync/errgroup" ) @@ -130,7 +131,7 @@ func (conf *Config) Run() (err error) { } func (conf *Config) WarmCache() error { - mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) + mc := client.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) rs := pcgr.New(conf.RngSeed, 0) randR := rand.New(&rs) @@ -152,7 +153,7 @@ func (conf *Config) WarmCache() error { } func (conf *Config) Worker(index int, results chan Stats) error { - mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) + mc := client.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) stats := Stats{} rl := conf.rateLimiter @@ -195,9 +196,9 @@ func (conf *Config) Worker(index int, results chan Stats) error { } switch code { - case mct.McDELETED: + case client.McDELETED: stats.DeleteHits++ - case mct.McNOT_FOUND: + case client.McNOT_FOUND: stats.DeleteMisses++ } case rng < (conf.DelRatio + conf.SetRatio): @@ -217,7 +218,7 @@ func (conf *Config) Worker(index int, results chan Stats) error { } switch code { - case mct.McHIT: + case client.McHIT: stats.GetHits++ expectedValue := entry.value @@ -226,7 +227,7 @@ func (conf *Config) Worker(index int, results chan Stats) error { fmt.Printf("Unexpected value found for key `%s`\n\tExpected Value: %s\n\tActual Value: %s\n", key, expectedValue, value) } - case mct.McMISS: + case client.McMISS: stats.GetMisses++ } } From fef1016e4667df44c61fb48be99fa3398c6e5ce8 Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Thu, 30 Jun 2022 17:53:12 -0400 Subject: [PATCH 10/10] Support meta commands in tests --- cmd/ratectrl/main.go | 8 +++ pkg/client/protocol.go | 9 ++- pkg/client/protocol_test.go | 4 +- pkg/ratectrl/config.go | 116 +++++++++++++++++++++++++++++------- 4 files changed, 112 insertions(+), 25 deletions(-) diff --git a/cmd/ratectrl/main.go b/cmd/ratectrl/main.go index 3327eb6..f7e3a65 100644 --- a/cmd/ratectrl/main.go +++ b/cmd/ratectrl/main.go @@ -17,6 +17,10 @@ func main() { keyLength := flag.Int("keylength", 10, "number of random characters to append to key") keyPrefix := flag.String("keyprefix", "mctester:", "prefix to append to all generated keys") keySpace := flag.Int("keyspace", 1000, "number of unique keys to generate") + metaDelFlags := flag.String("mdflags", "", "flags sent alongside 'Meta Delete' commands") + useMeta := flag.Bool("meta", false, "if true, communicate with Memcached using meta commands") + metaGetFlags := flag.String("mgflags", "f v", "flags sent alongside 'Meta Get' commands") + metaSetFlags := flag.String("msflags", "", "flags sent alongside 'Meta Set' commands") pipelines := flag.Uint("pipelines", 1, "(32bit unsigned) number of GET requests to stack within the same syscall") delRatio := flag.Int("ratiodel", 0, "proportion of requests that should be sent as 'deletes'") getRatio := flag.Int("ratioget", 90, "proportion of requests that should be sent as 'gets'") @@ -46,6 +50,9 @@ func main() { KeyPrefix: *keyPrefix, KeySpace: *keySpace, KeyTTL: *keyTTL, + MetaDelFlags: *metaDelFlags, + MetaGetFlags: *metaGetFlags, + MetaSetFlags: *metaSetFlags, Pipelines: *pipelines, RngSeed: *rngSeed, RPS: *rps, @@ -53,6 +60,7 @@ func main() { SetRatio: *setRatio, Socket: *socket, StripKeyPrefix: *stripKeyPrefix, + UseMeta: *useMeta, UseZipf: *useZipf, ValidateGets: *validateGets, ValueSize: *valueSize, diff --git a/pkg/client/protocol.go b/pkg/client/protocol.go index 03c9826..f1fe932 100644 --- a/pkg/client/protocol.go +++ b/pkg/client/protocol.go @@ -112,6 +112,7 @@ const ( McOK McEN McME + McHD McNS McEX McNF @@ -153,11 +154,10 @@ func (c *Client) ParseMetaResponse() (rflags []byte, value []byte, code McCode, rflags = line[4+offset : len(line)-2] // Have some value data to read. + 2 bytes for \r\n value = make([]byte, size+2) - read, err := io.ReadFull(c.cn.b, value) + _, err := io.ReadFull(c.cn.b, value) if err != nil { return nil, nil, 0, err } - fmt.Printf("res size: %d read: %d\n", size, read) // check for \r\n, cut extra bytes off. if !bytes.Equal(value[len(value)-2:], []byte("\r\n")) { return nil, nil, 0, ErrCorruptValue @@ -177,6 +177,9 @@ func (c *Client) ParseMetaResponse() (rflags []byte, value []byte, code McCode, // Meta Debug command value = line[3 : len(line)-2] code = McME + case "HD": + // Meta STORED/DELETED + code = McHD case "NS": // Meta NOT_STORED rflags = line[3 : len(line)-2] @@ -261,6 +264,8 @@ func (c *Client) MetaSet(key string, flags string, value []byte) (err error) { b.WriteString("ms ") b.WriteString(key) b.WriteString(" ") + b.WriteString(strconv.FormatUint(uint64(len(value)), 10)) + b.WriteString(" ") b.WriteString(flags) b.WriteString("\r\n") // For large sets this ends up flushing twice. diff --git a/pkg/client/protocol_test.go b/pkg/client/protocol_test.go index d4f64d4..9b0b3d4 100644 --- a/pkg/client/protocol_test.go +++ b/pkg/client/protocol_test.go @@ -28,12 +28,12 @@ func newcli() *Client { func TestMeta(t *testing.T) { mc := newcli() { - err := mc.MetaSet("doob", "S4 T300", []byte("foop")) + err := mc.MetaSet("doob", "T300", []byte("foop")) if err != nil { t.Fatalf("metaset error: %v", err) } _, _, c, err := mc.MetaReceive() - if c != McOK { + if c != McHD { t.Fatalf("metaset not stored: %d", c) } } diff --git a/pkg/ratectrl/config.go b/pkg/ratectrl/config.go index 113f186..b837490 100644 --- a/pkg/ratectrl/config.go +++ b/pkg/ratectrl/config.go @@ -26,6 +26,9 @@ type Config struct { KeyPrefix string KeySpace int KeyTTL uint + MetaDelFlags string + MetaGetFlags string + MetaSetFlags string Pipelines uint RngSeed int64 RPS int @@ -33,6 +36,7 @@ type Config struct { SetRatio int Socket string StripKeyPrefix bool + UseMeta bool UseZipf bool ValidateGets bool ValueSize uint @@ -141,10 +145,27 @@ func (conf *Config) WarmCache() error { key := entry.key value := entry.value - _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) - if err != nil { - fmt.Println(err) - return err + if conf.UseMeta { + err := mc.MetaSet(key, conf.MetaSetFlags, value) + if err != nil { + fmt.Printf("metaset error: %v\n", err) + return err + } + + _, _, c, err := mc.MetaReceive() + if c != client.McHD { + fmt.Printf("metaset not stored: %d\n", c) + } + if err != nil { + fmt.Printf("metaset receive error: %v\n", err) + return err + } + } else { + _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) + if err != nil { + fmt.Println(err) + return err + } } } } @@ -189,36 +210,90 @@ func (conf *Config) Worker(index int, results chan Stats) error { rl.Take() switch rng := randR.Intn(conf.DelRatio + conf.SetRatio + conf.GetRatio); { case rng < conf.DelRatio: - code, err := mc.Delete(key) - if err != nil { - fmt.Println(err) - return err + var code client.McCode + var err error + + if conf.UseMeta { + err = mc.MetaDelete(key, conf.MetaDelFlags) + if err != nil { + fmt.Printf("metadelete error: %v\n", err) + return err + } + + _, _, code, err = mc.MetaReceive() + if code != client.McHD && code != client.McNF { + fmt.Printf("metadelete not successful: %d\n", code) + } + if err != nil { + fmt.Printf("metadelete receive error: %v\n", err) + return err + } + } else { + code, err = mc.Delete(key) + if err != nil { + fmt.Println(err) + return err + } } switch code { - case client.McDELETED: + case client.McDELETED, client.McHD: stats.DeleteHits++ - case client.McNOT_FOUND: + case client.McNOT_FOUND, client.McNF: stats.DeleteMisses++ } case rng < (conf.DelRatio + conf.SetRatio): value := entry.value - _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) - if err != nil { - fmt.Println(err) - return err + + if conf.UseMeta { + err := mc.MetaSet(key, conf.MetaSetFlags, value) + if err != nil { + fmt.Printf("metaset error: %v\n", err) + return err + } + + _, _, code, err := mc.MetaReceive() + if code != client.McHD { + fmt.Printf("metaset not stored: %d\n", code) + } + if err != nil { + fmt.Printf("metaset receive error: %v\n", err) + return err + } + } else { + _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) + if err != nil { + fmt.Println(err) + return err + } } stats.SetsTotal++ default: - _, value, code, err := mc.Get(key) - if err != nil { - fmt.Println(err, value) - return err + var code client.McCode + var value []byte + if conf.UseMeta { + err := mc.MetaGet(key, conf.MetaGetFlags) + if err != nil { + fmt.Printf("metaget error: %v\n", err) + return err + } + _, value, code, err = mc.MetaReceive() + if err != nil { + fmt.Printf("metaget receive error: %v\n", err) + return err + } + } else { + var err error + _, value, code, err = mc.Get(key) + if err != nil { + fmt.Println(err, value) + return err + } } switch code { - case client.McHIT: + case client.McHIT, client.McVA: stats.GetHits++ expectedValue := entry.value @@ -226,8 +301,7 @@ func (conf *Config) Worker(index int, results chan Stats) error { stats.KeyCollisions++ fmt.Printf("Unexpected value found for key `%s`\n\tExpected Value: %s\n\tActual Value: %s\n", key, expectedValue, value) } - - case client.McMISS: + case client.McMISS, client.McEN: stats.GetMisses++ } }