diff --git a/cmd/kaf/group.go b/cmd/kaf/group.go index 02e8bbd..4942f50 100644 --- a/cmd/kaf/group.go +++ b/cmd/kaf/group.go @@ -84,6 +84,7 @@ var groupDeleteCmd = &cobra.Command{ ValidArgsFunction: validGroupArgs, Run: func(cmd *cobra.Command, args []string) { admin := getClusterAdmin() + defer admin.Close() var group string if len(args) == 1 { group = args[0] @@ -151,6 +152,7 @@ func createGroupCommitOffsetCmd() *cobra.Command { Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { client := getClient() + admin := getClusterAdmin() group := args[0] partitionOffsets := make(map[int32]int64) @@ -163,7 +165,6 @@ func createGroupCommitOffsetCmd() *cobra.Command { var partitions []int32 if allPartitions { // Determine partitions - admin := getClusterAdmin() topicDetails, err := admin.DescribeTopics([]string{topic}) if err != nil { errorExit("Unable to determine partitions of topic: %v\n", err) @@ -238,7 +239,6 @@ func createGroupCommitOffsetCmd() *cobra.Command { } // Verify the Consumer Group is Empty - admin := getClusterAdmin() groupDescs, err := admin.DescribeConsumerGroups([]string{args[0]}) if err != nil { errorExit("Unable to describe consumer groups: %v\n", err) @@ -303,6 +303,7 @@ var groupLsCmd = &cobra.Command{ Args: cobra.NoArgs, Run: func(cmd *cobra.Command, args []string) { admin := getClusterAdmin() + defer admin.Close() groups, err := admin.ListConsumerGroups() if err != nil { @@ -354,6 +355,7 @@ var groupPeekCmd = &cobra.Command{ ValidArgsFunction: validGroupArgs, Run: func(cmd *cobra.Command, args []string) { admin := getClusterAdmin() + defer admin.Close() groups, err := admin.DescribeConsumerGroups([]string{args[0]}) if err != nil { @@ -504,6 +506,9 @@ var groupDescribeCmd = &cobra.Command{ } sort.Strings(topics) + // Batch fetch all high watermarks at once - major performance optimization! + allHighWatermarks := getBatchHighWatermarks(admin, offsetAndMetadata.Blocks, flagDescribeTopics) + for _, topic := range topics { partitions := offsetAndMetadata.Blocks[topic] if len(flagDescribeTopics) > 0 { @@ -532,7 +537,7 @@ var groupDescribeCmd = &cobra.Command{ return p[i] < p[j] }) - wms := getHighWatermarks(topic, p) + wms := allHighWatermarks[topic] lagSum := 0 offsetSum := 0 @@ -606,7 +611,95 @@ var groupDescribeCmd = &cobra.Command{ }, } -func getHighWatermarks(topic string, partitions []int32) (watermarks map[int32]int64) { +// getBatchHighWatermarks fetches high watermarks for all topics and partitions in a single batch operation +func getBatchHighWatermarks(admin sarama.ClusterAdmin, offsetBlocks map[string]map[int32]*sarama.OffsetFetchResponseBlock, filterTopics []string) map[string]map[int32]int64 { + client := getClient() + defer client.Close() + + // Organize all partition requests by broker leader + leaderPartitions := make(map[*sarama.Broker]map[string][]int32) + + for topic, partitions := range offsetBlocks { + // Skip topics not in filter if filter is specified + if len(filterTopics) > 0 { + found := false + for _, filterTopic := range filterTopics { + if topic == filterTopic { + found = true + break + } + } + if !found { + continue + } + } + + for partition := range partitions { + leader, err := client.Leader(topic, partition) + if err != nil { + errorExit("Unable to get leader for topic %s partition %d: %v", topic, partition, err) + } + + if leaderPartitions[leader] == nil { + leaderPartitions[leader] = make(map[string][]int32) + } + leaderPartitions[leader][topic] = append(leaderPartitions[leader][topic], partition) + } + } + + // Fetch offsets from all brokers in parallel + wg := sync.WaitGroup{} + wg.Add(len(leaderPartitions)) + results := make(chan map[string]map[int32]int64, len(leaderPartitions)) + + for leader, topicPartitions := range leaderPartitions { + go func(leader *sarama.Broker, topicPartitions map[string][]int32) { + defer wg.Done() + + req := &sarama.OffsetRequest{Version: int16(1)} + for topic, partitions := range topicPartitions { + for _, partition := range partitions { + req.AddBlock(topic, partition, int64(-1), int32(0)) + } + } + + resp, err := leader.GetAvailableOffsets(req) + if err != nil { + errorExit("Unable to get available offsets from broker: %v", err) + } + + brokerResults := make(map[string]map[int32]int64) + for topic, blocks := range resp.Blocks { + brokerResults[topic] = make(map[int32]int64) + for partition, block := range blocks { + brokerResults[topic][partition] = block.Offset + } + } + + results <- brokerResults + }(leader, topicPartitions) + } + + wg.Wait() + close(results) + + // Combine results from all brokers + allWatermarks := make(map[string]map[int32]int64) + for brokerResults := range results { + for topic, partitionOffsets := range brokerResults { + if allWatermarks[topic] == nil { + allWatermarks[topic] = make(map[int32]int64) + } + for partition, offset := range partitionOffsets { + allWatermarks[topic][partition] = offset + } + } + } + + return allWatermarks +} + +func getHighWatermarks(admin sarama.ClusterAdmin, topic string, partitions []int32) (watermarks map[int32]int64) { client := getClient() leaders := make(map[*sarama.Broker][]int32) diff --git a/cmd/kaf/node.go b/cmd/kaf/node.go index 09e6014..1f63469 100644 --- a/cmd/kaf/node.go +++ b/cmd/kaf/node.go @@ -32,6 +32,7 @@ var nodeLsCommand = &cobra.Command{ Short: "List nodes in a cluster", Run: func(cmd *cobra.Command, args []string) { admin := getClusterAdmin() + defer admin.Close() brokers, ctlID, err := admin.DescribeCluster() if err != nil { diff --git a/cmd/kaf/topic.go b/cmd/kaf/topic.go index d845cec..21767c0 100644 --- a/cmd/kaf/topic.go +++ b/cmd/kaf/topic.go @@ -230,7 +230,7 @@ var describeTopicCmd = &cobra.Command{ for _, partition := range detail.Partitions { partitions = append(partitions, partition.ID) } - highWatermarks := getHighWatermarks(args[0], partitions) + highWatermarks := getHighWatermarks(admin, args[0], partitions) highWatermarksSum := 0 for _, partition := range detail.Partitions { @@ -361,6 +361,7 @@ var deleteTopicCmd = &cobra.Command{ ValidArgsFunction: validTopicArgs, Run: func(cmd *cobra.Command, args []string) { admin := getClusterAdmin() + defer admin.Close() topicName := args[0] err := admin.DeleteTopic(topicName) @@ -392,7 +393,7 @@ var lagCmd = &cobra.Command{ for _, partition := range topicDetails[0].Partitions { partitions = append(partitions, partition.ID) } - highWatermarks := getHighWatermarks(topic, partitions) + highWatermarks := getHighWatermarks(admin, topic, partitions) // List all consumer groups consumerGroups, err := admin.ListConsumerGroups() @@ -411,50 +412,50 @@ var lagCmd = &cobra.Command{ errorExit("Unable to describe consumer groups: %v\n", err) } - // Calculate lag for each group - lagInfo := make(map[string]int64) + // Collect all groups that consume from this topic for batch processing + relevantGroups := make(map[string]map[string][]int32) // groupId -> topic -> partitions groupStates := make(map[string]string) // To store the state of each group + for _, group := range groupsInfo { - var sum int64 - show := false + groupStates[group.GroupId] = group.State for _, member := range group.Members { assignment, err := member.GetMemberAssignment() if err != nil || assignment == nil { continue } - metadata, err := member.GetMemberMetadata() - if err != nil || metadata == nil { - continue - } - if topicPartitions, exist := assignment.Topics[topic]; exist { - show = true - resp, err := admin.ListConsumerGroupOffsets(group.GroupId, map[string][]int32{topic: topicPartitions}) - if err != nil { - fmt.Fprintf(os.Stderr, "Error fetching offsets for group %s: %v\n", group.GroupId, err) - continue + if relevantGroups[group.GroupId] == nil { + relevantGroups[group.GroupId] = make(map[string][]int32) } + relevantGroups[group.GroupId][topic] = topicPartitions + } + } + } - if blocks, ok := resp.Blocks[topic]; ok { - for pid, block := range blocks { - if hwm, ok := highWatermarks[pid]; ok { - if block.Offset > hwm { - fmt.Fprintf(os.Stderr, "Warning: Consumer offset (%d) is greater than high watermark (%d) for partition %d in group %s\n", block.Offset, hwm, pid, group.GroupId) - } else if block.Offset < 0 { - // Skip partitions with negative offsets - } else { - sum += hwm - block.Offset - } - } + // Batch fetch all consumer group offsets + allGroupOffsets := batchListConsumerGroupOffsets(admin, relevantGroups) + + // Calculate lag for each group + lagInfo := make(map[string]int64) + for groupId, offsets := range allGroupOffsets { + var sum int64 + if blocks, ok := offsets.Blocks[topic]; ok { + for pid, block := range blocks { + if hwm, ok := highWatermarks[pid]; ok { + if block.Offset > hwm { + fmt.Fprintf(os.Stderr, "Warning: Consumer offset (%d) is greater than high watermark (%d) for partition %d in group %s\n", block.Offset, hwm, pid, groupId) + } else if block.Offset < 0 { + // Skip partitions with negative offsets + } else { + sum += hwm - block.Offset } } } } - - if show && sum >= 0 { - lagInfo[group.GroupId] = sum - groupStates[group.GroupId] = group.State // Store the state of the group + + if sum >= 0 { + lagInfo[groupId] = sum } } @@ -469,3 +470,20 @@ var lagCmd = &cobra.Command{ w.Flush() }, } + +// batchListConsumerGroupOffsets fetches offsets for multiple consumer groups in batched operations +func batchListConsumerGroupOffsets(admin sarama.ClusterAdmin, groupTopicPartitions map[string]map[string][]int32) map[string]*sarama.OffsetFetchResponse { + results := make(map[string]*sarama.OffsetFetchResponse) + + // Process each group + for groupId, topicPartitions := range groupTopicPartitions { + resp, err := admin.ListConsumerGroupOffsets(groupId, topicPartitions) + if err != nil { + fmt.Fprintf(os.Stderr, "Error fetching offsets for group %s: %v\n", groupId, err) + continue + } + results[groupId] = resp + } + + return results +}