Skip to content
This repository was archived by the owner on Jul 30, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/kgo-verifier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ var (
tombstoneProbability = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.")
compacted = flag.Bool("compacted", false, "Whether the topic to be verified is compacted or not. This will suppress warnings about offset gaps in consumed values.")
validateLatestValues = flag.Bool("validate-latest-values", false, "If true, values consumed by a worker will be validated against the last produced value by a producer. This value should only be set if compaction has been allowed to fully de-duplicate the entirety of the log before consuming.")

produceRandomBytes = flag.Bool("produce-random-bytes", false, "If true, when generating random values, generate random bytes rather than random ascii")
)

func makeWorkerConfig() worker.WorkerConfig {
Expand Down Expand Up @@ -249,7 +251,7 @@ func main() {

if *pCount > 0 {
log.Info("Starting producer...")
pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId, *tombstoneProbability)
pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId, *tombstoneProbability, *produceRandomBytes)
pw := verifier.NewProducerWorker(pwc)

if *useTransactions {
Expand Down
13 changes: 8 additions & 5 deletions pkg/worker/verifier/producer_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type ProducerConfig struct {
}

func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32,
messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int, tombstoneProbability float64) ProducerConfig {
messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int, tombstoneProbability float64, produceRandomBytes bool) ProducerConfig {
return ProducerConfig{
workerCfg: wc,
name: name,
Expand All @@ -52,6 +52,7 @@ func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32,
PayloadSize: uint64(messageSize),
Compressible: wc.CompressiblePayload,
TombstoneProbability: tombstoneProbability,
ProduceRandomBytes: produceRandomBytes,
},
}
}
Expand All @@ -62,8 +63,6 @@ type ProducerWorker struct {
validOffsets TopicOffsetRanges
latestValueProduced LatestValueMap

payload []byte

// Used for enabling transactional produces
transactionsEnabled bool
transactionSTMConfig worker.TransactionSTMConfig
Expand Down Expand Up @@ -94,7 +93,6 @@ func NewProducerWorker(cfg ProducerConfig) ProducerWorker {
Status: NewProducerWorkerStatus(cfg.workerCfg.Topic),
latestValueProduced: NewLatestValueMap(cfg.workerCfg.Topic, cfg.nPartitions),
validOffsets: validOffsets,
payload: cfg.valueGenerator.Generate(),
churnProducers: cfg.messagesPerProducerId > 0,
tolerateDataLoss: cfg.workerCfg.TolerateDataLoss,
tolerateFailedProduce: cfg.workerCfg.TolerateFailedProduce,
Expand Down Expand Up @@ -135,8 +133,13 @@ func (pw *ProducerWorker) newRecord(producerId int, sequence int64) *kgo.Record
value.Write(make([]byte, paddingSize))
}
payload = value.Bytes()
} else if pw.config.valueGenerator.Compressible {
payload = pw.config.valueGenerator.GenerateCompressible()
} else if pw.config.valueGenerator.ProduceRandomBytes {
payload = make([]byte, pw.config.valueGenerator.PayloadSize)
rand.Read(payload)
} else {
payload = make([]byte, pw.config.messageSize)
payload = pw.config.valueGenerator.GenerateRandom()
}
}

Expand Down
92 changes: 57 additions & 35 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,52 +103,74 @@ type ValueGenerator struct {
PayloadSize uint64
Compressible bool
TombstoneProbability float64
RandomData []byte
ProduceRandomBytes bool
}

var compressible_payload []byte

// min returns the smallest of one or more ints.
// (Kept as a package-level helper for compatibility with pre-1.21 toolchains.)
func min(vars ...int) int {
	lowest := vars[0]
	for _, candidate := range vars[1:] {
		if candidate < lowest {
			lowest = candidate
		}
	}
	return lowest
}

// GenerateRandom returns a payload of vg.PayloadSize printable-ASCII bytes,
// assembled by copying random fragments out of a lazily built 2 MiB pool.
// Copying fragments is far cheaper than generating fresh random bytes per
// payload (see the benchmarks in worker_test.go).
//
// NOTE(review): the lazy initialization of vg.RandomData is not
// goroutine-safe — assumes one goroutine per generator; confirm before
// sharing a ValueGenerator across producers.
func (vg *ValueGenerator) GenerateRandom() []byte {
	if vg.RandomData == nil {
		// 2mb should be enough
		data := make([]byte, 1<<21)
		for i := range data {
			// printable ascii range
			data[i] = byte(rand.Intn(126+1-32) + 32)
		}
		vg.RandomData = data
	}
	const fragSize = 1024 // didn't seem to have much perf impact
	size := int(vg.PayloadSize)
	res := make([]byte, 0, size)
	for len(res) < size {
		remaining := size - len(res)
		// Sample the fragment start over the whole pool. The previous
		// rand.Intn(size) both biased small payloads toward the front of the
		// pool and panicked with an out-of-range index whenever PayloadSize
		// exceeded len(vg.RandomData).
		start := rand.Intn(len(vg.RandomData))
		view := vg.RandomData[start:]
		end := min(len(view), remaining, fragSize)
		res = append(res, view[:end]...)
	}
	return res
}

// GenerateCompressible returns a shared all-zero payload of PayloadSize
// bytes — about as compressible as data can get. Every caller receives a
// reference to the same global array, which is worthwhile because a
// compressible producer might use huge message sizes (e.g. 128 MiB of zeros
// compresses down to <1 MiB).
//
// Panics if generators with different PayloadSize values are mixed, since
// the shared array only supports one size per process.
func (vg *ValueGenerator) GenerateCompressible() []byte {
	want := int(vg.PayloadSize)
	switch len(compressible_payload) {
	case 0:
		// First use: allocate the shared zero array.
		compressible_payload = make([]byte, want)
	case want:
		// Already sized correctly; reuse it.
	default:
		// Implementation shortcut: a single global array of zeros works only
		// as long as everyone wants the same size.
		panic("Can't have multiple compressible generators of different sizes")
	}
	return compressible_payload
}

// Generate produces the next record value: nil (a tombstone) with probability
// TombstoneProbability, otherwise either the shared compressible (all-zero)
// payload or a random payload of PayloadSize bytes, depending on the
// Compressible flag.
//
// The pre-refactor inline bodies (now living in GenerateCompressible and
// GenerateRandom) were left interleaved in this span as diff residue —
// unreachable duplicate code after the return statements; this is the merged
// version that delegates to those helpers.
func (vg *ValueGenerator) Generate() []byte {
	isTombstone := rand.Float64() < vg.TombstoneProbability
	if isTombstone {
		return nil
	}
	if vg.Compressible {
		return vg.GenerateCompressible()
	}
	return vg.GenerateRandom()
}

Expand Down
100 changes: 100 additions & 0 deletions pkg/worker/worker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package worker

import (
"math/rand"
"strings"
"testing"
)

// state holds a pre-generated pool of random bytes that benchmark payloads
// are assembled from (benchmark-local analogue of ValueGenerator.RandomData).
type state struct {
	// data is the random byte pool fragments are copied out of.
	data []byte
}

func newstate(size int) state {
data := make([]byte, size)
for i := range data {
// printable ascii range
data[i] = byte(rand.Intn(126+1-32) + 32)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any good way to reuse the definition in valueGenerator? not a huge deal if not

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dunno. just choosing my battles

}
return state{
data: data,
}
}

// generate assembles a size-byte payload by concatenating random fragments
// (each at most frag_size bytes) copied from the pre-generated pool in s.data.
func (s *state) generate(size, frag_size int) []byte {
	res := make([]byte, 0, size)
	for len(res) < size {
		remaining := size - len(res)
		// Sample the fragment start over the whole pool. The previous
		// rand.Intn(size) biased starts toward the front of the pool and
		// could index past its end whenever size > len(s.data).
		start := rand.Intn(len(s.data))
		view := s.data[start:]
		end := min(len(view), remaining, frag_size)
		res = append(res, view[:end]...)
	}
	return res
}

// benchmark_old_random_payload measures the pre-refactor generation strategy:
// fill i bytes from rand.Read, coerce the result into valid UTF-8, then pad
// back any bytes lost in the conversion.
func benchmark_old_random_payload(i int, b *testing.B) {
	for iter := 0; iter < b.N; iter++ {
		raw := make([]byte, i)
		// An incompressible high entropy payload. This will likely not be UTF-8 decodable.
		count, err := rand.Read(raw)
		if err != nil {
			panic(err.Error())
		}
		if count != i {
			panic("Unexpected byte count from rand.Read")
		}
		// Convert to a valid UTF-8 string, replacing bad chars with " ".
		// A valid UTF-8 string is needed to avoid any decoding issues
		// for services on the consuming end.
		payload := []byte(strings.ToValidUTF8(string(raw), " "))

		// In converting to valid UTF-8, we may have lost some bytes.
		// Append back the difference.
		if shortfall := i - len(payload); shortfall > 0 {
			payload = append(payload, make([]byte, shortfall)...)
		}
	}
}

// benchmark_random_payload measures the fragment-copying generation strategy
// against a 4 MiB pre-built pool; pool construction is excluded from the
// timings via b.ResetTimer.
func benchmark_random_payload(i int, b *testing.B) {
	pool := newstate(1 << 22)
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		pool.generate(i, 1024)
	}
}

// benchmark_empty_payload measures the baseline cost of allocating an i-byte
// zeroed slice per iteration, for comparison against the random strategies.
// The allocation stays behind a closure call, as in the original, so the
// compiler cannot trivially eliminate it.
func benchmark_empty_payload(i int, b *testing.B) {
	alloc := func() []byte {
		return make([]byte, i)
	}
	for iter := 0; iter < b.N; iter++ {
		alloc()
	}
}

// Benchmarks of the pre-refactor (UTF-8 coercion) strategy at payload sizes
// 10, 100, 1000, and 10000 bytes.
func Benchmark_old_random_payload1(b *testing.B) { benchmark_old_random_payload(10, b) }
func Benchmark_old_random_payload2(b *testing.B) { benchmark_old_random_payload(100, b) }
func Benchmark_old_random_payload3(b *testing.B) { benchmark_old_random_payload(1000, b) }
func Benchmark_old_random_payload10(b *testing.B) { benchmark_old_random_payload(10000, b) }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i know this is copypasta from elsewhere but if we're going to check this in we might as well make the Benchmark function names sensible (10,20,40 should be 4, 5, 6 to indicate number of zeros)

// Remaining pre-refactor benchmarks at 1e5 and 1e6 bytes. Numeric suffixes
// are historical, not the number of zeros — see the review thread above.
func Benchmark_old_random_payload20(b *testing.B) { benchmark_old_random_payload(100000, b) }
func Benchmark_old_random_payload40(b *testing.B) { benchmark_old_random_payload(1000000, b) }

// Benchmarks of the fragment-copying strategy at payload sizes 10..1e6 bytes.
func Benchmark_random_payload1(b *testing.B) { benchmark_random_payload(10, b) }
func Benchmark_random_payload2(b *testing.B) { benchmark_random_payload(100, b) }
func Benchmark_random_payload3(b *testing.B) { benchmark_random_payload(1000, b) }
func Benchmark_random_payload10(b *testing.B) { benchmark_random_payload(10000, b) }
func Benchmark_random_payload20(b *testing.B) { benchmark_random_payload(100000, b) }
func Benchmark_random_payload40(b *testing.B) { benchmark_random_payload(1000000, b) }

// Baseline benchmarks: cost of a zeroed allocation at the same sizes.
func Benchmark_empty_payload1(b *testing.B) { benchmark_empty_payload(10, b) }
func Benchmark_empty_payload2(b *testing.B) { benchmark_empty_payload(100, b) }
func Benchmark_empty_payload3(b *testing.B) { benchmark_empty_payload(1000, b) }
func Benchmark_empty_payload10(b *testing.B) { benchmark_empty_payload(10000, b) }
func Benchmark_empty_payload20(b *testing.B) { benchmark_empty_payload(100000, b) }
func Benchmark_empty_payload40(b *testing.B) { benchmark_empty_payload(1000000, b) }