Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 116 additions & 8 deletions cmd/broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type handler struct {
logConfig storage.PartitionLogConfig
coordinator *broker.GroupCoordinator
leaseManager *metadata.PartitionLeaseManager
groupLeaseManager *metadata.GroupLeaseManager
s3Health *broker.S3HealthMonitor
s3Namespace string
brokerInfo protocol.MetadataBroker
Expand Down Expand Up @@ -224,10 +225,15 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED,
}, header.APIVersion)
}
if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 {
return protocol.EncodeJoinGroupResponse(&protocol.JoinGroupResponse{
CorrelationID: header.CorrelationID,
ErrorCode: errCode,
}, header.APIVersion)
}
if !h.etcdAvailable() {
return protocol.EncodeJoinGroupResponse(&protocol.JoinGroupResponse{
CorrelationID: header.CorrelationID,
ThrottleMs: 0,
ErrorCode: protocol.REQUEST_TIMED_OUT,
}, header.APIVersion)
}
Expand All @@ -246,10 +252,15 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED,
}, header.APIVersion)
}
if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 {
return protocol.EncodeSyncGroupResponse(&protocol.SyncGroupResponse{
CorrelationID: header.CorrelationID,
ErrorCode: errCode,
}, header.APIVersion)
}
if !h.etcdAvailable() {
return protocol.EncodeSyncGroupResponse(&protocol.SyncGroupResponse{
CorrelationID: header.CorrelationID,
ThrottleMs: 0,
ErrorCode: protocol.REQUEST_TIMED_OUT,
}, header.APIVersion)
}
Expand All @@ -263,13 +274,18 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
return h.withAdminMetrics(header.APIKey, func() ([]byte, error) {
allowed := make([]string, 0, len(req.Groups))
denied := make(map[string]struct{})
leaseErrors := make(map[string]int16)
for _, groupID := range req.Groups {
if h.allowGroup(principal, groupID, acl.ActionGroupRead) {
allowed = append(allowed, groupID)
} else {
if !h.allowGroup(principal, groupID, acl.ActionGroupRead) {
denied[groupID] = struct{}{}
h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupRead, acl.ResourceGroup, groupID)
continue
}
if errCode := h.acquireGroupLease(ctx, groupID); errCode != 0 {
leaseErrors[groupID] = errCode
continue
}
allowed = append(allowed, groupID)
}

responseByGroup := make(map[string]protocol.DescribeGroupsResponseGroup, len(req.Groups))
Expand All @@ -296,13 +312,20 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re

results := make([]protocol.DescribeGroupsResponseGroup, 0, len(req.Groups))
for _, groupID := range req.Groups {
if _, denied := denied[groupID]; denied {
if _, ok := denied[groupID]; ok {
results = append(results, protocol.DescribeGroupsResponseGroup{
ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED,
GroupID: groupID,
})
continue
}
if errCode, ok := leaseErrors[groupID]; ok {
results = append(results, protocol.DescribeGroupsResponseGroup{
ErrorCode: errCode,
GroupID: groupID,
})
continue
}
if group, ok := responseByGroup[groupID]; ok {
results = append(results, group)
} else {
Expand Down Expand Up @@ -354,10 +377,15 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED,
}, header.APIVersion)
}
if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 {
return protocol.EncodeHeartbeatResponse(&protocol.HeartbeatResponse{
CorrelationID: header.CorrelationID,
ErrorCode: errCode,
}, header.APIVersion)
}
if !h.etcdAvailable() {
return protocol.EncodeHeartbeatResponse(&protocol.HeartbeatResponse{
CorrelationID: header.CorrelationID,
ThrottleMs: 0,
ErrorCode: protocol.REQUEST_TIMED_OUT,
}, header.APIVersion)
}
Expand All @@ -372,6 +400,12 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED,
})
}
if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 {
return protocol.EncodeLeaveGroupResponse(&protocol.LeaveGroupResponse{
CorrelationID: header.CorrelationID,
ErrorCode: errCode,
})
}
if !h.etcdAvailable() {
return protocol.EncodeLeaveGroupResponse(&protocol.LeaveGroupResponse{
CorrelationID: header.CorrelationID,
Expand Down Expand Up @@ -404,6 +438,26 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
Topics: topics,
})
}
if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 {
topics := make([]protocol.OffsetCommitTopicResponse, 0, len(req.Topics))
for _, topic := range req.Topics {
partitions := make([]protocol.OffsetCommitPartitionResponse, 0, len(topic.Partitions))
for _, part := range topic.Partitions {
partitions = append(partitions, protocol.OffsetCommitPartitionResponse{
Partition: part.Partition,
ErrorCode: errCode,
})
}
topics = append(topics, protocol.OffsetCommitTopicResponse{
Name: topic.Name,
Partitions: partitions,
})
}
return protocol.EncodeOffsetCommitResponse(&protocol.OffsetCommitResponse{
CorrelationID: header.CorrelationID,
Topics: topics,
})
}
if !h.etcdAvailable() {
topics := make([]protocol.OffsetCommitTopicResponse, 0, len(req.Topics))
for _, topic := range req.Topics {
Expand Down Expand Up @@ -457,6 +511,29 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED,
}, header.APIVersion)
}
if errCode := h.acquireGroupLease(ctx, req.GroupID); errCode != 0 {
topics := make([]protocol.OffsetFetchTopicResponse, 0, len(req.Topics))
for _, topic := range req.Topics {
partitions := make([]protocol.OffsetFetchPartitionResponse, 0, len(topic.Partitions))
for _, part := range topic.Partitions {
partitions = append(partitions, protocol.OffsetFetchPartitionResponse{
Partition: part.Partition,
Offset: -1,
LeaderEpoch: -1,
ErrorCode: errCode,
})
}
topics = append(topics, protocol.OffsetFetchTopicResponse{
Name: topic.Name,
Partitions: partitions,
})
}
return protocol.EncodeOffsetFetchResponse(&protocol.OffsetFetchResponse{
CorrelationID: header.CorrelationID,
Topics: topics,
ErrorCode: errCode,
}, header.APIVersion)
}
if !h.etcdAvailable() {
topics := make([]protocol.OffsetFetchTopicResponse, 0, len(req.Topics))
for _, topic := range req.Topics {
Expand Down Expand Up @@ -951,6 +1028,27 @@ func (h *handler) unauthorizedListOffsets(principal string, header *protocol.Req
})
}

// acquireGroupLease attempts to acquire the coordination lease for the given
// group. Returns 0 if this broker is (or becomes) the coordinator. Returns a
// non-zero Kafka error code if the request should be rejected:
// - NOT_COORDINATOR when another broker owns the group or this broker is
// shutting down (so the proxy can redirect to the correct broker).
// - REQUEST_TIMED_OUT for transient errors (etcd timeout, session failure).
func (h *handler) acquireGroupLease(ctx context.Context, groupID string) int16 {
if h.groupLeaseManager == nil {
return 0
}
err := h.groupLeaseManager.Acquire(ctx, groupID)
if err == nil {
return 0
}
if errors.Is(err, metadata.ErrNotOwner) || errors.Is(err, metadata.ErrShuttingDown) {
return protocol.NOT_COORDINATOR
}
h.logger.Warn("group lease acquire failed", "group", groupID, "error", err)
return protocol.REQUEST_TIMED_OUT
}

// acquirePartitionLeases acquires leases for all partitions in the request
// concurrently. Returns a map of partition -> error for partitions that failed.
// Partitions already owned by this broker complete instantly (map lookup).
Expand Down Expand Up @@ -2097,9 +2195,15 @@ func newHandler(store metadata.Store, s3Client storage.S3Client, brokerInfo prot
health := broker.NewS3HealthMonitor(s3HealthConfigFromEnv())
authorizer := buildAuthorizerFromEnv(logger)
var leaseManager *metadata.PartitionLeaseManager
var groupLeaseManager *metadata.GroupLeaseManager
if etcdStore, ok := store.(*metadata.EtcdStore); ok {
brokerIDStr := fmt.Sprintf("%d", brokerInfo.NodeID)
leaseManager = metadata.NewPartitionLeaseManager(etcdStore.EtcdClient(), metadata.PartitionLeaseConfig{
BrokerID: fmt.Sprintf("%d", brokerInfo.NodeID),
BrokerID: brokerIDStr,
Logger: logger,
})
groupLeaseManager = metadata.NewGroupLeaseManager(etcdStore.EtcdClient(), metadata.GroupLeaseConfig{
BrokerID: brokerIDStr,
Logger: logger,
})
}
Expand All @@ -2123,6 +2227,7 @@ func newHandler(store metadata.Store, s3Client storage.S3Client, brokerInfo prot
},
coordinator: broker.NewGroupCoordinator(store, brokerInfo, nil),
leaseManager: leaseManager,
groupLeaseManager: groupLeaseManager,
s3Health: health,
s3Namespace: s3Namespace,
brokerInfo: brokerInfo,
Expand Down Expand Up @@ -2228,6 +2333,9 @@ func main() {
if handler.leaseManager != nil {
handler.leaseManager.ReleaseAll()
}
if handler.groupLeaseManager != nil {
handler.groupLeaseManager.ReleaseAll()
}
srv.Wait()
}

Expand Down
115 changes: 115 additions & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type proxy struct {
cachedBackends []string
apiVersions []protocol.ApiVersion
router *metadata.PartitionRouter
groupRouter *metadata.GroupRouter
brokerAddrMu sync.RWMutex
brokerAddrs map[string]string // brokerID -> "host:port"
backendRetries int
Expand Down Expand Up @@ -123,6 +124,13 @@ func main() {
p.router = router
logger.Info("partition-aware routing enabled")
}
groupRouter, err := metadata.NewGroupRouter(ctx, etcdStore.EtcdClient(), logger)
if err != nil {
logger.Warn("group router init failed; using round-robin routing for group ops", "error", err)
} else {
p.groupRouter = groupRouter
logger.Info("group-aware routing enabled")
}
}
if len(backends) > 0 {
p.setCachedBackends(backends)
Expand All @@ -140,6 +148,9 @@ func main() {
if p.router != nil {
p.router.Stop()
}
if p.groupRouter != nil {
p.groupRouter.Stop()
}
}

func envOrDefault(key, fallback string) string {
Expand Down Expand Up @@ -459,6 +470,24 @@ func (p *proxy) handleConnection(ctx context.Context, conn net.Conn) {
return
}
continue
case protocol.APIKeyJoinGroup,
protocol.APIKeySyncGroup,
protocol.APIKeyHeartbeat,
protocol.APIKeyLeaveGroup,
protocol.APIKeyOffsetCommit,
protocol.APIKeyOffsetFetch,
protocol.APIKeyDescribeGroups:
resp, err := p.handleGroupRouting(ctx, header, frame.Payload, pool)
if err != nil {
p.logger.Warn("group routing failed", "error", err)
p.respondBackendError(conn, header, frame.Payload)
return
}
if err := protocol.WriteFrame(conn, resp); err != nil {
p.logger.Warn("write group response failed", "error", err)
return
}
continue
default:
}

Expand Down Expand Up @@ -1499,3 +1528,89 @@ func (p *proxy) forwardToBackend(ctx context.Context, conn net.Conn, backendAddr
}
return frame.Payload, nil
}

// handleGroupRouting routes group-related requests to the broker that owns the
// group coordination lease. If no owner is cached, or the owner returns
// NOT_COORDINATOR, the request is retried on a different broker.
//
// DescribeGroups requests are forwarded once without retry since different
// groups may live on different brokers. The broker returns per-group
// NOT_COORDINATOR errors that the Kafka client handles natively.
func (p *proxy) handleGroupRouting(ctx context.Context, header *protocol.RequestHeader, payload []byte, pool *connPool) ([]byte, error) {
groupID := p.extractGroupID(header.APIKey, payload)

// DescribeGroups with multiple groups cannot be reliably split/retried at
// the proxy level. Forward once and let the client handle per-group errors.
maxAttempts := 3
if header.APIKey == protocol.APIKeyDescribeGroups {
maxAttempts = 1
}

triedBackends := make(map[string]bool)

for attempt := 0; attempt < maxAttempts; attempt++ {
targetAddr := ""
if p.groupRouter != nil && groupID != "" {
if ownerID := p.groupRouter.LookupOwner(groupID); ownerID != "" {
targetAddr = p.brokerIDToAddr(ownerID)
}
}

conn, actualAddr, err := p.connectForAddr(ctx, targetAddr, triedBackends, pool)
if err != nil {
continue
}
triedBackends[actualAddr] = true

resp, err := p.forwardToBackend(ctx, conn, actualAddr, payload)
if err != nil {
conn.Close()
continue
}

if p.groupRouter != nil && groupID != "" {
if ec, ok := protocol.GroupResponseErrorCode(header.APIKey, header.APIVersion, resp); ok && ec == protocol.NOT_COORDINATOR {
pool.Return(actualAddr, conn)
p.groupRouter.Invalidate(groupID)
p.logger.Debug("NOT_COORDINATOR, retrying group request",
"group", groupID, "attempt", attempt+1, "broker", actualAddr)
continue
}
}

pool.Return(actualAddr, conn)
return resp, nil
}

return nil, fmt.Errorf("group request for %q failed after %d attempts", groupID, maxAttempts)
}

// extractGroupID parses the request payload to extract the group ID for routing.
// Returns "" if the group ID cannot be determined.
func (p *proxy) extractGroupID(apiKey int16, payload []byte) string {
_, req, err := protocol.ParseRequest(payload)
if err != nil {
return ""
}
switch r := req.(type) {
case *protocol.JoinGroupRequest:
return r.GroupID
case *protocol.SyncGroupRequest:
return r.GroupID
case *protocol.HeartbeatRequest:
return r.GroupID
case *protocol.LeaveGroupRequest:
return r.GroupID
case *protocol.OffsetCommitRequest:
return r.GroupID
case *protocol.OffsetFetchRequest:
return r.GroupID
case *protocol.DescribeGroupsRequest:
if len(r.Groups) > 0 {
return r.Groups[0]
}
return ""
default:
return ""
}
}
Loading