Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 97 additions & 4 deletions cmd/kaf/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ var groupDeleteCmd = &cobra.Command{
ValidArgsFunction: validGroupArgs,
Run: func(cmd *cobra.Command, args []string) {
admin := getClusterAdmin()
defer admin.Close()
var group string
if len(args) == 1 {
group = args[0]
Expand Down Expand Up @@ -151,6 +152,7 @@ func createGroupCommitOffsetCmd() *cobra.Command {
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
client := getClient()
admin := getClusterAdmin()

group := args[0]
partitionOffsets := make(map[int32]int64)
Expand All @@ -163,7 +165,6 @@ func createGroupCommitOffsetCmd() *cobra.Command {
var partitions []int32
if allPartitions {
// Determine partitions
admin := getClusterAdmin()
topicDetails, err := admin.DescribeTopics([]string{topic})
if err != nil {
errorExit("Unable to determine partitions of topic: %v\n", err)
Expand Down Expand Up @@ -238,7 +239,6 @@ func createGroupCommitOffsetCmd() *cobra.Command {
}

// Verify the Consumer Group is Empty
admin := getClusterAdmin()
groupDescs, err := admin.DescribeConsumerGroups([]string{args[0]})
if err != nil {
errorExit("Unable to describe consumer groups: %v\n", err)
Expand Down Expand Up @@ -303,6 +303,7 @@ var groupLsCmd = &cobra.Command{
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
admin := getClusterAdmin()
defer admin.Close()

groups, err := admin.ListConsumerGroups()
if err != nil {
Expand Down Expand Up @@ -354,6 +355,7 @@ var groupPeekCmd = &cobra.Command{
ValidArgsFunction: validGroupArgs,
Run: func(cmd *cobra.Command, args []string) {
admin := getClusterAdmin()
defer admin.Close()

groups, err := admin.DescribeConsumerGroups([]string{args[0]})
if err != nil {
Expand Down Expand Up @@ -504,6 +506,9 @@ var groupDescribeCmd = &cobra.Command{
}
sort.Strings(topics)

// Batch fetch all high watermarks at once - major performance optimization!
allHighWatermarks := getBatchHighWatermarks(admin, offsetAndMetadata.Blocks, flagDescribeTopics)

for _, topic := range topics {
partitions := offsetAndMetadata.Blocks[topic]
if len(flagDescribeTopics) > 0 {
Expand Down Expand Up @@ -532,7 +537,7 @@ var groupDescribeCmd = &cobra.Command{
return p[i] < p[j]
})

wms := getHighWatermarks(topic, p)
wms := allHighWatermarks[topic]

lagSum := 0
offsetSum := 0
Expand Down Expand Up @@ -606,7 +611,95 @@ var groupDescribeCmd = &cobra.Command{
},
}

func getHighWatermarks(topic string, partitions []int32) (watermarks map[int32]int64) {
// getBatchHighWatermarks fetches the newest offset (high watermark) of every
// partition listed in offsetBlocks and returns them keyed by topic and then
// by partition.
//
// Partitions are grouped by their current leader so that each broker receives
// a single OffsetRequest covering all of its topics and partitions, and the
// brokers are queried concurrently.
//
// When filterTopics is non-empty, only the topics named in it are queried;
// filtered-out topics are absent from the result. The admin parameter is
// currently unused (leader lookup goes through a client obtained from
// getClient, which is closed when this function returns); it is kept so the
// signature stays stable for callers.
//
// Any failure - leader lookup, broker request, or a per-partition error code
// in the broker response - is reported through errorExit.
func getBatchHighWatermarks(admin sarama.ClusterAdmin, offsetBlocks map[string]map[int32]*sarama.OffsetFetchResponseBlock, filterTopics []string) map[string]map[int32]int64 {
	client := getClient()
	defer client.Close()

	// Build a set once so the per-topic filter check is a map lookup instead
	// of a linear scan over filterTopics.
	wanted := make(map[string]struct{}, len(filterTopics))
	for _, t := range filterTopics {
		wanted[t] = struct{}{}
	}

	// Organize all partition requests by broker leader.
	leaderPartitions := make(map[*sarama.Broker]map[string][]int32)

	for topic, partitions := range offsetBlocks {
		// An empty filter means "all topics".
		if len(wanted) > 0 {
			if _, ok := wanted[topic]; !ok {
				continue
			}
		}

		for partition := range partitions {
			leader, err := client.Leader(topic, partition)
			if err != nil {
				errorExit("Unable to get leader for topic %s partition %d: %v\n", topic, partition, err)
			}

			if leaderPartitions[leader] == nil {
				leaderPartitions[leader] = make(map[string][]int32)
			}
			leaderPartitions[leader][topic] = append(leaderPartitions[leader][topic], partition)
		}
	}

	// Fetch offsets from all brokers in parallel. The channel is buffered to
	// the number of brokers so no goroutine blocks on send before wg.Wait.
	var wg sync.WaitGroup
	wg.Add(len(leaderPartitions))
	results := make(chan map[string]map[int32]int64, len(leaderPartitions))

	for leader, topicPartitions := range leaderPartitions {
		go func(leader *sarama.Broker, topicPartitions map[string][]int32) {
			defer wg.Done()

			req := &sarama.OffsetRequest{Version: int16(1)}
			for topic, partitions := range topicPartitions {
				for _, partition := range partitions {
					// Time -1 asks the broker for the latest offset.
					req.AddBlock(topic, partition, int64(-1), int32(0))
				}
			}

			resp, err := leader.GetAvailableOffsets(req)
			if err != nil {
				errorExit("Unable to get available offsets from broker: %v\n", err)
			}

			brokerResults := make(map[string]map[int32]int64, len(resp.Blocks))
			for topic, blocks := range resp.Blocks {
				offsets := make(map[int32]int64, len(blocks))
				for partition, block := range blocks {
					if block == nil {
						continue
					}
					// A request can succeed as a whole while individual
					// partitions fail; their Offset is not a real watermark
					// and must not be used for lag calculations.
					if block.Err != sarama.ErrNoError {
						errorExit("Unable to get available offsets for topic %s partition %d: %v\n", topic, partition, block.Err)
					}
					offsets[partition] = block.Offset
				}
				brokerResults[topic] = offsets
			}

			results <- brokerResults
		}(leader, topicPartitions)
	}

	wg.Wait()
	close(results)

	// Combine results from all brokers; a topic's partitions may be spread
	// over several leaders, so merge per partition rather than per topic.
	allWatermarks := make(map[string]map[int32]int64)
	for brokerResults := range results {
		for topic, partitionOffsets := range brokerResults {
			if allWatermarks[topic] == nil {
				allWatermarks[topic] = make(map[int32]int64, len(partitionOffsets))
			}
			for partition, offset := range partitionOffsets {
				allWatermarks[topic][partition] = offset
			}
		}
	}

	return allWatermarks
}

func getHighWatermarks(admin sarama.ClusterAdmin, topic string, partitions []int32) (watermarks map[int32]int64) {
client := getClient()
leaders := make(map[*sarama.Broker][]int32)

Expand Down
1 change: 1 addition & 0 deletions cmd/kaf/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var nodeLsCommand = &cobra.Command{
Short: "List nodes in a cluster",
Run: func(cmd *cobra.Command, args []string) {
admin := getClusterAdmin()
defer admin.Close()

brokers, ctlID, err := admin.DescribeCluster()
if err != nil {
Expand Down
80 changes: 49 additions & 31 deletions cmd/kaf/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ var describeTopicCmd = &cobra.Command{
for _, partition := range detail.Partitions {
partitions = append(partitions, partition.ID)
}
highWatermarks := getHighWatermarks(args[0], partitions)
highWatermarks := getHighWatermarks(admin, args[0], partitions)
highWatermarksSum := 0

for _, partition := range detail.Partitions {
Expand Down Expand Up @@ -361,6 +361,7 @@ var deleteTopicCmd = &cobra.Command{
ValidArgsFunction: validTopicArgs,
Run: func(cmd *cobra.Command, args []string) {
admin := getClusterAdmin()
defer admin.Close()

topicName := args[0]
err := admin.DeleteTopic(topicName)
Expand Down Expand Up @@ -392,7 +393,7 @@ var lagCmd = &cobra.Command{
for _, partition := range topicDetails[0].Partitions {
partitions = append(partitions, partition.ID)
}
highWatermarks := getHighWatermarks(topic, partitions)
highWatermarks := getHighWatermarks(admin, topic, partitions)

// List all consumer groups
consumerGroups, err := admin.ListConsumerGroups()
Expand All @@ -411,50 +412,50 @@ var lagCmd = &cobra.Command{
errorExit("Unable to describe consumer groups: %v\n", err)
}

// Calculate lag for each group
lagInfo := make(map[string]int64)
// Collect all groups that consume from this topic for batch processing
relevantGroups := make(map[string]map[string][]int32) // groupId -> topic -> partitions
groupStates := make(map[string]string) // To store the state of each group

for _, group := range groupsInfo {
var sum int64
show := false
groupStates[group.GroupId] = group.State
for _, member := range group.Members {
assignment, err := member.GetMemberAssignment()
if err != nil || assignment == nil {
continue
}

metadata, err := member.GetMemberMetadata()
if err != nil || metadata == nil {
continue
}

if topicPartitions, exist := assignment.Topics[topic]; exist {
show = true
resp, err := admin.ListConsumerGroupOffsets(group.GroupId, map[string][]int32{topic: topicPartitions})
if err != nil {
fmt.Fprintf(os.Stderr, "Error fetching offsets for group %s: %v\n", group.GroupId, err)
continue
if relevantGroups[group.GroupId] == nil {
relevantGroups[group.GroupId] = make(map[string][]int32)
}
relevantGroups[group.GroupId][topic] = topicPartitions
}
}
}

if blocks, ok := resp.Blocks[topic]; ok {
for pid, block := range blocks {
if hwm, ok := highWatermarks[pid]; ok {
if block.Offset > hwm {
fmt.Fprintf(os.Stderr, "Warning: Consumer offset (%d) is greater than high watermark (%d) for partition %d in group %s\n", block.Offset, hwm, pid, group.GroupId)
} else if block.Offset < 0 {
// Skip partitions with negative offsets
} else {
sum += hwm - block.Offset
}
}
// Batch fetch all consumer group offsets
allGroupOffsets := batchListConsumerGroupOffsets(admin, relevantGroups)

// Calculate lag for each group
lagInfo := make(map[string]int64)
for groupId, offsets := range allGroupOffsets {
var sum int64
if blocks, ok := offsets.Blocks[topic]; ok {
for pid, block := range blocks {
if hwm, ok := highWatermarks[pid]; ok {
if block.Offset > hwm {
fmt.Fprintf(os.Stderr, "Warning: Consumer offset (%d) is greater than high watermark (%d) for partition %d in group %s\n", block.Offset, hwm, pid, groupId)
} else if block.Offset < 0 {
// Skip partitions with negative offsets
} else {
sum += hwm - block.Offset
}
}
}
}

if show && sum >= 0 {
lagInfo[group.GroupId] = sum
groupStates[group.GroupId] = group.State // Store the state of the group

if sum >= 0 {
lagInfo[groupId] = sum
}
}

Expand All @@ -469,3 +470,20 @@ var lagCmd = &cobra.Command{
w.Flush()
},
}

// batchListConsumerGroupOffsets looks up committed offsets for every consumer
// group in groupTopicPartitions (group ID -> topic -> partitions).
//
// One ListConsumerGroupOffsets call is issued per group, one after another.
// A group whose lookup fails is reported on stderr and left out of the
// returned map, so the result only contains groups that were fetched
// successfully.
func batchListConsumerGroupOffsets(admin sarama.ClusterAdmin, groupTopicPartitions map[string]map[string][]int32) map[string]*sarama.OffsetFetchResponse {
	offsetsByGroup := map[string]*sarama.OffsetFetchResponse{}

	for group, wanted := range groupTopicPartitions {
		fetched, fetchErr := admin.ListConsumerGroupOffsets(group, wanted)
		if fetchErr == nil {
			offsetsByGroup[group] = fetched
			continue
		}
		// Best effort: report the failure and carry on with the next group.
		fmt.Fprintf(os.Stderr, "Error fetching offsets for group %s: %v\n", group, fetchErr)
	}

	return offsetsByGroup
}