diff --git a/cmd/kgo-verifier/main.go b/cmd/kgo-verifier/main.go
index 59f47aa..0ec1290 100644
--- a/cmd/kgo-verifier/main.go
+++ b/cmd/kgo-verifier/main.go
@@ -75,6 +75,8 @@ var (
 	tombstoneProbability = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.")
 	compacted            = flag.Bool("compacted", false, "Whether the topic to be verified is compacted or not. This will suppress warnings about offset gaps in consumed values.")
 	validateLatestValues = flag.Bool("validate-latest-values", false, "If true, values consumed by a worker will be validated against the last produced value by a producer. This value should only be set if compaction has been allowed to fully de-duplicate the entirety of the log before consuming.")
+
+	produceRandomBytes = flag.Bool("produce-random-bytes", false, "If true, when generating random values, generate random bytes rather than random ascii")
 )
 
 func makeWorkerConfig() worker.WorkerConfig {
@@ -249,7 +251,7 @@ func main() {
 
 	if *pCount > 0 {
 		log.Info("Starting producer...")
-		pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId, *tombstoneProbability)
+		pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId, *tombstoneProbability, *produceRandomBytes)
 		pw := verifier.NewProducerWorker(pwc)
 
 		if *useTransactions {
diff --git a/pkg/worker/verifier/producer_worker.go b/pkg/worker/verifier/producer_worker.go
index c53bc58..319f29b 100644
--- a/pkg/worker/verifier/producer_worker.go
+++ b/pkg/worker/verifier/producer_worker.go
@@ -35,7 +35,7 @@ type ProducerConfig struct {
 }
 
 func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32,
-	messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int, tombstoneProbability float64) ProducerConfig {
+	messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int, tombstoneProbability float64, produceRandomBytes bool) ProducerConfig {
 	return ProducerConfig{
 		workerCfg: wc,
 		name:      name,
@@ -52,6 +52,7 @@ func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32,
 			PayloadSize:          uint64(messageSize),
 			Compressible:         wc.CompressiblePayload,
 			TombstoneProbability: tombstoneProbability,
+			ProduceRandomBytes:   produceRandomBytes,
 		},
 	}
 }
@@ -62,8 +63,6 @@ type ProducerWorker struct {
 	validOffsets        TopicOffsetRanges
 	latestValueProduced LatestValueMap
 
-	payload []byte
-
 	// Used for enabling transactional produces
 	transactionsEnabled  bool
 	transactionSTMConfig worker.TransactionSTMConfig
@@ -94,7 +93,6 @@ func NewProducerWorker(cfg ProducerConfig) ProducerWorker {
 		Status:              NewProducerWorkerStatus(cfg.workerCfg.Topic),
 		latestValueProduced: NewLatestValueMap(cfg.workerCfg.Topic, cfg.nPartitions),
 		validOffsets:        validOffsets,
-		payload:             cfg.valueGenerator.Generate(),
 		churnProducers:      cfg.messagesPerProducerId > 0,
 		tolerateDataLoss:    cfg.workerCfg.TolerateDataLoss,
 		tolerateFailedProduce: cfg.workerCfg.TolerateFailedProduce,
@@ -135,8 +133,13 @@ func (pw *ProducerWorker) newRecord(producerId int, sequence int64) *kgo.Record {
 			value.Write(make([]byte, paddingSize))
 		}
 		payload = value.Bytes()
+	} else if pw.config.valueGenerator.Compressible {
+		payload = pw.config.valueGenerator.GenerateCompressible()
+	} else if pw.config.valueGenerator.ProduceRandomBytes {
+		payload = make([]byte, pw.config.valueGenerator.PayloadSize)
+		rand.Read(payload)
 	} else {
-		payload = make([]byte, pw.config.messageSize)
+		payload = pw.config.valueGenerator.GenerateRandom()
 	}
 }
 
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 10f31a3..4496486 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -103,52 +103,77 @@ type ValueGenerator struct {
 	PayloadSize          uint64
 	Compressible         bool
 	TombstoneProbability float64
+	RandomData           []byte
+	ProduceRandomBytes   bool
 }
 
 var compressible_payload []byte
 
+// min returns the smallest of the supplied ints.
+func min(vars ...int) int {
+	v := vars[0]
+	for _, i := range vars {
+		if i < v {
+			v = i
+		}
+	}
+	return v
+}
+
+// GenerateRandom builds a PayloadSize-byte printable-ASCII payload by concatenating random fragments of a lazily initialized 2 MiB random pool.
+func (vg *ValueGenerator) GenerateRandom() []byte {
+	if vg.RandomData == nil {
+		// 2 MiB should be enough
+		data := make([]byte, 1<<21)
+		for i := range data {
+			// printable ascii range
+			data[i] = byte(rand.Intn(126+1-32) + 32)
+		}
+		vg.RandomData = data
+	}
+	frag_size := 1024 // didn't seem to have much perf impact
+	size := int(vg.PayloadSize)
+	res := make([]byte, 0, size)
+	for {
+		remaining := size - len(res)
+		if remaining == 0 {
+			break
+		}
+		start := rand.Intn(len(vg.RandomData)) // sample within the pool: PayloadSize may exceed the pool length
+		view := vg.RandomData[start:]
+		end := min(len(view), remaining, frag_size)
+		res = append(res, view[:end]...)
+	}
+	return res
+}
+
+// GenerateCompressible returns a shared all-zeros payload of PayloadSize bytes (maximally compressible).
+func (vg *ValueGenerator) GenerateCompressible() []byte {
+	// Zeros, which is about as compressible as an array can be.
+	if len(compressible_payload) == 0 {
+		compressible_payload = make([]byte, vg.PayloadSize)
+	} else if len(compressible_payload) != int(vg.PayloadSize) {
+		// This is an implementation shortcut that lets us use a simple
+		// global array of zeros for compressible payloads, as long
+		// as everyone wants the same size.
+		panic("Can't have multiple compressible generators of different sizes")
+	}
+
+	// Everyone who asks for compressible payload gets a ref to the same array
+	// of zeros: this is worthwhile because a compressible producer might do
+	// huge message sizes (e.g. 128MiB of zeros compresses down to <1MiB).
+	return compressible_payload
+}
+
 func (vg *ValueGenerator) Generate() []byte {
 	isTombstone := rand.Float64() < vg.TombstoneProbability
 	if isTombstone {
 		return nil
 	}
 
 	if vg.Compressible {
-		// Zeros, which is about as compressible as an array can be.
-		if len(compressible_payload) == 0 {
-			compressible_payload = make([]byte, vg.PayloadSize)
-		} else if len(compressible_payload) != int(vg.PayloadSize) {
-			// This is an implementation shortcut that lets us use a simple
-			// global array of zeros for compressible payloads, as long
-			// as everyone wants the same size.
-			panic("Can't have multiple compressible generators of different sizes")
-		}
-
-		// Everyone who asks for compressible payload gets a ref to the same array
-		// of zeros: this is worthwhile because a compressible producer might do
-		// huge message sizes (e.g. 128MIB of zeros compresses down to <1MiB.
-		return compressible_payload
+		return vg.GenerateCompressible()
 	} else {
-		randBytes := make([]byte, vg.PayloadSize)
-		// An incompressible high entropy payload. This will likely not be UTF-8 decodable.
-		n, err := rand.Read(randBytes)
-		if err != nil {
-			panic(err.Error())
-		}
-		if n != int(vg.PayloadSize) {
-			panic("Unexpected byte count from rand.Read")
-		}
-		// Convert to a valid UTF-8 string, replacing bad chars with " ".
-		// A valid UTF-8 string is needed to avoid any decoding issues
-		// for services on the consuming end.
-		payload := []byte(strings.ToValidUTF8(string(randBytes), " "))
-
-		// In converting to valid UTF-8, we may have lost some bytes.
-		// Append back the difference.
-		diff := int(vg.PayloadSize) - len(payload)
-		if diff > 0 {
-			payload = append(payload, make([]byte, diff)...)
-		}
-		return payload
+		return vg.GenerateRandom()
 	}
 }
diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go
new file mode 100644
index 0000000..f106308
--- /dev/null
+++ b/pkg/worker/worker_test.go
@@ -0,0 +1,100 @@
+package worker
+
+import (
+	"math/rand"
+	"strings"
+	"testing"
+)
+
+type state struct {
+	data []byte
+}
+
+func newstate(size int) state {
+	data := make([]byte, size)
+	for i := range data {
+		// printable ascii range
+		data[i] = byte(rand.Intn(126+1-32) + 32)
+	}
+	return state{
+		data: data,
+	}
+}
+
+func (s *state) generate(size, frag_size int) []byte {
+	res := make([]byte, 0, size)
+	for {
+		remaining := size - len(res)
+		if remaining == 0 {
+			break
+		}
+		start := rand.Intn(len(s.data)) // sample within the pool, mirroring ValueGenerator.GenerateRandom
+		view := s.data[start:]
+		end := min(len(view), remaining, frag_size)
+		res = append(res, view[:end]...)
+	}
+	return res
+}
+
+func benchmark_old_random_payload(i int, b *testing.B) {
+	for n := 0; n < b.N; n++ {
+		randBytes := make([]byte, i)
+		// An incompressible high entropy payload. This will likely not be UTF-8 decodable.
+		n, err := rand.Read(randBytes)
+		if err != nil {
+			panic(err.Error())
+		}
+		if n != int(i) {
+			panic("Unexpected byte count from rand.Read")
+		}
+		// Convert to a valid UTF-8 string, replacing bad chars with " ".
+		// A valid UTF-8 string is needed to avoid any decoding issues
+		// for services on the consuming end.
+		payload := []byte(strings.ToValidUTF8(string(randBytes), " "))
+
+		// In converting to valid UTF-8, we may have lost some bytes.
+		// Append back the difference.
+		diff := int(i) - len(payload)
+		if diff > 0 {
+			payload = append(payload, make([]byte, diff)...)
+		}
+	}
+}
+
+func benchmark_random_payload(i int, b *testing.B) {
+	s := newstate(1 << 22)
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		s.generate(i, 1024)
+	}
+}
+
+func benchmark_empty_payload(i int, b *testing.B) {
+	gen := func() []byte {
+		return make([]byte, i)
+	}
+	for n := 0; n < b.N; n++ {
+		gen()
+	}
+}
+
+func Benchmark_old_random_payload1(b *testing.B)  { benchmark_old_random_payload(10, b) }
+func Benchmark_old_random_payload2(b *testing.B)  { benchmark_old_random_payload(100, b) }
+func Benchmark_old_random_payload3(b *testing.B)  { benchmark_old_random_payload(1000, b) }
+func Benchmark_old_random_payload10(b *testing.B) { benchmark_old_random_payload(10000, b) }
+func Benchmark_old_random_payload20(b *testing.B) { benchmark_old_random_payload(100000, b) }
+func Benchmark_old_random_payload40(b *testing.B) { benchmark_old_random_payload(1000000, b) }
+
+func Benchmark_random_payload1(b *testing.B)  { benchmark_random_payload(10, b) }
+func Benchmark_random_payload2(b *testing.B)  { benchmark_random_payload(100, b) }
+func Benchmark_random_payload3(b *testing.B)  { benchmark_random_payload(1000, b) }
+func Benchmark_random_payload10(b *testing.B) { benchmark_random_payload(10000, b) }
+func Benchmark_random_payload20(b *testing.B) { benchmark_random_payload(100000, b) }
+func Benchmark_random_payload40(b *testing.B) { benchmark_random_payload(1000000, b) }
+
+func Benchmark_empty_payload1(b *testing.B)  { benchmark_empty_payload(10, b) }
+func Benchmark_empty_payload2(b *testing.B)  { benchmark_empty_payload(100, b) }
+func Benchmark_empty_payload3(b *testing.B)  { benchmark_empty_payload(1000, b) }
+func Benchmark_empty_payload10(b *testing.B) { benchmark_empty_payload(10000, b) }
+func Benchmark_empty_payload20(b *testing.B) { benchmark_empty_payload(100000, b) }
+func Benchmark_empty_payload40(b *testing.B) { benchmark_empty_payload(1000000, b) }