From 175dac29ccfeffe56fb87bc0cb1c606c7eea82a1 Mon Sep 17 00:00:00 2001
From: Nicolae Vartolomei
Date: Tue, 29 Jul 2025 10:37:33 +0100
Subject: [PATCH] fix race condition when /status is polled before workers
 are registered

Also return an error on the status and reset endpoints if workers are
not fully registered yet. Our callers expect that there is always at
least one worker report on the status endpoint.
---
 cmd/kgo-verifier/main.go | 125 +++++++++++++++++++++++++++++++--------
 1 file changed, 98 insertions(+), 27 deletions(-)

diff --git a/cmd/kgo-verifier/main.go b/cmd/kgo-verifier/main.go
index 0ec1290..4fbf950 100644
--- a/cmd/kgo-verifier/main.go
+++ b/cmd/kgo-verifier/main.go
@@ -12,6 +12,7 @@ package main
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"flag"
 	"fmt"
 	"net/http"
@@ -103,6 +104,84 @@ func makeWorkerConfig() worker.WorkerConfig {
 	return c
 }
 
+// workersGroup collects the workers and tracks whether registration is
+// complete, so that the HTTP handlers never observe a partially built set.
+type workersGroup struct {
+	workers []worker.Worker
+	ready   bool
+	mu      sync.Mutex
+}
+
+func (w *workersGroup) Add(worker worker.Worker) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	w.workers = append(w.workers, worker)
+}
+
+func (w *workersGroup) SetReady() {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	if len(w.workers) == 0 {
+		util.Die("No workers added to the group, cannot set ready")
+	}
+
+	if w.ready {
+		util.Die("Workers group is already marked as ready")
+	}
+
+	w.ready = true
+}
+
+func (w *workersGroup) StatusJson() ([]byte, error) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	if !w.ready {
+		return nil, errors.New("workers group is not marked as ready yet")
+	}
+
+	var results []interface{}
+	var locks []*sync.Mutex
+	for _, v := range w.workers {
+		status, lock := v.GetStatus()
+		results = append(results, status)
+		locks = append(locks, lock)
+	}
+
+	for _, lock := range locks {
+		lock.Lock()
+	}
+
+	defer func() {
+		for _, lock := range locks {
+			lock.Unlock()
+		}
+	}()
+
+	serialized, err := json.MarshalIndent(results, "", "  ")
+	if err != nil {
+		return nil, fmt.Errorf("failed to serialize worker status: %w", err)
+	}
+
+	return serialized, nil
+}
+
+func (w *workersGroup) ResetStats() error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	if !w.ready {
+		return errors.New("workers group is not marked as ready yet")
+	}
+
+	for _, worker := range w.workers {
+		worker.ResetStats()
+	}
+
+	return nil
+}
+
 func main() {
 	flag.Parse()
 
@@ -155,39 +234,27 @@ func main() {
 	nPartitions := int32(len(t.Partitions))
 	log.Debugf("Targeting topic %s with %d partitions", *topic, nPartitions)
 
-	var workers []worker.Worker
+	var workers workersGroup
 
 	mux := http.NewServeMux()
 	mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
-		var results []interface{}
-		var locks []*sync.Mutex
-		for _, v := range workers {
-			status, lock := v.GetStatus()
-			results = append(results, status)
-			locks = append(locks, lock)
-		}
-
-		for _, lock := range locks {
-			lock.Lock()
-		}
-
-		serialized, err := json.MarshalIndent(results, "", "  ")
-
-		for _, lock := range locks {
-			lock.Unlock()
+		serialized, err := workers.StatusJson()
+		if err != nil {
+			log.Errorf("Failed to serialize worker status: %v", err)
+			http.Error(w, fmt.Sprintf("Failed to serialize worker status: %v", err), http.StatusInternalServerError)
+			return
 		}
-
-		util.Chk(err, "Status serialization error")
-
 		w.WriteHeader(http.StatusOK)
 		w.Write(serialized)
 	})
 
 	mux.HandleFunc("/reset", func(w http.ResponseWriter, r *http.Request) {
 		log.Info("Remote request /reset")
-		for _, v := range workers {
-			v.ResetStats()
-		}
+		if err := workers.ResetStats(); err != nil {
+			log.Errorf("Failed to reset worker stats: %v", err)
+			http.Error(w, fmt.Sprintf("Failed to reset worker stats: %v", err), http.StatusInternalServerError)
+			return
+		}
 		w.WriteHeader(http.StatusOK)
 	})
 
@@ -259,7 +326,8 @@ func main() {
 			pw.EnableTransactions(tconfig)
 		}
 
-		workers = append(workers, &pw)
+		workers.Add(&pw)
+		workers.SetReady()
 		waitErr := pw.Wait()
 		util.Chk(err, "Producer error: %v", waitErr)
 		log.Info("Finished producer.")
@@ -268,7 +336,8 @@ func main() {
 		srw := verifier.NewSeqReadWorker(verifier.NewSeqReadConfig(
 			makeWorkerConfig(), "sequential", nPartitions, *seqConsumeCount, (*consumeTputMb)*1024*1024,
 		), verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
-		workers = append(workers, &srw)
+		workers.Add(&srw)
+		workers.SetReady()
 
 		for loopState.Next() {
 			log.Info("Starting sequential read pass")
@@ -288,8 +357,9 @@ func main() {
 			)
 			worker := verifier.NewRandomReadWorker(workerCfg, verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
 			randomWorkers = append(randomWorkers, &worker)
-			workers = append(workers, &worker)
+			workers.Add(&worker)
 		}
+		workers.SetReady()
 
 		for loopState.Next() {
 			for _, w := range randomWorkers {
@@ -316,7 +386,8 @@ func main() {
 			verifier.NewGroupReadConfig(
 				makeWorkerConfig(), *cgName, nPartitions, *cgReaders, *seqConsumeCount, (*consumeTputMb)*1024*1024),
 			verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
-		workers = append(workers, &grw)
+		workers.Add(&grw)
+		workers.SetReady()
 
 		for loopState.Next() {
 			log.Info("Starting group read pass")
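
With this change, a client polling /status must treat a non-200 response as
"workers not registered yet" and retry, rather than assume an empty report
list. The following is a minimal sketch of such a client in Go; the listen
address, poll interval, timeout, and the waitForWorkers helper are all
illustrative assumptions, not part of kgo-verifier:

    // statuspoller.go: illustrative client for kgo-verifier's /status
    // endpoint. The address, interval, and timeout are assumptions for
    // this sketch; the real listen address comes from kgo-verifier flags.
    package main

    import (
    	"encoding/json"
    	"fmt"
    	"net/http"
    	"time"
    )

    // waitForWorkers polls url until it returns 200 with a JSON array of
    // worker reports, or the timeout expires. A non-200 response means
    // the workers group is not marked ready yet, so we retry.
    func waitForWorkers(url string, timeout time.Duration) ([]json.RawMessage, error) {
    	deadline := time.Now().Add(timeout)
    	for time.Now().Before(deadline) {
    		resp, err := http.Get(url)
    		if err == nil {
    			if resp.StatusCode == http.StatusOK {
    				var reports []json.RawMessage
    				decodeErr := json.NewDecoder(resp.Body).Decode(&reports)
    				resp.Body.Close()
    				if decodeErr != nil {
    					return nil, decodeErr
    				}
    				return reports, nil
    			}
    			resp.Body.Close()
    		}
    		time.Sleep(500 * time.Millisecond)
    	}
    	return nil, fmt.Errorf("timed out waiting for worker reports from %s", url)
    }

    func main() {
    	reports, err := waitForWorkers("http://localhost:8080/status", 30*time.Second)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Printf("got %d worker report(s)\n", len(reports))
    }

Polling with retries keeps the caller's contract intact: once /status
returns 200, the response is guaranteed to contain at least one worker
report, so callers no longer race against worker registration.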