From ae464a6d5cf9048080917b37e5746da260dce599 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Wed, 2 Apr 2025 11:48:19 -0700 Subject: [PATCH 1/6] remove unused payload variable Signed-off-by: Noah Watkins --- pkg/worker/verifier/producer_worker.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/worker/verifier/producer_worker.go b/pkg/worker/verifier/producer_worker.go index c53bc58..fae99b9 100644 --- a/pkg/worker/verifier/producer_worker.go +++ b/pkg/worker/verifier/producer_worker.go @@ -62,8 +62,6 @@ type ProducerWorker struct { validOffsets TopicOffsetRanges latestValueProduced LatestValueMap - payload []byte - // Used for enabling transactional produces transactionsEnabled bool transactionSTMConfig worker.TransactionSTMConfig @@ -94,7 +92,6 @@ func NewProducerWorker(cfg ProducerConfig) ProducerWorker { Status: NewProducerWorkerStatus(cfg.workerCfg.Topic), latestValueProduced: NewLatestValueMap(cfg.workerCfg.Topic, cfg.nPartitions), validOffsets: validOffsets, - payload: cfg.valueGenerator.Generate(), churnProducers: cfg.messagesPerProducerId > 0, tolerateDataLoss: cfg.workerCfg.TolerateDataLoss, tolerateFailedProduce: cfg.workerCfg.TolerateFailedProduce, From 04bdb052f8b9a87e6933b3964a3846c3b6ee62ac Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Wed, 2 Apr 2025 11:50:41 -0700 Subject: [PATCH 2/6] split out separate data generation tasks this is useful to avoid needing to refactor other users of the generate interface like kgo-repeater Signed-off-by: Noah Watkins --- pkg/worker/worker.go | 78 ++++++++++++++++++++++++-------------------- 1 file changed, 43 insertions(+), 35 deletions(-) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 10f31a3..2572e65 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -107,48 +107,56 @@ type ValueGenerator struct { var compressible_payload []byte +func (vg *ValueGenerator) GenerateRandom() []byte { + randBytes := make([]byte, vg.PayloadSize) + // An incompressible high entropy payload. 
This will likely not be UTF-8 decodable. + n, err := rand.Read(randBytes) + if err != nil { + panic(err.Error()) + } + if n != int(vg.PayloadSize) { + panic("Unexpected byte count from rand.Read") + } + // Convert to a valid UTF-8 string, replacing bad chars with " ". + // A valid UTF-8 string is needed to avoid any decoding issues + // for services on the consuming end. + payload := []byte(strings.ToValidUTF8(string(randBytes), " ")) + + // In converting to valid UTF-8, we may have lost some bytes. + // Append back the difference. + diff := int(vg.PayloadSize) - len(payload) + if diff > 0 { + payload = append(payload, make([]byte, diff)...) + } + return payload +} + +func (vg *ValueGenerator) GenerateCompressible() []byte { + // Zeros, which is about as compressible as an array can be. + if len(compressible_payload) == 0 { + compressible_payload = make([]byte, vg.PayloadSize) + } else if len(compressible_payload) != int(vg.PayloadSize) { + // This is an implementation shortcut that lets us use a simple + // global array of zeros for compressible payloads, as long + // as everyone wants the same size. + panic("Can't have multiple compressible generators of different sizes") + } + + // Everyone who asks for compressible payload gets a ref to the same array + // of zeros: this is worthwhile because a compressible producer might do + // huge message sizes (e.g. 128MIB of zeros compresses down to <1MiB. + return compressible_payload +} + func (vg *ValueGenerator) Generate() []byte { isTombstone := rand.Float64() < vg.TombstoneProbability if isTombstone { return nil } if vg.Compressible { - // Zeros, which is about as compressible as an array can be. - if len(compressible_payload) == 0 { - compressible_payload = make([]byte, vg.PayloadSize) - } else if len(compressible_payload) != int(vg.PayloadSize) { - // This is an implementation shortcut that lets us use a simple - // global array of zeros for compressible payloads, as long - // as everyone wants the same size. 
- panic("Can't have multiple compressible generators of different sizes") - } - - // Everyone who asks for compressible payload gets a ref to the same array - // of zeros: this is worthwhile because a compressible producer might do - // huge message sizes (e.g. 128MIB of zeros compresses down to <1MiB. - return compressible_payload + return vg.GenerateCompressible() } else { - randBytes := make([]byte, vg.PayloadSize) - // An incompressible high entropy payload. This will likely not be UTF-8 decodable. - n, err := rand.Read(randBytes) - if err != nil { - panic(err.Error()) - } - if n != int(vg.PayloadSize) { - panic("Unexpected byte count from rand.Read") - } - // Convert to a valid UTF-8 string, replacing bad chars with " ". - // A valid UTF-8 string is needed to avoid any decoding issues - // for services on the consuming end. - payload := []byte(strings.ToValidUTF8(string(randBytes), " ")) - - // In converting to valid UTF-8, we may have lost some bytes. - // Append back the difference. - diff := int(vg.PayloadSize) - len(payload) - if diff > 0 { - payload = append(payload, make([]byte, diff)...) 
- } - return payload + return vg.GenerateRandom() } } From 8ba466a27d374582c018edfa9bb136629cf071f0 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Wed, 2 Apr 2025 11:50:10 -0700 Subject: [PATCH 3/6] produce random data in non-compressible mode Signed-off-by: Noah Watkins --- pkg/worker/verifier/producer_worker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/worker/verifier/producer_worker.go b/pkg/worker/verifier/producer_worker.go index fae99b9..1eb88d2 100644 --- a/pkg/worker/verifier/producer_worker.go +++ b/pkg/worker/verifier/producer_worker.go @@ -132,8 +132,10 @@ func (pw *ProducerWorker) newRecord(producerId int, sequence int64) *kgo.Record value.Write(make([]byte, paddingSize)) } payload = value.Bytes() + } else if pw.config.valueGenerator.Compressible { + payload = pw.config.valueGenerator.GenerateCompressible() } else { - payload = make([]byte, pw.config.messageSize) + payload = pw.config.valueGenerator.GenerateRandom() } } From 077b26d96f559c35f23d59fb1807097fbab788f5 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Wed, 2 Apr 2025 13:47:15 -0700 Subject: [PATCH 4/6] add benchmark for old and new random gen Signed-off-by: Noah Watkins --- pkg/worker/worker_test.go | 100 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 pkg/worker/worker_test.go diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go new file mode 100644 index 0000000..f106308 --- /dev/null +++ b/pkg/worker/worker_test.go @@ -0,0 +1,100 @@ +package worker + +import ( + "math/rand" + "strings" + "testing" +) + +type state struct { + data []byte +} + +func newstate(size int) state { + data := make([]byte, size) + for i := range data { + // printable ascii range + data[i] = byte(rand.Intn(126+1-32) + 32) + } + return state{ + data: data, + } +} + +func (s *state) generate(size, frag_size int) []byte { + res := make([]byte, 0, size) + for { + remaining := size - len(res) + if remaining == 0 { + break + } + 
start := rand.Intn(size) + view := s.data[start:] + end := min(len(view), remaining, frag_size) + res = append(res, view[:end]...) + } + return res +} + +func benchmark_old_random_payload(i int, b *testing.B) { + for n := 0; n < b.N; n++ { + randBytes := make([]byte, i) + // An incompressible high entropy payload. This will likely not be UTF-8 decodable. + n, err := rand.Read(randBytes) + if err != nil { + panic(err.Error()) + } + if n != int(i) { + panic("Unexpected byte count from rand.Read") + } + // Convert to a valid UTF-8 string, replacing bad chars with " ". + // A valid UTF-8 string is needed to avoid any decoding issues + // for services on the consuming end. + payload := []byte(strings.ToValidUTF8(string(randBytes), " ")) + + // In converting to valid UTF-8, we may have lost some bytes. + // Append back the difference. + diff := int(i) - len(payload) + if diff > 0 { + payload = append(payload, make([]byte, diff)...) + } + } +} + +func benchmark_random_payload(i int, b *testing.B) { + s := newstate(1 << 22) + b.ResetTimer() + for n := 0; n < b.N; n++ { + s.generate(i, 1024) + } +} + +func benchmark_empty_payload(i int, b *testing.B) { + gen := func() []byte { + return make([]byte, i) + } + for n := 0; n < b.N; n++ { + gen() + } +} + +func Benchmark_old_random_payload1(b *testing.B) { benchmark_old_random_payload(10, b) } +func Benchmark_old_random_payload2(b *testing.B) { benchmark_old_random_payload(100, b) } +func Benchmark_old_random_payload3(b *testing.B) { benchmark_old_random_payload(1000, b) } +func Benchmark_old_random_payload10(b *testing.B) { benchmark_old_random_payload(10000, b) } +func Benchmark_old_random_payload20(b *testing.B) { benchmark_old_random_payload(100000, b) } +func Benchmark_old_random_payload40(b *testing.B) { benchmark_old_random_payload(1000000, b) } + +func Benchmark_random_payload1(b *testing.B) { benchmark_random_payload(10, b) } +func Benchmark_random_payload2(b *testing.B) { benchmark_random_payload(100, b) } +func 
Benchmark_random_payload3(b *testing.B) { benchmark_random_payload(1000, b) } +func Benchmark_random_payload10(b *testing.B) { benchmark_random_payload(10000, b) } +func Benchmark_random_payload20(b *testing.B) { benchmark_random_payload(100000, b) } +func Benchmark_random_payload40(b *testing.B) { benchmark_random_payload(1000000, b) } + +func Benchmark_empty_payload1(b *testing.B) { benchmark_empty_payload(10, b) } +func Benchmark_empty_payload2(b *testing.B) { benchmark_empty_payload(100, b) } +func Benchmark_empty_payload3(b *testing.B) { benchmark_empty_payload(1000, b) } +func Benchmark_empty_payload10(b *testing.B) { benchmark_empty_payload(10000, b) } +func Benchmark_empty_payload20(b *testing.B) { benchmark_empty_payload(100000, b) } +func Benchmark_empty_payload40(b *testing.B) { benchmark_empty_payload(1000000, b) } From 4ef032a80197007274af7f0f37b60bbbc13f30f0 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Wed, 2 Apr 2025 13:52:28 -0700 Subject: [PATCH 5/6] use faster random data generator Signed-off-by: Noah Watkins --- pkg/worker/worker.go | 51 +++++++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 2572e65..1404811 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -103,32 +103,45 @@ type ValueGenerator struct { PayloadSize uint64 Compressible bool TombstoneProbability float64 + RandomData []byte } var compressible_payload []byte -func (vg *ValueGenerator) GenerateRandom() []byte { - randBytes := make([]byte, vg.PayloadSize) - // An incompressible high entropy payload. This will likely not be UTF-8 decodable. 
- n, err := rand.Read(randBytes) - if err != nil { - panic(err.Error()) +func min(vars ...int) int { + v := vars[0] + for _, i := range vars { + if i < v { + v = i + } } - if n != int(vg.PayloadSize) { - panic("Unexpected byte count from rand.Read") + return v +} + +func (vg *ValueGenerator) GenerateRandom() []byte { + if vg.RandomData == nil { + // 2mb should be enough + data := make([]byte, 1<<21) + for i := range data { + // printable ascii range + data[i] = byte(rand.Intn(126+1-32) + 32) + } + vg.RandomData = data } - // Convert to a valid UTF-8 string, replacing bad chars with " ". - // A valid UTF-8 string is needed to avoid any decoding issues - // for services on the consuming end. - payload := []byte(strings.ToValidUTF8(string(randBytes), " ")) - - // In converting to valid UTF-8, we may have lost some bytes. - // Append back the difference. - diff := int(vg.PayloadSize) - len(payload) - if diff > 0 { - payload = append(payload, make([]byte, diff)...) + frag_size := 1024 // didn't seem to have much perf impact + size := int(vg.PayloadSize) + res := make([]byte, 0, size) + for { + remaining := size - len(res) + if remaining == 0 { + break + } + start := rand.Intn(size) + view := vg.RandomData[start:] + end := min(len(view), remaining, frag_size) + res = append(res, view[:end]...) 
} - return payload + return res } func (vg *ValueGenerator) GenerateCompressible() []byte { From 104fd4df79067219daa2fbd4734a27673cf6c4c4 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Wed, 2 Apr 2025 16:45:32 -0700 Subject: [PATCH 6/6] add option for real random bytes Signed-off-by: Noah Watkins --- cmd/kgo-verifier/main.go | 4 +++- pkg/worker/verifier/producer_worker.go | 6 +++++- pkg/worker/worker.go | 1 + 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cmd/kgo-verifier/main.go b/cmd/kgo-verifier/main.go index 59f47aa..0ec1290 100644 --- a/cmd/kgo-verifier/main.go +++ b/cmd/kgo-verifier/main.go @@ -75,6 +75,8 @@ var ( tombstoneProbability = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.") compacted = flag.Bool("compacted", false, "Whether the topic to be verified is compacted or not. This will suppress warnings about offset gaps in consumed values.") validateLatestValues = flag.Bool("validate-latest-values", false, "If true, values consumed by a worker will be validated against the last produced value by a producer. 
This value should only be set if compaction has been allowed to fully de-duplicate the entirety of the log before consuming.") + + produceRandomBytes = flag.Bool("produce-random-bytes", false, "If true, when generating random values, generate random bytes rather than random ascii") ) func makeWorkerConfig() worker.WorkerConfig { @@ -249,7 +251,7 @@ func main() { if *pCount > 0 { log.Info("Starting producer...") - pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId, *tombstoneProbability) + pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId, *tombstoneProbability, *produceRandomBytes) pw := verifier.NewProducerWorker(pwc) if *useTransactions { diff --git a/pkg/worker/verifier/producer_worker.go b/pkg/worker/verifier/producer_worker.go index 1eb88d2..319f29b 100644 --- a/pkg/worker/verifier/producer_worker.go +++ b/pkg/worker/verifier/producer_worker.go @@ -35,7 +35,7 @@ type ProducerConfig struct { } func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32, - messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int, tombstoneProbability float64) ProducerConfig { + messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int, tombstoneProbability float64, produceRandomBytes bool) ProducerConfig { return ProducerConfig{ workerCfg: wc, name: name, @@ -52,6 +52,7 @@ func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32, PayloadSize: uint64(messageSize), Compressible: wc.CompressiblePayload, TombstoneProbability: tombstoneProbability, + 
ProduceRandomBytes: produceRandomBytes, }, } } @@ -134,6 +135,9 @@ func (pw *ProducerWorker) newRecord(producerId int, sequence int64) *kgo.Record payload = value.Bytes() } else if pw.config.valueGenerator.Compressible { payload = pw.config.valueGenerator.GenerateCompressible() + } else if pw.config.valueGenerator.ProduceRandomBytes { + payload = make([]byte, pw.config.valueGenerator.PayloadSize) + rand.Read(payload) } else { payload = pw.config.valueGenerator.GenerateRandom() } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 1404811..4496486 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -104,6 +104,7 @@ type ValueGenerator struct { Compressible bool TombstoneProbability float64 RandomData []byte + ProduceRandomBytes bool } var compressible_payload []byte