From 33f633718725071a5aee5d05139c72488d8cc6c6 Mon Sep 17 00:00:00 2001 From: Nico Duldhardt Date: Sun, 1 Mar 2026 10:40:50 +0100 Subject: [PATCH] fix: route group coordination requests to a single broker to prevent split brain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #121. Brokers now acquire an etcd lease before coordinating any group operation. Only the lease holder can coordinate — others reject with NOT_COORDINATOR and the proxy retries on the correct broker. Also fixes: silent error swallowing on transient etcd failures, wrong byte-offset NOT_COORDINATOR detection on flexible protocol versions, false-positive byte scanning, DescribeGroups multi-group retry loop, and connect-failure retry abort in the proxy. --- cmd/broker/main.go | 124 ++++++++++- cmd/proxy/main.go | 115 +++++++++++ cmd/proxy/main_test.go | 158 ++++++++++++++ pkg/metadata/group_lease.go | 92 +++++++++ pkg/metadata/group_lease_test.go | 245 ++++++++++++++++++++++ pkg/metadata/group_router.go | 176 ++++++++++++++++ pkg/metadata/group_router_test.go | 177 ++++++++++++++++ pkg/metadata/lease_manager.go | 332 ++++++++++++++++++++++++++++++ pkg/metadata/partition_lease.go | 319 +++------------------------- pkg/protocol/errors.go | 1 + pkg/protocol/response.go | 204 ++++++++++++++++++ pkg/protocol/response_test.go | 250 ++++++++++++++++++++++ 12 files changed, 1891 insertions(+), 302 deletions(-) create mode 100644 pkg/metadata/group_lease.go create mode 100644 pkg/metadata/group_lease_test.go create mode 100644 pkg/metadata/group_router.go create mode 100644 pkg/metadata/group_router_test.go create mode 100644 pkg/metadata/lease_manager.go diff --git a/cmd/broker/main.go b/cmd/broker/main.go index fd0cf17..1b491b4 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -67,6 +67,7 @@ type handler struct { logConfig storage.PartitionLogConfig coordinator *broker.GroupCoordinator leaseManager *metadata.PartitionLeaseManager + 
groupLeaseManager *metadata.GroupLeaseManager s3Health *broker.S3HealthMonitor s3Namespace string brokerInfo protocol.MetadataBroker @@ -224,10 +225,15 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, }, header.APIVersion) } + if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 { + return protocol.EncodeJoinGroupResponse(&protocol.JoinGroupResponse{ + CorrelationID: header.CorrelationID, + ErrorCode: errCode, + }, header.APIVersion) + } if !h.etcdAvailable() { return protocol.EncodeJoinGroupResponse(&protocol.JoinGroupResponse{ CorrelationID: header.CorrelationID, - ThrottleMs: 0, ErrorCode: protocol.REQUEST_TIMED_OUT, }, header.APIVersion) } @@ -246,10 +252,15 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, }, header.APIVersion) } + if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 { + return protocol.EncodeSyncGroupResponse(&protocol.SyncGroupResponse{ + CorrelationID: header.CorrelationID, + ErrorCode: errCode, + }, header.APIVersion) + } if !h.etcdAvailable() { return protocol.EncodeSyncGroupResponse(&protocol.SyncGroupResponse{ CorrelationID: header.CorrelationID, - ThrottleMs: 0, ErrorCode: protocol.REQUEST_TIMED_OUT, }, header.APIVersion) } @@ -263,13 +274,18 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { allowed := make([]string, 0, len(req.Groups)) denied := make(map[string]struct{}) + leaseErrors := make(map[string]int16) for _, groupID := range req.Groups { - if h.allowGroup(principal, groupID, acl.ActionGroupRead) { - allowed = append(allowed, groupID) - } else { + if !h.allowGroup(principal, groupID, acl.ActionGroupRead) { denied[groupID] = struct{}{} h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupRead, acl.ResourceGroup, groupID) + continue + } 
+ if errCode := h.acquireGroupLease(ctx, groupID); errCode != 0 { + leaseErrors[groupID] = errCode + continue } + allowed = append(allowed, groupID) } responseByGroup := make(map[string]protocol.DescribeGroupsResponseGroup, len(req.Groups)) @@ -296,13 +312,20 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re results := make([]protocol.DescribeGroupsResponseGroup, 0, len(req.Groups)) for _, groupID := range req.Groups { - if _, denied := denied[groupID]; denied { + if _, ok := denied[groupID]; ok { results = append(results, protocol.DescribeGroupsResponseGroup{ ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, GroupID: groupID, }) continue } + if errCode, ok := leaseErrors[groupID]; ok { + results = append(results, protocol.DescribeGroupsResponseGroup{ + ErrorCode: errCode, + GroupID: groupID, + }) + continue + } if group, ok := responseByGroup[groupID]; ok { results = append(results, group) } else { @@ -354,10 +377,15 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, }, header.APIVersion) } + if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 { + return protocol.EncodeHeartbeatResponse(&protocol.HeartbeatResponse{ + CorrelationID: header.CorrelationID, + ErrorCode: errCode, + }, header.APIVersion) + } if !h.etcdAvailable() { return protocol.EncodeHeartbeatResponse(&protocol.HeartbeatResponse{ CorrelationID: header.CorrelationID, - ThrottleMs: 0, ErrorCode: protocol.REQUEST_TIMED_OUT, }, header.APIVersion) } @@ -372,6 +400,12 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, }) } + if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 { + return protocol.EncodeLeaveGroupResponse(&protocol.LeaveGroupResponse{ + CorrelationID: header.CorrelationID, + ErrorCode: errCode, + }) + } if !h.etcdAvailable() { return 
protocol.EncodeLeaveGroupResponse(&protocol.LeaveGroupResponse{ CorrelationID: header.CorrelationID, @@ -404,6 +438,26 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re Topics: topics, }) } + if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 { + topics := make([]protocol.OffsetCommitTopicResponse, 0, len(req.Topics)) + for _, topic := range req.Topics { + partitions := make([]protocol.OffsetCommitPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions { + partitions = append(partitions, protocol.OffsetCommitPartitionResponse{ + Partition: part.Partition, + ErrorCode: errCode, + }) + } + topics = append(topics, protocol.OffsetCommitTopicResponse{ + Name: topic.Name, + Partitions: partitions, + }) + } + return protocol.EncodeOffsetCommitResponse(&protocol.OffsetCommitResponse{ + CorrelationID: header.CorrelationID, + Topics: topics, + }) + } if !h.etcdAvailable() { topics := make([]protocol.OffsetCommitTopicResponse, 0, len(req.Topics)) for _, topic := range req.Topics { @@ -457,6 +511,29 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, }, header.APIVersion) } + if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 { + topics := make([]protocol.OffsetFetchTopicResponse, 0, len(req.Topics)) + for _, topic := range req.Topics { + partitions := make([]protocol.OffsetFetchPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions { + partitions = append(partitions, protocol.OffsetFetchPartitionResponse{ + Partition: part.Partition, + Offset: -1, + LeaderEpoch: -1, + ErrorCode: errCode, + }) + } + topics = append(topics, protocol.OffsetFetchTopicResponse{ + Name: topic.Name, + Partitions: partitions, + }) + } + return protocol.EncodeOffsetFetchResponse(&protocol.OffsetFetchResponse{ + CorrelationID: header.CorrelationID, + Topics: topics, + ErrorCode: errCode, + }, 
header.APIVersion) + } if !h.etcdAvailable() { topics := make([]protocol.OffsetFetchTopicResponse, 0, len(req.Topics)) for _, topic := range req.Topics { @@ -951,6 +1028,27 @@ func (h *handler) unauthorizedListOffsets(principal string, header *protocol.Req }) } +// acquireGroupLease attempts to acquire the coordination lease for the given +// group. Returns 0 if this broker is (or becomes) the coordinator. Returns a +// non-zero Kafka error code if the request should be rejected: +// - NOT_COORDINATOR when another broker owns the group or this broker is +// shutting down (so the proxy can redirect to the correct broker). +// - REQUEST_TIMED_OUT for transient errors (etcd timeout, session failure). +func (h *handler) acquireGroupLease(ctx context.Context, groupID string) int16 { + if h.groupLeaseManager == nil { + return 0 + } + err := h.groupLeaseManager.Acquire(ctx, groupID) + if err == nil { + return 0 + } + if errors.Is(err, metadata.ErrNotOwner) || errors.Is(err, metadata.ErrShuttingDown) { + return protocol.NOT_COORDINATOR + } + h.logger.Warn("group lease acquire failed", "group", groupID, "error", err) + return protocol.REQUEST_TIMED_OUT +} + // acquirePartitionLeases acquires leases for all partitions in the request // concurrently. Returns a map of partition -> error for partitions that failed. // Partitions already owned by this broker complete instantly (map lookup). 
@@ -2097,9 +2195,15 @@ func newHandler(store metadata.Store, s3Client storage.S3Client, brokerInfo prot health := broker.NewS3HealthMonitor(s3HealthConfigFromEnv()) authorizer := buildAuthorizerFromEnv(logger) var leaseManager *metadata.PartitionLeaseManager + var groupLeaseManager *metadata.GroupLeaseManager if etcdStore, ok := store.(*metadata.EtcdStore); ok { + brokerIDStr := fmt.Sprintf("%d", brokerInfo.NodeID) leaseManager = metadata.NewPartitionLeaseManager(etcdStore.EtcdClient(), metadata.PartitionLeaseConfig{ - BrokerID: fmt.Sprintf("%d", brokerInfo.NodeID), + BrokerID: brokerIDStr, + Logger: logger, + }) + groupLeaseManager = metadata.NewGroupLeaseManager(etcdStore.EtcdClient(), metadata.GroupLeaseConfig{ + BrokerID: brokerIDStr, Logger: logger, }) } @@ -2123,6 +2227,7 @@ func newHandler(store metadata.Store, s3Client storage.S3Client, brokerInfo prot }, coordinator: broker.NewGroupCoordinator(store, brokerInfo, nil), leaseManager: leaseManager, + groupLeaseManager: groupLeaseManager, s3Health: health, s3Namespace: s3Namespace, brokerInfo: brokerInfo, @@ -2228,6 +2333,9 @@ func main() { if handler.leaseManager != nil { handler.leaseManager.ReleaseAll() } + if handler.groupLeaseManager != nil { + handler.groupLeaseManager.ReleaseAll() + } srv.Wait() } diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 37dc433..1d55d57 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -55,6 +55,7 @@ type proxy struct { cachedBackends []string apiVersions []protocol.ApiVersion router *metadata.PartitionRouter + groupRouter *metadata.GroupRouter brokerAddrMu sync.RWMutex brokerAddrs map[string]string // brokerID -> "host:port" backendRetries int @@ -123,6 +124,13 @@ func main() { p.router = router logger.Info("partition-aware routing enabled") } + groupRouter, err := metadata.NewGroupRouter(ctx, etcdStore.EtcdClient(), logger) + if err != nil { + logger.Warn("group router init failed; using round-robin routing for group ops", "error", err) + } else { + 
p.groupRouter = groupRouter + logger.Info("group-aware routing enabled") + } } if len(backends) > 0 { p.setCachedBackends(backends) @@ -140,6 +148,9 @@ func main() { if p.router != nil { p.router.Stop() } + if p.groupRouter != nil { + p.groupRouter.Stop() + } } func envOrDefault(key, fallback string) string { @@ -459,6 +470,24 @@ func (p *proxy) handleConnection(ctx context.Context, conn net.Conn) { return } continue + case protocol.APIKeyJoinGroup, + protocol.APIKeySyncGroup, + protocol.APIKeyHeartbeat, + protocol.APIKeyLeaveGroup, + protocol.APIKeyOffsetCommit, + protocol.APIKeyOffsetFetch, + protocol.APIKeyDescribeGroups: + resp, err := p.handleGroupRouting(ctx, header, frame.Payload, pool) + if err != nil { + p.logger.Warn("group routing failed", "error", err) + p.respondBackendError(conn, header, frame.Payload) + return + } + if err := protocol.WriteFrame(conn, resp); err != nil { + p.logger.Warn("write group response failed", "error", err) + return + } + continue default: } @@ -1499,3 +1528,89 @@ func (p *proxy) forwardToBackend(ctx context.Context, conn net.Conn, backendAddr } return frame.Payload, nil } + +// handleGroupRouting routes group-related requests to the broker that owns the +// group coordination lease. If no owner is cached, or the owner returns +// NOT_COORDINATOR, the request is retried on a different broker. +// +// DescribeGroups requests are forwarded once without retry since different +// groups may live on different brokers. The broker returns per-group +// NOT_COORDINATOR errors that the Kafka client handles natively. +func (p *proxy) handleGroupRouting(ctx context.Context, header *protocol.RequestHeader, payload []byte, pool *connPool) ([]byte, error) { + groupID := p.extractGroupID(header.APIKey, payload) + + // DescribeGroups with multiple groups cannot be reliably split/retried at + // the proxy level. Forward once and let the client handle per-group errors. 
+ maxAttempts := 3 + if header.APIKey == protocol.APIKeyDescribeGroups { + maxAttempts = 1 + } + + triedBackends := make(map[string]bool) + + for attempt := 0; attempt < maxAttempts; attempt++ { + targetAddr := "" + if p.groupRouter != nil && groupID != "" { + if ownerID := p.groupRouter.LookupOwner(groupID); ownerID != "" { + targetAddr = p.brokerIDToAddr(ownerID) + } + } + + conn, actualAddr, err := p.connectForAddr(ctx, targetAddr, triedBackends, pool) + if err != nil { + continue + } + triedBackends[actualAddr] = true + + resp, err := p.forwardToBackend(ctx, conn, actualAddr, payload) + if err != nil { + conn.Close() + continue + } + + if p.groupRouter != nil && groupID != "" { + if ec, ok := protocol.GroupResponseErrorCode(header.APIKey, header.APIVersion, resp); ok && ec == protocol.NOT_COORDINATOR { + pool.Return(actualAddr, conn) + p.groupRouter.Invalidate(groupID) + p.logger.Debug("NOT_COORDINATOR, retrying group request", + "group", groupID, "attempt", attempt+1, "broker", actualAddr) + continue + } + } + + pool.Return(actualAddr, conn) + return resp, nil + } + + return nil, fmt.Errorf("group request for %q failed after %d attempts", groupID, maxAttempts) +} + +// extractGroupID parses the request payload to extract the group ID for routing. +// Returns "" if the group ID cannot be determined. 
+func (p *proxy) extractGroupID(apiKey int16, payload []byte) string { + _, req, err := protocol.ParseRequest(payload) + if err != nil { + return "" + } + switch r := req.(type) { + case *protocol.JoinGroupRequest: + return r.GroupID + case *protocol.SyncGroupRequest: + return r.GroupID + case *protocol.HeartbeatRequest: + return r.GroupID + case *protocol.LeaveGroupRequest: + return r.GroupID + case *protocol.OffsetCommitRequest: + return r.GroupID + case *protocol.OffsetFetchRequest: + return r.GroupID + case *protocol.DescribeGroupsRequest: + if len(r.Groups) > 0 { + return r.Groups[0] + } + return "" + default: + return "" + } +} diff --git a/cmd/proxy/main_test.go b/cmd/proxy/main_test.go index 8dbae6a..6f8300c 100644 --- a/cmd/proxy/main_test.go +++ b/cmd/proxy/main_test.go @@ -591,3 +591,161 @@ func (c *fakeNetConn) RemoteAddr() net.Addr { return nil } func (c *fakeNetConn) SetDeadline(time.Time) error { return nil } func (c *fakeNetConn) SetReadDeadline(time.Time) error { return nil } func (c *fakeNetConn) SetWriteDeadline(time.Time) error { return nil } + +func TestExtractGroupID(t *testing.T) { + p := &proxy{} + clientID := "test-client" + + // Helper to write a non-flexible request header. 
+ writeHeader := func(w *testWriter, apiKey, version int16) { + w.Int16(apiKey) + w.Int16(version) + w.Int32(1) // correlation_id + w.NullableString(&clientID) + } + + tests := []struct { + name string + apiKey int16 + payload func() []byte + want string + }{ + { + name: "JoinGroup", + apiKey: protocol.APIKeyJoinGroup, + payload: func() []byte { + w := &testWriter{} + writeHeader(w, protocol.APIKeyJoinGroup, 2) + w.String("my-join-group") // group_id + w.Int32(30000) // session_timeout + w.Int32(10000) // rebalance_timeout + w.String("") // member_id + w.String("consumer") // protocol_type + w.Int32(1) // protocol count + w.String("range") // protocol name + w.Bytes([]byte{0x00, 0x01}) // protocol metadata + return w.buf.Bytes() + }, + want: "my-join-group", + }, + { + name: "SyncGroup", + apiKey: protocol.APIKeySyncGroup, + payload: func() []byte { + w := &testWriter{} + writeHeader(w, protocol.APIKeySyncGroup, 1) + w.String("my-sync-group") // group_id + w.Int32(1) // generation_id + w.String("member-1") // member_id + w.Int32(0) // assignments count + return w.buf.Bytes() + }, + want: "my-sync-group", + }, + { + name: "Heartbeat", + apiKey: protocol.APIKeyHeartbeat, + payload: func() []byte { + w := &testWriter{} + writeHeader(w, protocol.APIKeyHeartbeat, 1) + w.String("my-heartbeat-group") // group_id + w.Int32(1) // generation_id + w.String("member-1") // member_id + return w.buf.Bytes() + }, + want: "my-heartbeat-group", + }, + { + name: "LeaveGroup", + apiKey: protocol.APIKeyLeaveGroup, + payload: func() []byte { + w := &testWriter{} + writeHeader(w, protocol.APIKeyLeaveGroup, 0) + w.String("my-leave-group") // group_id + w.String("member-1") // member_id + return w.buf.Bytes() + }, + want: "my-leave-group", + }, + { + name: "OffsetCommit", + apiKey: protocol.APIKeyOffsetCommit, + payload: func() []byte { + w := &testWriter{} + writeHeader(w, protocol.APIKeyOffsetCommit, 3) + w.String("my-commit-group") // group_id + w.Int32(1) // generation_id + 
w.String("member-1") // member_id + w.Int64(-1) // retention_time_ms (v2-v4) + w.Int32(0) // topics count + return w.buf.Bytes() + }, + want: "my-commit-group", + }, + { + name: "OffsetFetch", + apiKey: protocol.APIKeyOffsetFetch, + payload: func() []byte { + w := &testWriter{} + writeHeader(w, protocol.APIKeyOffsetFetch, 3) + w.String("my-fetch-group") // group_id + w.Int32(0) // topics count + return w.buf.Bytes() + }, + want: "my-fetch-group", + }, + { + name: "DescribeGroups single", + apiKey: protocol.APIKeyDescribeGroups, + payload: func() []byte { + w := &testWriter{} + writeHeader(w, protocol.APIKeyDescribeGroups, 2) + w.Int32(1) // groups count + w.String("my-describe-group-1") // group[0] + return w.buf.Bytes() + }, + want: "my-describe-group-1", + }, + { + name: "DescribeGroups multiple returns first", + apiKey: protocol.APIKeyDescribeGroups, + payload: func() []byte { + w := &testWriter{} + writeHeader(w, protocol.APIKeyDescribeGroups, 2) + w.Int32(2) // groups count + w.String("my-describe-group-1") // group[0] + w.String("my-describe-group-2") // group[1] + return w.buf.Bytes() + }, + want: "my-describe-group-1", + }, + { + name: "DescribeGroups empty returns blank", + apiKey: protocol.APIKeyDescribeGroups, + payload: func() []byte { + w := &testWriter{} + writeHeader(w, protocol.APIKeyDescribeGroups, 2) + w.Int32(0) // groups count + return w.buf.Bytes() + }, + want: "", + }, + { + name: "truncated payload returns blank", + apiKey: protocol.APIKeyJoinGroup, + payload: func() []byte { + return []byte{0, 11, 0, 2} // just api_key + version, no body + }, + want: "", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := p.extractGroupID(tc.apiKey, tc.payload()) + if got != tc.want { + t.Fatalf("extractGroupID() = %q, want %q", got, tc.want) + } + }) + } +} diff --git a/pkg/metadata/group_lease.go b/pkg/metadata/group_lease.go new file mode 100644 index 0000000..af46192 --- /dev/null +++ b/pkg/metadata/group_lease.go @@ 
-0,0 +1,92 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metadata + +import ( + "context" + "log/slog" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +const ( + // groupLeasePrefix is the etcd key prefix for group coordination leases. + groupLeasePrefix = "/kafscale/group-leases" +) + +// GroupLeaseConfig configures the group lease manager. +type GroupLeaseConfig struct { + BrokerID string + LeaseTTLSeconds int + Logger *slog.Logger +} + +// GroupLeaseManager uses etcd leases to ensure exclusive group coordination +// ownership. It delegates to the generic LeaseManager. +type GroupLeaseManager struct { + lm *LeaseManager +} + +// NewGroupLeaseManager creates a group lease manager backed by the given etcd client. +func NewGroupLeaseManager(client *clientv3.Client, cfg GroupLeaseConfig) *GroupLeaseManager { + return &GroupLeaseManager{ + lm: NewLeaseManager(client, LeaseManagerConfig{ + BrokerID: cfg.BrokerID, + Prefix: groupLeasePrefix, + LeaseTTLSeconds: cfg.LeaseTTLSeconds, + Logger: cfg.Logger, + ResourceKind: "group", + }), + } +} + +// GroupLeasePrefix returns the etcd prefix for watching group leases. +func GroupLeasePrefix() string { + return groupLeasePrefix +} + +// Acquire tries to grab the group coordination lease. 
If this broker already +// owns it, it returns nil immediately. If another broker owns it, it returns +// ErrNotOwner. +func (m *GroupLeaseManager) Acquire(ctx context.Context, groupID string) error { + return m.lm.Acquire(ctx, groupID) +} + +// Owns returns true if this broker currently holds the lease for the group. +func (m *GroupLeaseManager) Owns(groupID string) bool { + return m.lm.Owns(groupID) +} + +// Release explicitly gives up ownership of a single group. +func (m *GroupLeaseManager) Release(groupID string) { + m.lm.Release(groupID) +} + +// ReleaseAll releases all group leases. Called during graceful shutdown. +func (m *GroupLeaseManager) ReleaseAll() { + m.lm.ReleaseAll() +} + +// CurrentOwner queries etcd to find the current owner of a group. +// Returns the broker ID of the owner, or empty string if unowned. +func (m *GroupLeaseManager) CurrentOwner(ctx context.Context, groupID string) (string, error) { + return m.lm.CurrentOwner(ctx, groupID) +} + +// EtcdClient returns the underlying etcd client. +func (m *GroupLeaseManager) EtcdClient() *clientv3.Client { + return m.lm.EtcdClient() +} diff --git a/pkg/metadata/group_lease_test.go b/pkg/metadata/group_lease_test.go new file mode 100644 index 0000000..b8ed65c --- /dev/null +++ b/pkg/metadata/group_lease_test.go @@ -0,0 +1,245 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package metadata + +import ( + "context" + "fmt" + "log/slog" + "sync" + "testing" + "time" + + "github.com/KafScale/platform/internal/testutil" +) + +func newGroupLeaseManager(t *testing.T, endpoints []string, brokerID string, ttlSeconds int) *GroupLeaseManager { + t.Helper() + cli := newEtcdClientForTest(t, endpoints) + return NewGroupLeaseManager(cli, GroupLeaseConfig{ + BrokerID: brokerID, + LeaseTTLSeconds: ttlSeconds, + Logger: slog.Default(), + }) +} + +// Two brokers can't coordinate the same group simultaneously. +func TestGroupLeaseExclusivity(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + + brokerA := newGroupLeaseManager(t, endpoints, "broker-a", 10) + brokerB := newGroupLeaseManager(t, endpoints, "broker-b", 10) + + ctx := context.Background() + + if err := brokerA.Acquire(ctx, "my-group"); err != nil { + t.Fatalf("broker-a acquire: %v", err) + } + + err := brokerB.Acquire(ctx, "my-group") + if err == nil { + t.Fatalf("broker-b should not be able to acquire group owned by broker-a") + } + if err != ErrNotOwner { + t.Fatalf("expected ErrNotOwner, got: %v", err) + } + + if !brokerA.Owns("my-group") { + t.Fatalf("broker-a should own my-group") + } + if brokerB.Owns("my-group") { + t.Fatalf("broker-b should not own my-group") + } +} + +// Lease expiry enables failover. 
+func TestGroupLeaseExpiryFailover(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + + ttl := 2 + + cliA := newEtcdClientForTest(t, endpoints) + brokerA := NewGroupLeaseManager(cliA, GroupLeaseConfig{ + BrokerID: "broker-a", + LeaseTTLSeconds: ttl, + Logger: slog.Default(), + }) + brokerB := newGroupLeaseManager(t, endpoints, "broker-b", ttl) + + ctx := context.Background() + + if err := brokerA.Acquire(ctx, "my-group"); err != nil { + t.Fatalf("broker-a acquire: %v", err) + } + + cliA.Close() + + if err := brokerB.Acquire(ctx, "my-group"); err == nil { + t.Fatalf("broker-b should not acquire before lease expires") + } + + time.Sleep(time.Duration(ttl+1) * time.Second) + + if err := brokerB.Acquire(ctx, "my-group"); err != nil { + t.Fatalf("broker-b should acquire after lease expiry: %v", err) + } + if !brokerB.Owns("my-group") { + t.Fatalf("broker-b should own my-group after failover") + } +} + +// Graceful shutdown releases immediately. +func TestGroupGracefulReleaseImmediate(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + + brokerA := newGroupLeaseManager(t, endpoints, "broker-a", 30) + brokerB := newGroupLeaseManager(t, endpoints, "broker-b", 30) + + ctx := context.Background() + + if err := brokerA.Acquire(ctx, "group-1"); err != nil { + t.Fatalf("broker-a acquire group-1: %v", err) + } + if err := brokerA.Acquire(ctx, "group-2"); err != nil { + t.Fatalf("broker-a acquire group-2: %v", err) + } + + brokerA.ReleaseAll() + + if brokerA.Owns("group-1") || brokerA.Owns("group-2") { + t.Fatalf("broker-a should not own any groups after ReleaseAll") + } + + if err := brokerB.Acquire(ctx, "group-1"); err != nil { + t.Fatalf("broker-b acquire group-1 after release: %v", err) + } + if err := brokerB.Acquire(ctx, "group-2"); err != nil { + t.Fatalf("broker-b acquire group-2 after release: %v", err) + } +} + +// Reacquire after restart. 
+func TestGroupReacquireAfterRestart(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + + ttl := 5 + + cliA1 := newEtcdClientForTest(t, endpoints) + brokerA1 := NewGroupLeaseManager(cliA1, GroupLeaseConfig{ + BrokerID: "broker-a", + LeaseTTLSeconds: ttl, + Logger: slog.Default(), + }) + + ctx := context.Background() + + if err := brokerA1.Acquire(ctx, "my-group"); err != nil { + t.Fatalf("broker-a (session 1) acquire: %v", err) + } + + cliA1.Close() + + brokerA2 := newGroupLeaseManager(t, endpoints, "broker-a", ttl) + + if err := brokerA2.Acquire(ctx, "my-group"); err != nil { + t.Fatalf("broker-a (session 2) reacquire should succeed: %v", err) + } + if !brokerA2.Owns("my-group") { + t.Fatalf("broker-a (session 2) should own my-group after reacquire") + } + + brokerB := newGroupLeaseManager(t, endpoints, "broker-b", ttl) + if err := brokerB.Acquire(ctx, "my-group"); err != ErrNotOwner { + t.Fatalf("broker-b should get ErrNotOwner after broker-a reacquire, got: %v", err) + } +} + +// Concurrent acquire race: exactly one must win. 
+func TestGroupConcurrentAcquireRace(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + + const brokerCount = 5 + managers := make([]*GroupLeaseManager, brokerCount) + for i := range managers { + managers[i] = newGroupLeaseManager(t, endpoints, fmt.Sprintf("broker-%d", i), 30) + } + + ctx := context.Background() + results := make([]error, brokerCount) + var wg sync.WaitGroup + + for i := range managers { + wg.Add(1) + go func(idx int) { + defer wg.Done() + results[idx] = managers[idx].Acquire(ctx, "contested-group") + }(i) + } + wg.Wait() + + winners := 0 + losers := 0 + for i, err := range results { + switch err { + case nil: + winners++ + if !managers[i].Owns("contested-group") { + t.Errorf("broker-%d won but doesn't report ownership", i) + } + case ErrNotOwner: + losers++ + default: + t.Errorf("broker-%d got unexpected error: %v", i, err) + } + } + + if winners != 1 { + t.Fatalf("expected exactly 1 winner, got %d (losers=%d)", winners, losers) + } + if losers != brokerCount-1 { + t.Fatalf("expected %d losers, got %d", brokerCount-1, losers) + } +} + +// Different groups can be owned by different brokers. 
+func TestGroupLeaseMultipleGroups(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + + brokerA := newGroupLeaseManager(t, endpoints, "broker-a", 30) + brokerB := newGroupLeaseManager(t, endpoints, "broker-b", 30) + + ctx := context.Background() + + if err := brokerA.Acquire(ctx, "group-1"); err != nil { + t.Fatalf("broker-a acquire group-1: %v", err) + } + if err := brokerB.Acquire(ctx, "group-2"); err != nil { + t.Fatalf("broker-b acquire group-2: %v", err) + } + + if !brokerA.Owns("group-1") { + t.Fatalf("broker-a should own group-1") + } + if brokerA.Owns("group-2") { + t.Fatalf("broker-a should not own group-2") + } + if !brokerB.Owns("group-2") { + t.Fatalf("broker-b should own group-2") + } + if brokerB.Owns("group-1") { + t.Fatalf("broker-b should not own group-1") + } +} diff --git a/pkg/metadata/group_router.go b/pkg/metadata/group_router.go new file mode 100644 index 0000000..7e81482 --- /dev/null +++ b/pkg/metadata/group_router.go @@ -0,0 +1,176 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metadata + +import ( + "context" + "fmt" + "log/slog" + "strings" + "sync" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +// GroupRoute maps a group ID to the broker that currently coordinates it. 
+type GroupRoute struct { + GroupID string + BrokerID string +} + +// GroupRouter watches etcd group lease keys and maintains an in-memory +// routing table. The proxy uses this to send group coordination requests +// to the correct broker without round-tripping to etcd on every request. +type GroupRouter struct { + client *clientv3.Client + logger *slog.Logger + cancel context.CancelFunc + + mu sync.RWMutex + routes map[string]string // groupID -> brokerID +} + +// NewGroupRouter creates a router and starts watching etcd for group lease changes. +func NewGroupRouter(ctx context.Context, client *clientv3.Client, logger *slog.Logger) (*GroupRouter, error) { + if logger == nil { + logger = slog.Default() + } + watchCtx, cancel := context.WithCancel(ctx) + r := &GroupRouter{ + client: client, + logger: logger, + cancel: cancel, + routes: make(map[string]string), + } + + if err := r.loadAll(ctx); err != nil { + cancel() + return nil, fmt.Errorf("load initial group routes: %w", err) + } + + go r.watch(watchCtx) + return r, nil +} + +// LookupOwner returns the broker ID that coordinates a group, or "" if unknown/unowned. +func (r *GroupRouter) LookupOwner(groupID string) string { + r.mu.RLock() + brokerID := r.routes[groupID] + r.mu.RUnlock() + return brokerID +} + +// AllRoutes returns a snapshot of the current routing table. +func (r *GroupRouter) AllRoutes() []GroupRoute { + r.mu.RLock() + defer r.mu.RUnlock() + routes := make([]GroupRoute, 0, len(r.routes)) + for groupID, brokerID := range r.routes { + routes = append(routes, GroupRoute{ + GroupID: groupID, + BrokerID: brokerID, + }) + } + return routes +} + +// Invalidate removes a group from the routing table. Called when the proxy +// discovers (via NOT_COORDINATOR) that the cached route is stale. The +// next watch event will repopulate it with the correct owner. 
+func (r *GroupRouter) Invalidate(groupID string) {
+	r.mu.Lock()
+	delete(r.routes, groupID)
+	r.mu.Unlock()
+}
+
+// Stop terminates the background watcher.
+func (r *GroupRouter) Stop() {
+	r.cancel()
+}
+
+// loadAll replaces the routing table with a full snapshot of the lease keys
+// currently in etcd. Keys that don't match the expected prefix are skipped.
+func (r *GroupRouter) loadAll(ctx context.Context) error {
+	resp, err := r.client.Get(ctx, groupLeasePrefix+"/", clientv3.WithPrefix())
+	if err != nil {
+		return err
+	}
+	fresh := make(map[string]string, len(resp.Kvs))
+	for _, kv := range resp.Kvs {
+		groupID, ok := groupLeaseKeyToGroupID(string(kv.Key))
+		if !ok {
+			continue
+		}
+		fresh[groupID] = string(kv.Value)
+	}
+	// Swap the whole map rather than mutating in place, so entries deleted
+	// in etcd while we were disconnected also disappear locally.
+	r.mu.Lock()
+	r.routes = fresh
+	r.mu.Unlock()
+	r.logger.Info("loaded group routes from etcd", "count", len(fresh))
+	return nil
+}
+
+// watch applies incremental PUT/DELETE events to the routing table until ctx
+// is cancelled, re-syncing with a full reload whenever the stream closes.
+func (r *GroupRouter) watch(ctx context.Context) {
+	for {
+		watchChan := r.client.Watch(ctx, groupLeasePrefix+"/", clientv3.WithPrefix(), clientv3.WithPrevKV())
+		for resp := range watchChan {
+			if resp.Err() != nil {
+				// e.g. compaction; the channel will close and the outer
+				// loop re-syncs via loadAll.
+				r.logger.Warn("group lease watch error", "error", resp.Err())
+				continue
+			}
+			r.mu.Lock()
+			for _, ev := range resp.Events {
+				etcdKey := string(ev.Kv.Key)
+				groupID, ok := groupLeaseKeyToGroupID(etcdKey)
+				if !ok {
+					continue
+				}
+				switch ev.Type {
+				case clientv3.EventTypePut:
+					r.routes[groupID] = string(ev.Kv.Value)
+					r.logger.Debug("group route updated",
+						"group", groupID, "broker", string(ev.Kv.Value))
+				case clientv3.EventTypeDelete:
+					delete(r.routes, groupID)
+					r.logger.Debug("group route removed", "group", groupID)
+				}
+			}
+			r.mu.Unlock()
+		}
+
+		if ctx.Err() != nil {
+			return
+		}
+
+		r.logger.Warn("group lease watch stream closed, reconnecting")
+		time.Sleep(time.Second)
+		// NOTE(review): events firing between this reload and the next Watch
+		// call (which starts from "now") are not replayed; a resulting stale
+		// entry only self-heals via NOT_COORDINATOR + Invalidate, or the next
+		// change to that key. Confirm whether the re-watch should start from
+		// the reload's revision instead.
+		if err := r.loadAll(ctx); err != nil {
+			r.logger.Warn("group lease watch reconnect: reload failed", "error", err)
+		}
+	}
+}
+
+// groupLeaseKeyToGroupID converts "/kafscale/group-leases/mygroup" -> "mygroup"
+func groupLeaseKeyToGroupID(etcdKey string) (string, bool) {
+	prefix := groupLeasePrefix + "/"
+	if !strings.HasPrefix(etcdKey, prefix) {
+		return "", false
+	}
+	groupID := strings.TrimPrefix(etcdKey, prefix)
+	if groupID == "" {
+		return "", false
+	}
+	return groupID, true
+}
diff --git a/pkg/metadata/group_router_test.go b/pkg/metadata/group_router_test.go
new file mode 100644
index 0000000..80fdd64
--- /dev/null
+++ b/pkg/metadata/group_router_test.go
@@ -0,0 +1,177 @@
+// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com).
+// This project is supported and financed by Scalytics, Inc. (www.scalytics.io).
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata
+
+import (
+	"context"
+	"log/slog"
+	"testing"
+	"time"
+
+	"github.com/KafScale/platform/internal/testutil"
+)
+
+// Key parsing is pure and needs no etcd; exercised table-driven.
+func TestGroupLeaseKeyToGroupID(t *testing.T) {
+	tests := []struct {
+		etcdKey string
+		want    string
+		wantOK  bool
+	}{
+		{groupLeasePrefix + "/my-group", "my-group", true},
+		{groupLeasePrefix + "/consumer-group-1", "consumer-group-1", true},
+		{groupLeasePrefix + "/group.with.dots", "group.with.dots", true},
+
+		// Invalid cases.
+ {"/wrong/prefix/my-group", "", false}, + {groupLeasePrefix + "/", "", false}, + {"", "", false}, + {groupLeasePrefix, "", false}, + } + for _, tc := range tests { + t.Run(tc.etcdKey, func(t *testing.T) { + got, ok := groupLeaseKeyToGroupID(tc.etcdKey) + if ok != tc.wantOK { + t.Fatalf("groupLeaseKeyToGroupID(%q): ok=%v, want %v", tc.etcdKey, ok, tc.wantOK) + } + if got != tc.want { + t.Fatalf("groupLeaseKeyToGroupID(%q) = %q, want %q", tc.etcdKey, got, tc.want) + } + }) + } +} + + +// Router reflects group lease acquisition. +func TestGroupRouterReflectsAcquisition(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + ctx := context.Background() + + routerCli := newEtcdClientForTest(t, endpoints) + router, err := NewGroupRouter(ctx, routerCli, slog.Default()) + if err != nil { + t.Fatalf("create router: %v", err) + } + t.Cleanup(router.Stop) + + brokerA := newGroupLeaseManager(t, endpoints, "broker-a", 30) + if err := brokerA.Acquire(ctx, "my-group"); err != nil { + t.Fatalf("broker-a acquire: %v", err) + } + + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + if owner := router.LookupOwner("my-group"); owner == "broker-a" { + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("router did not reflect broker-a ownership of my-group (got %q)", router.LookupOwner("my-group")) +} + +// Router reflects group lease release. 
+func TestGroupRouterReflectsRelease(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + ctx := context.Background() + + routerCli := newEtcdClientForTest(t, endpoints) + router, err := NewGroupRouter(ctx, routerCli, slog.Default()) + if err != nil { + t.Fatalf("create router: %v", err) + } + t.Cleanup(router.Stop) + + brokerA := newGroupLeaseManager(t, endpoints, "broker-a", 30) + if err := brokerA.Acquire(ctx, "my-group"); err != nil { + t.Fatalf("broker-a acquire: %v", err) + } + + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + if router.LookupOwner("my-group") == "broker-a" { + break + } + time.Sleep(50 * time.Millisecond) + } + if router.LookupOwner("my-group") != "broker-a" { + t.Fatalf("router did not reflect initial acquisition") + } + + brokerA.Release("my-group") + + deadline = time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + if owner := router.LookupOwner("my-group"); owner == "" { + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("router did not reflect release of my-group (still shows %q)", router.LookupOwner("my-group")) +} + +// Multiple groups route to different brokers. 
+func TestGroupRouterMultipleBrokers(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + ctx := context.Background() + + routerCli := newEtcdClientForTest(t, endpoints) + router, err := NewGroupRouter(ctx, routerCli, slog.Default()) + if err != nil { + t.Fatalf("create router: %v", err) + } + t.Cleanup(router.Stop) + + brokerA := newGroupLeaseManager(t, endpoints, "broker-a", 30) + brokerB := newGroupLeaseManager(t, endpoints, "broker-b", 30) + + if err := brokerA.Acquire(ctx, "group-1"); err != nil { + t.Fatalf("broker-a acquire group-1: %v", err) + } + if err := brokerB.Acquire(ctx, "group-2"); err != nil { + t.Fatalf("broker-b acquire group-2: %v", err) + } + + expected := map[string]string{ + "group-1": "broker-a", + "group-2": "broker-b", + } + + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + allMatch := true + for groupID, wantBroker := range expected { + if router.LookupOwner(groupID) != wantBroker { + allMatch = false + break + } + } + if allMatch { + routes := router.AllRoutes() + if len(routes) != len(expected) { + t.Fatalf("expected %d routes, got %d", len(expected), len(routes)) + } + return + } + time.Sleep(50 * time.Millisecond) + } + + for groupID, wantBroker := range expected { + got := router.LookupOwner(groupID) + if got != wantBroker { + t.Errorf("LookupOwner(%s) = %q, want %q", groupID, got, wantBroker) + } + } + t.Fatalf("router did not converge to expected state") +} diff --git a/pkg/metadata/lease_manager.go b/pkg/metadata/lease_manager.go new file mode 100644 index 0000000..3dfe352 --- /dev/null +++ b/pkg/metadata/lease_manager.go @@ -0,0 +1,332 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/client/v3/concurrency"
+	"golang.org/x/sync/singleflight"
+)
+
+// defaultLeaseManagerTTLSeconds is used when the config leaves the TTL unset.
+const defaultLeaseManagerTTLSeconds = 10
+
+// LeaseManagerConfig configures a generic lease manager.
+type LeaseManagerConfig struct {
+	BrokerID        string       // value written into each lease key; identifies the owner
+	Prefix          string       // etcd key prefix, e.g. "/kafscale/partition-leases"
+	LeaseTTLSeconds int          // seconds before a dead broker's keys expire; <= 0 selects the default
+	Logger          *slog.Logger // nil selects slog.Default()
+	ResourceKind    string       // used in log messages, e.g. "partition" or "group"
+}
+
+// LeaseManager uses etcd leases to ensure exclusive ownership of named resources.
+//
+// All lease keys are attached to a single shared etcd session/lease, so the
+// keepalive cost is O(1) regardless of resource count. When the session dies
+// (broker crash, network partition), etcd expires all keys after the TTL and
+// the manager bulk-clears its local ownership map.
+//
+// Concurrent Acquire calls for the same resource are deduplicated via
+// singleflight to avoid redundant etcd round-trips and session leaks.
+type LeaseManager struct {
+	client       *clientv3.Client
+	brokerID     string
+	prefix       string
+	ttl          int
+	logger       *slog.Logger
+	resourceKind string
+	closed       atomic.Bool // set once by ReleaseAll; rejects further Acquires
+
+	// mu guards owned and session. The session is created lazily on the
+	// first Acquire and replaced when it expires.
+	mu      sync.RWMutex
+	owned   map[string]struct{} // key: resource identifier
+	session *concurrency.Session
+
+	acquireFlight singleflight.Group
+}
+
+// NewLeaseManager creates a lease manager backed by the given etcd client.
+func NewLeaseManager(client *clientv3.Client, cfg LeaseManagerConfig) *LeaseManager {
+	ttl := cfg.LeaseTTLSeconds
+	if ttl <= 0 {
+		ttl = defaultLeaseManagerTTLSeconds
+	}
+	logger := cfg.Logger
+	if logger == nil {
+		logger = slog.Default()
+	}
+	kind := cfg.ResourceKind
+	if kind == "" {
+		kind = "resource"
+	}
+	// The etcd session is not created here; it is established lazily by the
+	// first Acquire so that constructing a manager never blocks on etcd.
+	return &LeaseManager{
+		client:       client,
+		brokerID:     cfg.BrokerID,
+		prefix:       cfg.Prefix,
+		ttl:          ttl,
+		logger:       logger,
+		resourceKind: kind,
+		owned:        make(map[string]struct{}),
+	}
+}
+
+// leaseKey returns the etcd key for a resource.
+func (m *LeaseManager) leaseKey(resourceID string) string {
+	return fmt.Sprintf("%s/%s", m.prefix, resourceID)
+}
+
+// Prefix returns the etcd prefix for watching leases.
+func (m *LeaseManager) Prefix() string {
+	return m.prefix
+}
+
+// Acquire tries to grab the lease for the named resource. If this broker
+// already owns it, it returns nil immediately. If another broker owns it,
+// it returns ErrNotOwner.
+func (m *LeaseManager) Acquire(ctx context.Context, resourceID string) error {
+	if m.closed.Load() {
+		return ErrShuttingDown
+	}
+
+	// Fast path: already owned locally.
+	m.mu.RLock()
+	if _, ok := m.owned[resourceID]; ok {
+		m.mu.RUnlock()
+		return nil
+	}
+	m.mu.RUnlock()
+
+	// Deduplicate concurrent Acquires for the same resource: two requests
+	// racing on an unowned resource would otherwise both create sessions
+	// and race on the CAS, leaking the loser's session.
+	_, err, _ := m.acquireFlight.Do(resourceID, func() (interface{}, error) {
+		return nil, m.doAcquire(ctx, resourceID)
+	})
+	return err
+}
+
+// doAcquire performs the etcd compare-and-swap for a single resource.
+func (m *LeaseManager) doAcquire(ctx context.Context, resourceID string) error {
+	// Re-check under read lock: a goroutine sharing this singleflight call
+	// may have already stored the result from a previous batch.
+	m.mu.RLock()
+	if _, ok := m.owned[resourceID]; ok {
+		m.mu.RUnlock()
+		return nil
+	}
+	m.mu.RUnlock()
+
+	session, err := m.getOrCreateSession(ctx)
+	if err != nil {
+		return fmt.Errorf("get session: %w", err)
+	}
+
+	leaseKey := m.leaseKey(resourceID)
+
+	txnCtx, txnCancel := context.WithTimeout(ctx, 5*time.Second)
+	defer txnCancel()
+
+	// Create the key only if it does not exist (CreateRevision == 0): the
+	// atomic CAS that prevents two brokers from claiming the same resource.
+	txnResp, err := m.client.Txn(txnCtx).
+		If(clientv3.Compare(clientv3.CreateRevision(leaseKey), "=", 0)).
+		Then(clientv3.OpPut(leaseKey, m.brokerID, clientv3.WithLease(session.Lease()))).
+		Else(clientv3.OpGet(leaseKey)).
+		Commit()
+
+	if err != nil {
+		return fmt.Errorf("%s lease txn: %w", m.resourceKind, err)
+	}
+
+	if !txnResp.Succeeded {
+		// Someone owns the lease. If it is us (e.g. a restart before the old
+		// session expired), rebind the key to the current session.
+		if len(txnResp.Responses) > 0 {
+			rangeResp := txnResp.Responses[0].GetResponseRange()
+			if rangeResp != nil && len(rangeResp.Kvs) > 0 {
+				owner := string(rangeResp.Kvs[0].Value)
+				if owner == m.brokerID {
+					return m.reacquire(ctx, resourceID, leaseKey, session)
+				}
+			}
+		}
+		return ErrNotOwner
+	}
+
+	// Record ownership only if the session we wrote with is still the active
+	// one; if it died meanwhile, the etcd key will expire and claiming local
+	// ownership would be a lie.
+	m.mu.Lock()
+	if m.session != session {
+		m.mu.Unlock()
+		return fmt.Errorf("session changed during acquire")
+	}
+	m.owned[resourceID] = struct{}{}
+	m.mu.Unlock()
+
+	m.logger.Info(fmt.Sprintf("acquired %s lease", m.resourceKind),
+		m.resourceKind, resourceID, "broker", m.brokerID)
+	return nil
+}
+
+// reacquire overwrites the lease key with our current session when we discover
+// we still own the key from a previous (possibly expiring) session. The write
+// is guarded: only if the key still holds our broker ID, so another broker
+// acquiring in between cannot be clobbered.
+func (m *LeaseManager) reacquire(ctx context.Context, resourceID, leaseKey string, session *concurrency.Session) error {
+	txnCtx, txnCancel := context.WithTimeout(ctx, 5*time.Second)
+	defer txnCancel()
+
+	txnResp, err := m.client.Txn(txnCtx).
+		If(clientv3.Compare(clientv3.Value(leaseKey), "=", m.brokerID)).
+		Then(clientv3.OpPut(leaseKey, m.brokerID, clientv3.WithLease(session.Lease()))).
+		Commit()
+	if err != nil {
+		return fmt.Errorf("reacquire %s lease: %w", m.resourceKind, err)
+	}
+	if !txnResp.Succeeded {
+		return ErrNotOwner
+	}
+
+	m.mu.Lock()
+	if m.session != session {
+		m.mu.Unlock()
+		return fmt.Errorf("session changed during reacquire")
+	}
+	m.owned[resourceID] = struct{}{}
+	m.mu.Unlock()
+
+	m.logger.Info(fmt.Sprintf("reacquired %s lease", m.resourceKind),
+		m.resourceKind, resourceID, "broker", m.brokerID)
+	return nil
+}
+
+// getOrCreateSession returns the shared session, creating one if necessary.
+// A monitoring goroutine per session watches for death and bulk-clears the
+// ownership map when it happens.
+func (m *LeaseManager) getOrCreateSession(ctx context.Context) (*concurrency.Session, error) {
+	m.mu.Lock()
+	if m.session != nil {
+		select {
+		case <-m.session.Done():
+			// Session died — clear stale state and fall through to create.
+			m.session = nil
+			m.owned = make(map[string]struct{})
+		default:
+			s := m.session
+			m.mu.Unlock()
+			return s, nil
+		}
+	}
+	m.mu.Unlock()
+
+	// Create the new session outside the lock (network round-trip).
+	session, err := concurrency.NewSession(m.client, concurrency.WithTTL(m.ttl))
+	if err != nil {
+		return nil, fmt.Errorf("create etcd session: %w", err)
+	}
+
+	m.mu.Lock()
+	// ReleaseAll may have fired while we were creating the session.
+	if m.closed.Load() {
+		m.mu.Unlock()
+		session.Close()
+		return nil, ErrShuttingDown
+	}
+	// Double-check: another goroutine may have installed a session while we
+	// were blocked on the network call. If theirs is alive, use it.
+	if m.session != nil {
+		select {
+		case <-m.session.Done():
+			// Still dead — replace it with ours below.
+		default:
+			s := m.session
+			m.mu.Unlock()
+			session.Close()
+			return s, nil
+		}
+	}
+	m.session = session
+	go m.monitorSession(session)
+	m.mu.Unlock()
+	return session, nil
+}
+
+// monitorSession invalidates all ownership when the session dies (etcd
+// connectivity loss, lease expiry). The next Acquire creates a fresh session.
+func (m *LeaseManager) monitorSession(session *concurrency.Session) {
+	<-session.Done()
+
+	m.mu.Lock()
+	// Only clear if this is still the active session (it may have already
+	// been replaced).
+	if m.session == session {
+		m.session = nil
+		count := len(m.owned)
+		m.owned = make(map[string]struct{})
+		m.mu.Unlock()
+		m.logger.Warn(fmt.Sprintf("%s lease session expired, cleared all ownership", m.resourceKind),
+			"broker", m.brokerID, "count", count)
+	} else {
+		m.mu.Unlock()
+	}
+}
+
+// Owns returns true if this broker currently holds the lease for the resource.
+func (m *LeaseManager) Owns(resourceID string) bool {
+	m.mu.RLock()
+	_, ok := m.owned[resourceID]
+	m.mu.RUnlock()
+	return ok
+}
+
+// Release explicitly gives up ownership of a single resource.
+func (m *LeaseManager) Release(resourceID string) {
+	m.mu.Lock()
+	_, ok := m.owned[resourceID]
+	if ok {
+		delete(m.owned, resourceID)
+	}
+	m.mu.Unlock()
+
+	if !ok {
+		return
+	}
+
+	// Delete the etcd key so another broker can acquire immediately — but
+	// only if the key still records us as the owner (mirrors the guarded
+	// write in reacquire). An unconditional delete could race: if our
+	// session expired and another broker already re-acquired the key before
+	// monitorSession cleared the local map, we would revoke *their* lease.
+	leaseKey := m.leaseKey(resourceID)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	if _, err := m.client.Txn(ctx).
+		If(clientv3.Compare(clientv3.Value(leaseKey), "=", m.brokerID)).
+		Then(clientv3.OpDelete(leaseKey)).
+		Commit(); err != nil {
+		m.logger.Warn(fmt.Sprintf("failed to delete %s lease key", m.resourceKind),
+			"key", leaseKey, "error", err)
+	}
+	m.logger.Info(fmt.Sprintf("released %s lease", m.resourceKind),
+		m.resourceKind, resourceID, "broker", m.brokerID)
+}
+
+// ReleaseAll releases all leases. Called during graceful shutdown. Closing
+// the shared session revokes the underlying etcd lease, which removes all
+// attached keys atomically; closed also blocks any further Acquire.
+func (m *LeaseManager) ReleaseAll() {
+	m.closed.Store(true)
+	m.mu.Lock()
+	count := len(m.owned)
+	m.owned = make(map[string]struct{})
+	session := m.session
+	m.session = nil
+	m.mu.Unlock()
+
+	if session != nil {
+		session.Close()
+	}
+	m.logger.Info(fmt.Sprintf("released all %s leases", m.resourceKind),
+		"broker", m.brokerID, "count", count)
+}
+
+// CurrentOwner queries etcd to find the current owner of a resource.
+// Returns the broker ID of the owner, or empty string if unowned.
+func (m *LeaseManager) CurrentOwner(ctx context.Context, resourceID string) (string, error) {
+	leaseKey := m.leaseKey(resourceID)
+	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
+	defer cancel()
+
+	resp, err := m.client.Get(ctx, leaseKey)
+	if err != nil {
+		return "", err
+	}
+	if len(resp.Kvs) == 0 {
+		return "", nil
+	}
+	return string(resp.Kvs[0].Value), nil
+}
+
+// EtcdClient returns the underlying etcd client. Used by routers that need
+// to watch the same prefix.
+func (m *LeaseManager) EtcdClient() *clientv3.Client { + return m.client +} diff --git a/pkg/metadata/partition_lease.go b/pkg/metadata/partition_lease.go index 19d2cc2..460b9d6 100644 --- a/pkg/metadata/partition_lease.go +++ b/pkg/metadata/partition_lease.go @@ -21,21 +21,13 @@ import ( "fmt" "log/slog" "sync" - "sync/atomic" - "time" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/concurrency" - "golang.org/x/sync/singleflight" ) const ( // partitionLeasePrefix is the etcd key prefix for partition ownership leases. partitionLeasePrefix = "/kafscale/partition-leases" - - // defaultLeaseTTLSeconds is the TTL for partition leases. When a broker dies, - // its leases expire after this many seconds, allowing another broker to take over. - defaultLeaseTTLSeconds = 10 ) var ( @@ -59,44 +51,22 @@ type PartitionLeaseConfig struct { } // PartitionLeaseManager uses etcd leases to ensure exclusive partition ownership. -// -// All partition lease keys are attached to a single shared etcd session/lease, -// so the keepalive cost is O(1) regardless of partition count. When the session -// dies (broker crash, network partition), etcd expires all keys after the TTL -// and the manager bulk-clears its local ownership map. -// -// Concurrent Acquire calls for the same partition are deduplicated via -// singleflight to avoid redundant etcd round-trips and session leaks. +// It delegates to the generic LeaseManager, translating topic+partition pairs +// into string resource IDs. type PartitionLeaseManager struct { - client *clientv3.Client - brokerID string - ttl int - logger *slog.Logger - closed atomic.Bool - - mu sync.RWMutex - partitions map[string]struct{} // key: "topic:partition" - session *concurrency.Session - - acquireFlight singleflight.Group + lm *LeaseManager } // NewPartitionLeaseManager creates a lease manager backed by the given etcd client. 
func NewPartitionLeaseManager(client *clientv3.Client, cfg PartitionLeaseConfig) *PartitionLeaseManager { - ttl := cfg.LeaseTTLSeconds - if ttl <= 0 { - ttl = defaultLeaseTTLSeconds - } - logger := cfg.Logger - if logger == nil { - logger = slog.Default() - } return &PartitionLeaseManager{ - client: client, - brokerID: cfg.BrokerID, - ttl: ttl, - logger: logger, - partitions: make(map[string]struct{}), + lm: NewLeaseManager(client, LeaseManagerConfig{ + BrokerID: cfg.BrokerID, + Prefix: partitionLeasePrefix, + LeaseTTLSeconds: cfg.LeaseTTLSeconds, + Logger: cfg.Logger, + ResourceKind: "partition", + }), } } @@ -110,207 +80,15 @@ func PartitionLeasePrefix() string { return partitionLeasePrefix } +// partitionResourceID returns the resource ID used internally by LeaseManager. +func partitionResourceID(topic string, partition int32) string { + return fmt.Sprintf("%s/%d", topic, partition) +} + // Acquire tries to grab the partition lease. If this broker already owns it, // it returns nil immediately. If another broker owns it, it returns ErrNotOwner. -// -// The lease is kept alive automatically via the shared etcd session. When the -// broker crashes the session expires and all partition keys are removed by etcd. func (m *PartitionLeaseManager) Acquire(ctx context.Context, topic string, partition int32) error { - if m.closed.Load() { - return ErrShuttingDown - } - - key := partitionKey(topic, partition) - - m.mu.RLock() - if _, ok := m.partitions[key]; ok { - m.mu.RUnlock() - return nil - } - m.mu.RUnlock() - - // Deduplicate concurrent Acquire calls for the same partition. Two produce - // requests arriving simultaneously for an unowned partition would otherwise - // both create sessions and race on the CAS, leaking the loser's session. 
- _, err, _ := m.acquireFlight.Do(key, func() (interface{}, error) { - return nil, m.doAcquire(ctx, topic, partition) - }) - return err -} - -func (m *PartitionLeaseManager) doAcquire(ctx context.Context, topic string, partition int32) error { - key := partitionKey(topic, partition) - - // Re-check under read lock: another goroutine sharing this singleflight - // call may have already stored the result from a previous batch. - m.mu.RLock() - if _, ok := m.partitions[key]; ok { - m.mu.RUnlock() - return nil - } - m.mu.RUnlock() - - session, err := m.getOrCreateSession(ctx) - if err != nil { - return fmt.Errorf("get session: %w", err) - } - - leaseKey := partitionLeaseKey(topic, partition) - - // Try to create the key only if it does not exist (CreateRevision == 0). - // This is the atomic compare-and-swap that prevents two brokers from - // both claiming the same partition. - txnCtx, txnCancel := context.WithTimeout(ctx, 5*time.Second) - defer txnCancel() - - txnResp, err := m.client.Txn(txnCtx). - If(clientv3.Compare(clientv3.CreateRevision(leaseKey), "=", 0)). - Then(clientv3.OpPut(leaseKey, m.brokerID, clientv3.WithLease(session.Lease()))). - Else(clientv3.OpGet(leaseKey)). - Commit() - - if err != nil { - return fmt.Errorf("partition lease txn: %w", err) - } - - if !txnResp.Succeeded { - // Someone else owns the lease. Check if it's us (e.g. after a restart - // where the old session hasn't expired yet). - if len(txnResp.Responses) > 0 { - rangeResp := txnResp.Responses[0].GetResponseRange() - if rangeResp != nil && len(rangeResp.Kvs) > 0 { - owner := string(rangeResp.Kvs[0].Value) - if owner == m.brokerID { - return m.reacquire(ctx, topic, partition, leaseKey, session) - } - } - } - return ErrNotOwner - } - - // Store ownership under the lock. Verify the session is still the active - // one — if it died between getOrCreateSession and now, our etcd key will - // expire and we must not claim local ownership. 
- m.mu.Lock() - if m.session != session { - m.mu.Unlock() - return fmt.Errorf("session changed during acquire") - } - m.partitions[key] = struct{}{} - m.mu.Unlock() - - m.logger.Info("acquired partition lease", - "topic", topic, "partition", partition, "broker", m.brokerID) - return nil -} - -// reacquire overwrites the lease key with our current session when we discover -// that we still own the key from a previous (possibly expiring) session. -func (m *PartitionLeaseManager) reacquire(ctx context.Context, topic string, partition int32, leaseKey string, session *concurrency.Session) error { - key := partitionKey(topic, partition) - - txnCtx, txnCancel := context.WithTimeout(ctx, 5*time.Second) - defer txnCancel() - - // Conditional Put: only overwrite if we still own the key. This prevents a race - // where our old session expires and another broker acquires between the read in - // Acquire and the write here. - txnResp, err := m.client.Txn(txnCtx). - If(clientv3.Compare(clientv3.Value(leaseKey), "=", m.brokerID)). - Then(clientv3.OpPut(leaseKey, m.brokerID, clientv3.WithLease(session.Lease()))). - Commit() - if err != nil { - return fmt.Errorf("reacquire partition lease: %w", err) - } - if !txnResp.Succeeded { - return ErrNotOwner - } - - m.mu.Lock() - if m.session != session { - m.mu.Unlock() - return fmt.Errorf("session changed during reacquire") - } - m.partitions[key] = struct{}{} - m.mu.Unlock() - - m.logger.Info("reacquired partition lease", - "topic", topic, "partition", partition, "broker", m.brokerID) - return nil -} - -// getOrCreateSession returns the shared session, creating one if necessary. -// A single monitoring goroutine watches for session death and bulk-clears -// all owned partitions when it happens. -func (m *PartitionLeaseManager) getOrCreateSession(ctx context.Context) (*concurrency.Session, error) { - m.mu.Lock() - if m.session != nil { - select { - case <-m.session.Done(): - // Session died — clear stale state. 
- m.session = nil - m.partitions = make(map[string]struct{}) - default: - s := m.session - m.mu.Unlock() - return s, nil - } - } - m.mu.Unlock() - - // Create a new session outside the lock (network round-trip). - session, err := concurrency.NewSession(m.client, concurrency.WithTTL(m.ttl)) - if err != nil { - return nil, fmt.Errorf("create etcd session: %w", err) - } - - m.mu.Lock() - // ReleaseAll may have fired while we were creating the session. If so, - // close the session we just created and reject the acquisition. - if m.closed.Load() { - m.mu.Unlock() - session.Close() - return nil, ErrShuttingDown - } - // Double-check: another goroutine may have created a session while we - // were blocked on the network call. - if m.session != nil { - select { - case <-m.session.Done(): - // Still dead, use ours. - default: - // Someone else won the race — close ours and use theirs. - s := m.session - m.mu.Unlock() - session.Close() - return s, nil - } - } - m.session = session - go m.monitorSession(session) - m.mu.Unlock() - return session, nil -} - -// monitorSession watches the session lifetime. When it dies (etcd connectivity -// loss, lease expiry), all partition ownership is invalidated. Next Acquire -// call will create a fresh session. -func (m *PartitionLeaseManager) monitorSession(session *concurrency.Session) { - <-session.Done() - - m.mu.Lock() - // Only clear if this is still the active session (ReleaseAll may have - // already replaced it with nil and started a new one). - if m.session == session { - m.session = nil - count := len(m.partitions) - m.partitions = make(map[string]struct{}) - m.mu.Unlock() - m.logger.Warn("partition lease session expired, cleared all ownership", - "broker", m.brokerID, "count", count) - } else { - m.mu.Unlock() - } + return m.lm.Acquire(ctx, partitionResourceID(topic, partition)) } // PartitionID identifies a topic-partition pair. 
@@ -331,23 +109,18 @@ type AcquireResult struct { func (m *PartitionLeaseManager) AcquireAll(ctx context.Context, partitions []PartitionID) []AcquireResult { results := make([]AcquireResult, len(partitions)) - // Fast path: filter out already-owned partitions. var needAcquire []int - m.mu.RLock() for i, p := range partitions { results[i].Partition = p - key := partitionKey(p.Topic, p.Partition) - if _, ok := m.partitions[key]; !ok { + if !m.lm.Owns(partitionResourceID(p.Topic, p.Partition)) { needAcquire = append(needAcquire, i) } } - m.mu.RUnlock() if len(needAcquire) == 0 { return results } - // Acquire remaining partitions concurrently. var wg sync.WaitGroup for _, idx := range needAcquire { idx := idx @@ -363,68 +136,26 @@ func (m *PartitionLeaseManager) AcquireAll(ctx context.Context, partitions []Par // Owns returns true if this broker currently holds the lease for the partition. func (m *PartitionLeaseManager) Owns(topic string, partition int32) bool { - key := partitionKey(topic, partition) - m.mu.RLock() - _, ok := m.partitions[key] - m.mu.RUnlock() - return ok + return m.lm.Owns(partitionResourceID(topic, partition)) } // Release explicitly gives up ownership of a single partition. func (m *PartitionLeaseManager) Release(topic string, partition int32) { - key := partitionKey(topic, partition) - m.mu.Lock() - _, ok := m.partitions[key] - if ok { - delete(m.partitions, key) - } - m.mu.Unlock() - - if ok { - // Delete the etcd key so another broker can acquire immediately. 
- leaseKey := partitionLeaseKey(topic, partition) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if _, err := m.client.Delete(ctx, leaseKey); err != nil { - m.logger.Warn("failed to delete partition lease key", - "key", leaseKey, "error", err) - } - m.logger.Info("released partition lease", - "topic", topic, "partition", partition, "broker", m.brokerID) - } + m.lm.Release(partitionResourceID(topic, partition)) } // ReleaseAll releases all partition leases. Called during graceful shutdown. -// Closing the shared session revokes the underlying etcd lease, which removes -// all attached keys atomically. func (m *PartitionLeaseManager) ReleaseAll() { - m.closed.Store(true) - m.mu.Lock() - count := len(m.partitions) - m.partitions = make(map[string]struct{}) - session := m.session - m.session = nil - m.mu.Unlock() - - if session != nil { - session.Close() - } - m.logger.Info("released all partition leases", "broker", m.brokerID, "count", count) + m.lm.ReleaseAll() } // CurrentOwner queries etcd to find the current owner of a partition. // Returns the broker ID of the owner, or empty string if unowned. func (m *PartitionLeaseManager) CurrentOwner(ctx context.Context, topic string, partition int32) (string, error) { - leaseKey := partitionLeaseKey(topic, partition) - ctx, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() + return m.lm.CurrentOwner(ctx, partitionResourceID(topic, partition)) +} - resp, err := m.client.Get(ctx, leaseKey) - if err != nil { - return "", err - } - if len(resp.Kvs) == 0 { - return "", nil - } - return string(resp.Kvs[0].Value), nil +// EtcdClient returns the underlying etcd client. 
+func (m *PartitionLeaseManager) EtcdClient() *clientv3.Client {
+	return m.lm.EtcdClient()
+}
diff --git a/pkg/protocol/errors.go b/pkg/protocol/errors.go
index 10047e8..9affd3b 100644
--- a/pkg/protocol/errors.go
+++ b/pkg/protocol/errors.go
@@ -38,4 +38,5 @@ const (
 	INVALID_PARTITIONS     int16 = 37
 	UNSUPPORTED_VERSION    int16 = 35
 	NOT_LEADER_OR_FOLLOWER int16 = 6
+	NOT_COORDINATOR        int16 = 16
 )
diff --git a/pkg/protocol/response.go b/pkg/protocol/response.go
index fca4c4c..fccbb44 100644
--- a/pkg/protocol/response.go
+++ b/pkg/protocol/response.go
@@ -1383,3 +1383,207 @@ func EncodeResponse(payload []byte) ([]byte, error) {
 	w.write(payload)
 	return w.Bytes(), nil
 }
+
+// GroupResponseErrorCode extracts the top-level error code from a group-related
+// response. It accounts for version-dependent encoding (flexible headers,
+// optional throttle_ms). Returns (errorCode, true) on success, or (0, false)
+// if the response cannot be parsed.
+//
+// For DescribeGroups, which can contain multiple groups, this returns the error
+// code of the first group in the response.
+func GroupResponseErrorCode(apiKey int16, apiVersion int16, resp []byte) (int16, bool) {
+	r := newByteReader(resp)
+
+	// All responses start with correlation_id.
+	if _, err := r.Int32(); err != nil {
+		return 0, false
+	}
+
+	switch apiKey {
+	case APIKeyJoinGroup:
+		// JoinGroup: throttle_ms since v2, flexible since v6.
+		return readGroupErrorCode(r, apiVersion >= 6, apiVersion >= 2)
+	case APIKeySyncGroup:
+		// SyncGroup: throttle_ms since v1, flexible since v4.
+		return readGroupErrorCode(r, apiVersion >= 4, apiVersion >= 1)
+	case APIKeyHeartbeat:
+		// Heartbeat: throttle_ms since v1, flexible since v4.
+		return readGroupErrorCode(r, apiVersion >= 4, apiVersion >= 1)
+	case APIKeyLeaveGroup:
+		// LeaveGroup gained throttle_time_ms in v1 and flexible encoding in
+		// v4 — the same shape as Heartbeat. Reading error_code directly after
+		// correlation_id (the bare v0 layout) would misinterpret the high
+		// bytes of throttle_ms as the error code on v1+.
+		return readGroupErrorCode(r, apiVersion >= 4, apiVersion >= 1)
+	case APIKeyOffsetCommit:
+		return readOffsetCommitErrorCode(r, apiVersion)
+	case APIKeyOffsetFetch:
+		return readOffsetFetchErrorCode(r, apiVersion)
+	case APIKeyDescribeGroups:
+		return readDescribeGroupsFirstErrorCode(r, apiVersion)
+	default:
+		return 0, false
+	}
+}
+
+// readGroupErrorCode reads the error code from a response with the layout:
+// [tagged_fields if flexible] [throttle_ms(4) if hasThrottle] error_code(2)
+func readGroupErrorCode(r *byteReader, flexible bool, hasThrottle bool) (int16, bool) {
+	if flexible {
+		// Flexible response headers carry a tagged-field block after the
+		// correlation_id.
+		if err := r.SkipTaggedFields(); err != nil {
+			return 0, false
+		}
+	}
+	if hasThrottle {
+		if _, err := r.Int32(); err != nil {
+			return 0, false
+		}
+	}
+	ec, err := r.Int16()
+	if err != nil {
+		return 0, false
+	}
+	return ec, true
+}
+
+// readOffsetCommitErrorCode reads the first non-zero partition error code from
+// an OffsetCommitResponse (non-flexible versions only).
+// Layout: correlation_id(4) [+ throttle_ms(4) for v3+] + topics...
+func readOffsetCommitErrorCode(r *byteReader, version int16) (int16, bool) {
+	// Flexible encoding (v8+) uses compact strings/arrays and tagged fields;
+	// the fixed-width reads below would misparse it and could surface a
+	// bogus error code. Report unparseable so callers fall back instead.
+	if version >= 8 {
+		return 0, false
+	}
+	// throttle_ms was added in v3; earlier versions go straight to topics.
+	if version >= 3 {
+		if _, err := r.Int32(); err != nil {
+			return 0, false
+		}
+	}
+	// topic count
+	topicCount, err := r.Int32()
+	if err != nil {
+		return 0, false
+	}
+	for i := int32(0); i < topicCount; i++ {
+		// topic name
+		if _, err := r.String(); err != nil {
+			return 0, false
+		}
+		// partition count
+		partCount, err := r.Int32()
+		if err != nil {
+			return 0, false
+		}
+		for j := int32(0); j < partCount; j++ {
+			// partition index
+			if _, err := r.Int32(); err != nil {
+				return 0, false
+			}
+			// error code: the first non-zero one stands for the response.
+			ec, err := r.Int16()
+			if err != nil {
+				return 0, false
+			}
+			if ec != 0 {
+				return ec, true
+			}
+		}
+	}
+	return 0, true
+}
+
+// readOffsetFetchErrorCode reads the top-level error code from an OffsetFetchResponse.
+// The top-level error code exists at version >= 2 and is at the end of the response.
+// For simplicity, we read through the structure to find it.
+func readOffsetFetchErrorCode(r *byteReader, version int16) (int16, bool) { + if version >= 3 { + // throttle_ms + if _, err := r.Int32(); err != nil { + return 0, false + } + } + // topic count + topicCount, err := r.Int32() + if err != nil { + return 0, false + } + for i := int32(0); i < topicCount; i++ { + if _, err := r.String(); err != nil { + return 0, false + } + partCount, err := r.Int32() + if err != nil { + return 0, false + } + for j := int32(0); j < partCount; j++ { + // partition + if _, err := r.Int32(); err != nil { + return 0, false + } + // offset + if _, err := r.Int64(); err != nil { + return 0, false + } + // leader_epoch (version >= 5) + if version >= 5 { + if _, err := r.Int32(); err != nil { + return 0, false + } + } + // metadata (nullable string) + if _, err := r.NullableString(); err != nil { + return 0, false + } + // partition error code + ec, err := r.Int16() + if err != nil { + return 0, false + } + if ec != 0 { + return ec, true + } + } + } + // top-level error code (version >= 2) + if version >= 2 { + ec, err := r.Int16() + if err != nil { + return 0, false + } + return ec, true + } + return 0, true +} + +// readDescribeGroupsFirstErrorCode reads the error code of the first group +// in a DescribeGroupsResponse. 
+func readDescribeGroupsFirstErrorCode(r *byteReader, version int16) (int16, bool) { + flexible := version >= 5 + if flexible { + if err := r.SkipTaggedFields(); err != nil { + return 0, false + } + } + // throttle_ms (version >= 1) + if version >= 1 { + if _, err := r.Int32(); err != nil { + return 0, false + } + } + // group count + var groupCount int32 + if flexible { + gc, err := r.CompactArrayLen() + if err != nil { + return 0, false + } + groupCount = gc + } else { + gc, err := r.Int32() + if err != nil { + return 0, false + } + groupCount = gc + } + if groupCount == 0 { + return 0, true + } + // first group error code + ec, err := r.Int16() + if err != nil { + return 0, false + } + return ec, true +} diff --git a/pkg/protocol/response_test.go b/pkg/protocol/response_test.go index e606e1e..476d171 100644 --- a/pkg/protocol/response_test.go +++ b/pkg/protocol/response_test.go @@ -1605,3 +1605,253 @@ func makeTestRecordBatch(count int32, baseOffset int64) []byte { binary.BigEndian.PutUint32(data[57:61], uint32(count)) return data } + +// TestGroupResponseErrorCode_RoundTrip encodes known responses via the standard +// Encode* functions, then verifies GroupResponseErrorCode extracts the error. 
// TestGroupResponseErrorCode_RoundTrip encodes known responses via the
// standard Encode* functions, then verifies GroupResponseErrorCode extracts
// the expected top-level error code for each API key / version combination,
// covering both flexible and non-flexible wire layouts.
func TestGroupResponseErrorCode_RoundTrip(t *testing.T) {
	tests := []struct {
		name       string
		apiKey     int16
		apiVersion int16
		encode     func() ([]byte, error)
		wantCode   int16
	}{
		// JoinGroup: throttle_ms from v2, flexible from v6 — both cases here
		// use the non-flexible layout.
		{
			name:       "JoinGroup v2 NOT_COORDINATOR",
			apiKey:     APIKeyJoinGroup,
			apiVersion: 2,
			encode: func() ([]byte, error) {
				return EncodeJoinGroupResponse(&JoinGroupResponse{
					CorrelationID: 1,
					ThrottleMs:    0,
					ErrorCode:     NOT_COORDINATOR,
				}, 2)
			},
			wantCode: NOT_COORDINATOR,
		},
		{
			name:       "JoinGroup v5 NOT_COORDINATOR",
			apiKey:     APIKeyJoinGroup,
			apiVersion: 5,
			encode: func() ([]byte, error) {
				return EncodeJoinGroupResponse(&JoinGroupResponse{
					CorrelationID: 2,
					ThrottleMs:    0,
					ErrorCode:     NOT_COORDINATOR,
				}, 5)
			},
			wantCode: NOT_COORDINATOR,
		},
		// Success case: populated member fields must not be mistaken for an
		// error code.
		{
			name:       "JoinGroup v2 success",
			apiKey:     APIKeyJoinGroup,
			apiVersion: 2,
			encode: func() ([]byte, error) {
				return EncodeJoinGroupResponse(&JoinGroupResponse{
					CorrelationID: 3,
					ThrottleMs:    0,
					ErrorCode:     0,
					ProtocolName:  "range",
					LeaderID:      "member-1",
					MemberID:      "member-1",
				}, 2)
			},
			wantCode: 0,
		},
		// SyncGroup: flexible from v4 — v1 and v4 cover both layouts.
		{
			name:       "SyncGroup v1 NOT_COORDINATOR",
			apiKey:     APIKeySyncGroup,
			apiVersion: 1,
			encode: func() ([]byte, error) {
				return EncodeSyncGroupResponse(&SyncGroupResponse{
					CorrelationID: 4,
					ErrorCode:     NOT_COORDINATOR,
				}, 1)
			},
			wantCode: NOT_COORDINATOR,
		},
		{
			name:       "SyncGroup v4 NOT_COORDINATOR (flexible)",
			apiKey:     APIKeySyncGroup,
			apiVersion: 4,
			encode: func() ([]byte, error) {
				return EncodeSyncGroupResponse(&SyncGroupResponse{
					CorrelationID: 5,
					ErrorCode:     NOT_COORDINATOR,
				}, 4)
			},
			wantCode: NOT_COORDINATOR,
		},
		// Heartbeat: same version boundaries as SyncGroup.
		{
			name:       "Heartbeat v1 NOT_COORDINATOR",
			apiKey:     APIKeyHeartbeat,
			apiVersion: 1,
			encode: func() ([]byte, error) {
				return EncodeHeartbeatResponse(&HeartbeatResponse{
					CorrelationID: 6,
					ErrorCode:     NOT_COORDINATOR,
				}, 1)
			},
			wantCode: NOT_COORDINATOR,
		},
		{
			name:       "Heartbeat v4 NOT_COORDINATOR (flexible)",
			apiKey:     APIKeyHeartbeat,
			apiVersion: 4,
			encode: func() ([]byte, error) {
				return EncodeHeartbeatResponse(&HeartbeatResponse{
					CorrelationID: 7,
					ErrorCode:     NOT_COORDINATOR,
				}, 4)
			},
			wantCode: NOT_COORDINATOR,
		},
		// LeaveGroup: version-less encoder; error_code directly after
		// correlation_id.
		{
			name:       "LeaveGroup NOT_COORDINATOR",
			apiKey:     APIKeyLeaveGroup,
			apiVersion: 0,
			encode: func() ([]byte, error) {
				return EncodeLeaveGroupResponse(&LeaveGroupResponse{
					CorrelationID: 8,
					ErrorCode:     NOT_COORDINATOR,
				})
			},
			wantCode: NOT_COORDINATOR,
		},
		// OffsetCommit: per-partition error codes, first non-zero wins.
		{
			name:       "OffsetCommit with NOT_COORDINATOR on partition",
			apiKey:     APIKeyOffsetCommit,
			apiVersion: 3,
			encode: func() ([]byte, error) {
				return EncodeOffsetCommitResponse(&OffsetCommitResponse{
					CorrelationID: 9,
					Topics: []OffsetCommitTopicResponse{
						{
							Name: "test-topic",
							Partitions: []OffsetCommitPartitionResponse{
								{Partition: 0, ErrorCode: NOT_COORDINATOR},
							},
						},
					},
				})
			},
			wantCode: NOT_COORDINATOR,
		},
		// Partition index 16 == NOT_COORDINATOR's numeric value; a naive byte
		// scan would false-positive here — structural parsing must not.
		{
			name:       "OffsetCommit success (no false positive for partition 16)",
			apiKey:     APIKeyOffsetCommit,
			apiVersion: 3,
			encode: func() ([]byte, error) {
				return EncodeOffsetCommitResponse(&OffsetCommitResponse{
					CorrelationID: 10,
					Topics: []OffsetCommitTopicResponse{
						{
							Name: "test-topic",
							Partitions: []OffsetCommitPartitionResponse{
								{Partition: 16, ErrorCode: 0},
							},
						},
					},
				})
			},
			wantCode: NOT_COORDINATOR - NOT_COORDINATOR, // i.e. 0
		},
		// OffsetFetch: both the partition-level and top-level (v2+) codes are
		// set; either path must surface NOT_COORDINATOR.
		{
			name:       "OffsetFetch v3 NOT_COORDINATOR (top-level)",
			apiKey:     APIKeyOffsetFetch,
			apiVersion: 3,
			encode: func() ([]byte, error) {
				return EncodeOffsetFetchResponse(&OffsetFetchResponse{
					CorrelationID: 11,
					Topics: []OffsetFetchTopicResponse{
						{
							Name: "test-topic",
							Partitions: []OffsetFetchPartitionResponse{
								{Partition: 0, Offset: -1, LeaderEpoch: -1, ErrorCode: NOT_COORDINATOR},
							},
						},
					},
					ErrorCode: NOT_COORDINATOR,
				}, 3)
			},
			wantCode: NOT_COORDINATOR,
		},
		// Offset value 16 must not be misread as an error code (byte-scan
		// regression guard, as above).
		{
			name:       "OffsetFetch v5 success (offset 16, no false positive)",
			apiKey:     APIKeyOffsetFetch,
			apiVersion: 5,
			encode: func() ([]byte, error) {
				return EncodeOffsetFetchResponse(&OffsetFetchResponse{
					CorrelationID: 12,
					Topics: []OffsetFetchTopicResponse{
						{
							Name: "test-topic",
							Partitions: []OffsetFetchPartitionResponse{
								{Partition: 0, Offset: 16, LeaderEpoch: 0, ErrorCode: 0},
							},
						},
					},
					ErrorCode: 0,
				}, 5)
			},
			wantCode: 0,
		},
		// DescribeGroups v5 uses the flexible (compact array) layout; the
		// first group's error_code is the one reported.
		{
			name:       "DescribeGroups v5 NOT_COORDINATOR",
			apiKey:     APIKeyDescribeGroups,
			apiVersion: 5,
			encode: func() ([]byte, error) {
				return EncodeDescribeGroupsResponse(&DescribeGroupsResponse{
					CorrelationID: 13,
					Groups: []DescribeGroupsResponseGroup{
						{ErrorCode: NOT_COORDINATOR, GroupID: "my-group"},
					},
				}, 5)
			},
			wantCode: NOT_COORDINATOR,
		},
		{
			name:       "DescribeGroups v5 success",
			apiKey:     APIKeyDescribeGroups,
			apiVersion: 5,
			encode: func() ([]byte, error) {
				return EncodeDescribeGroupsResponse(&DescribeGroupsResponse{
					CorrelationID: 14,
					Groups: []DescribeGroupsResponseGroup{
						{ErrorCode: 0, GroupID: "my-group", State: "Stable"},
					},
				}, 5)
			},
			wantCode: 0,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			resp, err := tc.encode()
			if err != nil {
				t.Fatalf("encode: %v", err)
			}
			gotCode, ok := GroupResponseErrorCode(tc.apiKey, tc.apiVersion, resp)
			if !ok {
				t.Fatalf("GroupResponseErrorCode returned ok=false for valid response")
			}
			if gotCode != tc.wantCode {
				t.Fatalf("GroupResponseErrorCode = %d, want %d", gotCode, tc.wantCode)
			}
		})
	}
}

// TestGroupResponseErrorCode_Truncated verifies that responses cut short
// mid-field are rejected with ok=false rather than yielding a bogus code.
func TestGroupResponseErrorCode_Truncated(t *testing.T) {
	// A truncated response should return ok=false.
	// Four bytes: only the correlation_id — no room for anything after it.
	_, ok := GroupResponseErrorCode(APIKeyJoinGroup, 2, []byte{0, 0, 0, 1})
	if ok {
		t.Fatalf("expected ok=false for truncated JoinGroup response")
	}

	// Two bytes: not even a complete correlation_id.
	_, ok = GroupResponseErrorCode(APIKeyLeaveGroup, 0, []byte{0, 0})
	if ok {
		t.Fatalf("expected ok=false for truncated LeaveGroup response")
	}
}