From cc9d7f6621892c045969caec20e4ff9674d207c6 Mon Sep 17 00:00:00 2001
From: Nico Duldhardt
Date: Sat, 28 Feb 2026 10:26:47 +0100
Subject: [PATCH] fix: eliminate head-of-line blocking in getPartitionLog

logMu was held as an exclusive lock across etcd and S3 I/O during
partition initialization. A single slow RestoreFromS3 (1+2N S3
round-trips for N segments) blocked all produce, fetch, and
list-offsets requests broker-wide.

Replace sync.Mutex with sync.RWMutex so the fast path (partition
already initialized) uses a shared read lock. Move all I/O outside the
lock and use singleflight.Group to deduplicate concurrent
initialization per partition without blocking other partitions.
---
 cmd/broker/main.go | 87 +++++++++++++++++++++++++++++-----------------
 1 file changed, 56 insertions(+), 31 deletions(-)

diff --git a/cmd/broker/main.go b/cmd/broker/main.go
index 2448aa3..087c73a 100644
--- a/cmd/broker/main.go
+++ b/cmd/broker/main.go
@@ -40,6 +40,7 @@ import (
 	"github.com/KafScale/platform/pkg/protocol"
 	"github.com/KafScale/platform/pkg/storage"
 	"golang.org/x/sync/semaphore"
+	"golang.org/x/sync/singleflight"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -66,7 +67,8 @@ type handler struct {
 	s3          storage.S3Client
 	cache       *cache.SegmentCache
 	logs        map[string]map[int32]*storage.PartitionLog
-	logMu       sync.Mutex
+	logMu       sync.RWMutex
+	logInit     singleflight.Group
 	logConfig   storage.PartitionLogConfig
 	coordinator *broker.GroupCoordinator
 	s3Health    *broker.S3HealthMonitor
@@ -1948,20 +1950,61 @@ func (h *handler) ensureTopic(ctx context.Context, topic string, partition int32
 }
 
 func (h *handler) getPartitionLog(ctx context.Context, topic string, partition int32) (*storage.PartitionLog, error) {
-	for {
-		h.logMu.Lock()
-		partitions := h.logs[topic]
-		if partitions == nil {
-			partitions = make(map[int32]*storage.PartitionLog)
-			h.logs[topic] = partitions
+	h.logMu.RLock()
+	if partitions, ok := h.logs[topic]; ok {
+		if plog, ok := partitions[partition]; ok {
+			h.logMu.RUnlock()
+			return plog, nil
 		}
-		if log, ok := partitions[partition]; ok {
+	}
+	h.logMu.RUnlock()
+
+	// Requests for other partitions proceed in parallel; only one goroutine
+	// per partition does the actual initialization.
+	for {
+		key := fmt.Sprintf("%s/%d", topic, partition)
+		result, err, _ := h.logInit.Do(key, func() (interface{}, error) {
+			// Double-check under read lock in case another goroutine just finished.
+			h.logMu.RLock()
+			if partitions, ok := h.logs[topic]; ok {
+				if plog, ok := partitions[partition]; ok {
+					h.logMu.RUnlock()
+					return plog, nil
+				}
+			}
+			h.logMu.RUnlock()
+
+			// All I/O happens outside the lock.
+			nextOffset, err := h.store.NextOffset(ctx, topic, partition)
+			if err != nil {
+				return nil, err
+			}
+			plog := storage.NewPartitionLog(h.s3Namespace, topic, partition, nextOffset, h.s3, h.cache, h.logConfig, func(cbCtx context.Context, artifact *storage.SegmentArtifact) {
+				if err := h.store.UpdateOffsets(cbCtx, topic, partition, artifact.LastOffset); err != nil {
+					h.logger.Error("update offsets failed", "error", err, "topic", topic, "partition", partition)
+				}
+			}, h.recordS3Op, h.s3sem)
+			lastOffset, err := plog.RestoreFromS3(ctx)
+			if err != nil {
+				h.logger.Error("restore partition log from S3 failed", "topic", topic, "partition", partition, "error", err)
+				return nil, err
+			}
+			if lastOffset >= nextOffset {
+				if err := h.store.UpdateOffsets(ctx, topic, partition, lastOffset); err != nil {
+					h.logger.Error("sync offsets from S3 failed", "error", err, "topic", topic, "partition", partition)
+				}
+			}
+
+			h.logMu.Lock()
+			if h.logs[topic] == nil {
+				h.logs[topic] = make(map[int32]*storage.PartitionLog)
+			}
+			h.logs[topic][partition] = plog
 			h.logMu.Unlock()
-			return log, nil
-		}
-		nextOffset, err := h.store.NextOffset(ctx, topic, partition)
+
+			return plog, nil
+		})
 		if err != nil {
-			h.logMu.Unlock()
 			if errors.Is(err, metadata.ErrUnknownTopic) && h.autoCreateTopics {
 				if err := h.ensureTopic(ctx, topic, partition); err != nil {
 					return nil, err
@@ -1970,25 +2013,7 @@ func (h *handler) getPartitionLog(ctx context.Context, topic string, partition i
 			}
 			return nil, err
 		}
-		plog := storage.NewPartitionLog(h.s3Namespace, topic, partition, nextOffset, h.s3, h.cache, h.logConfig, func(cbCtx context.Context, artifact *storage.SegmentArtifact) {
-			if err := h.store.UpdateOffsets(cbCtx, topic, partition, artifact.LastOffset); err != nil {
-				h.logger.Error("update offsets failed", "error", err, "topic", topic, "partition", partition)
-			}
-		}, h.recordS3Op, h.s3sem)
-		lastOffset, err := plog.RestoreFromS3(ctx)
-		if err != nil {
-			h.logger.Error("restore partition log from S3 failed", "topic", topic, "partition", partition, "error", err)
-			h.logMu.Unlock()
-			return nil, err
-		}
-		if lastOffset >= nextOffset {
-			if err := h.store.UpdateOffsets(ctx, topic, partition, lastOffset); err != nil {
-				h.logger.Error("sync offsets from S3 failed", "error", err, "topic", topic, "partition", partition)
-			}
-		}
-		partitions[partition] = plog
-		h.logMu.Unlock()
-		return plog, nil
+		return result.(*storage.PartitionLog), nil
 	}
 }
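
Note (not part of the patch): the sketch below shows the same locking shape in isolation, for readers unfamiliar with the RWMutex fast path plus singleflight deduplication used above. It is a minimal illustration only; the registry type, the key format, and the Sleep standing in for the etcd and S3 round-trips are hypothetical and not taken from the broker code.

	// Illustrative sketch of per-key initialization with a shared-read fast
	// path and singleflight deduplication for the slow path.
	package main

	import (
		"fmt"
		"sync"
		"time"

		"golang.org/x/sync/singleflight"
	)

	type registry struct {
		mu     sync.RWMutex
		logs   map[string]string
		flight singleflight.Group
	}

	func (r *registry) get(key string) (string, error) {
		// Fast path: shared read lock only; never blocks on another key's init.
		r.mu.RLock()
		if v, ok := r.logs[key]; ok {
			r.mu.RUnlock()
			return v, nil
		}
		r.mu.RUnlock()

		// Slow path: at most one goroutine per key runs the initialization;
		// concurrent callers for the same key share its result.
		v, err, _ := r.flight.Do(key, func() (interface{}, error) {
			// Double-check in case another flight finished after the unlock above.
			r.mu.RLock()
			if v, ok := r.logs[key]; ok {
				r.mu.RUnlock()
				return v, nil
			}
			r.mu.RUnlock()

			time.Sleep(100 * time.Millisecond) // stand-in for etcd/S3 round-trips
			val := "log-for-" + key

			// Publish the result under a brief exclusive lock; no I/O is held here.
			r.mu.Lock()
			r.logs[key] = val
			r.mu.Unlock()
			return val, nil
		})
		if err != nil {
			return "", err
		}
		return v.(string), nil
	}

	func main() {
		r := &registry{logs: make(map[string]string)}
		var wg sync.WaitGroup
		for i := 0; i < 4; i++ {
			wg.Add(1)
			go func(p int) {
				defer wg.Done()
				v, _ := r.get(fmt.Sprintf("orders/%d", p%2))
				fmt.Println(v)
			}(i)
		}
		wg.Wait()
	}

Callers racing on the same key wait on one flight instead of stampeding the backing store, while callers for other keys are never serialized behind it, which is the head-of-line-blocking fix the commit message describes.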