diff --git a/cmd/broker/main.go b/cmd/broker/main.go index f291c86..d0c2914 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -40,6 +40,7 @@ import ( "github.com/KafScale/platform/pkg/protocol" "github.com/KafScale/platform/pkg/storage" "golang.org/x/sync/semaphore" + "golang.org/x/sync/singleflight" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -61,7 +62,8 @@ type handler struct { s3 storage.S3Client cache *cache.SegmentCache logs map[string]map[int32]*storage.PartitionLog - logMu sync.Mutex + logMu sync.RWMutex + logInit singleflight.Group logConfig storage.PartitionLogConfig coordinator *broker.GroupCoordinator s3Health *broker.S3HealthMonitor @@ -1943,20 +1945,61 @@ func (h *handler) ensureTopic(ctx context.Context, topic string, partition int32 } func (h *handler) getPartitionLog(ctx context.Context, topic string, partition int32) (*storage.PartitionLog, error) { - for { - h.logMu.Lock() - partitions := h.logs[topic] - if partitions == nil { - partitions = make(map[int32]*storage.PartitionLog) - h.logs[topic] = partitions + h.logMu.RLock() + if partitions, ok := h.logs[topic]; ok { + if plog, ok := partitions[partition]; ok { + h.logMu.RUnlock() + return plog, nil } - if log, ok := partitions[partition]; ok { + } + h.logMu.RUnlock() + + // Requests for other partitions proceed in parallel; only one goroutine + // per partition does the actual initialization. + for { + key := fmt.Sprintf("%s/%d", topic, partition) + result, err, _ := h.logInit.Do(key, func() (interface{}, error) { + // Double-check under read lock in case another goroutine just finished. + h.logMu.RLock() + if partitions, ok := h.logs[topic]; ok { + if plog, ok := partitions[partition]; ok { + h.logMu.RUnlock() + return plog, nil + } + } + h.logMu.RUnlock() + + // All I/O happens outside the lock. + nextOffset, err := h.store.NextOffset(ctx, topic, partition) + if err != nil { + return nil, err + } + plog := storage.NewPartitionLog(h.s3Namespace, topic, partition, nextOffset, h.s3, h.cache, h.logConfig, func(cbCtx context.Context, artifact *storage.SegmentArtifact) { + if err := h.store.UpdateOffsets(cbCtx, topic, partition, artifact.LastOffset); err != nil { + h.logger.Error("update offsets failed", "error", err, "topic", topic, "partition", partition) + } + }, h.recordS3Op, h.s3sem) + lastOffset, err := plog.RestoreFromS3(ctx) + if err != nil { + h.logger.Error("restore partition log from S3 failed", "topic", topic, "partition", partition, "error", err) + return nil, err + } + if lastOffset >= nextOffset { + if err := h.store.UpdateOffsets(ctx, topic, partition, lastOffset); err != nil { + h.logger.Error("sync offsets from S3 failed", "error", err, "topic", topic, "partition", partition) + } + } + + h.logMu.Lock() + if h.logs[topic] == nil { + h.logs[topic] = make(map[int32]*storage.PartitionLog) + } + h.logs[topic][partition] = plog h.logMu.Unlock() - return log, nil - } - nextOffset, err := h.store.NextOffset(ctx, topic, partition) + + return plog, nil + }) if err != nil { - h.logMu.Unlock() if errors.Is(err, metadata.ErrUnknownTopic) && h.autoCreateTopics { if err := h.ensureTopic(ctx, topic, partition); err != nil { return nil, err @@ -1965,25 +2008,7 @@ func (h *handler) getPartitionLog(ctx context.Context, topic string, partition i } return nil, err } - plog := storage.NewPartitionLog(h.s3Namespace, topic, partition, nextOffset, h.s3, h.cache, h.logConfig, func(cbCtx context.Context, artifact *storage.SegmentArtifact) { - if err := h.store.UpdateOffsets(cbCtx, topic, partition, artifact.LastOffset); err != nil { - h.logger.Error("update offsets failed", "error", err, "topic", topic, "partition", partition) - } - }, h.recordS3Op, h.s3sem) - lastOffset, err := plog.RestoreFromS3(ctx) - if err != nil { - h.logger.Error("restore partition log from S3 failed", "topic", topic, "partition", partition, "error", err) - h.logMu.Unlock() - return nil, err - } - if lastOffset >= nextOffset { - if err := h.store.UpdateOffsets(ctx, topic, partition, lastOffset); err != nil { - h.logger.Error("sync offsets from S3 failed", "error", err, "topic", topic, "partition", partition) - } - } - partitions[partition] = plog - h.logMu.Unlock() - return plog, nil + return result.(*storage.PartitionLog), nil } }