Noticed a surprising difference in behavior between `kaf group commit` and `kafka-consumer-groups.sh` while investigating the expiration/deletion of a consumer group in one of our Kafka clusters.
When using `kaf group commit` on a group with a `protocolType` of `(Empty)`, the group's `protocolType` changes to `consumer`. Groups of type `consumer` expire their offsets after `offset.retention.ms` if no members have joined the group and subscribed. This matters because existing groups of type `(Empty)` are unlikely to ever have a member that joins or subscribes: that protocol type is typically used by consumer groups that assign partitions manually.
I can't completely rule out that this is intended behavior, but it is definitely surprising for someone using `kaf` as a drop-in replacement for the Kafka CLI utilities. In addition, I think the method used contains a race condition that can cause `kaf` to commit offsets to a group with active consumers. I'm not sure such a commit would actually succeed, but it definitely looks like a risk.
Quick steps to reproduce
- Create a consumer group using `kafka-consumer-groups.sh --reset-offsets ...`
- Use `kaf group commit` to modify that offset
- Observe that the group's `protocolType` changes to `consumer`
Code path
In the `resetHandler`, `kaf` commits offsets using `s.GenerationID()` from the session, which implies that it has joined the group at that point.
```go
// from `cmd/kaf/group.go`
func (r *resetHandler) Setup(s sarama.ConsumerGroupSession) error {
	req := &sarama.OffsetCommitRequest{
		Version:                 1,
		ConsumerGroup:           r.group,
		ConsumerGroupGeneration: s.GenerationID(),
		ConsumerID:              s.MemberID(),
	}
	...
```
Looking at how the admin client does this, it uses a generation ID of `-1`, which flags that the commit does not come from a real consumer.
Race condition?
- First, `kaf` uses the admin client to check whether the group is empty:
```go
// from `cmd/kaf/group.go`
// Verify the Consumer Group is Empty
admin := getClusterAdmin()
groupDescs, err := admin.DescribeConsumerGroups([]string{args[0]})
```
- Then an unrelated consumer joins the group.
- Then `kaf` joins the group and commits offsets?
```go
// from `cmd/kaf/group.go`
g, err := sarama.NewConsumerGroupFromClient(group, client)
...
err = g.Consume(context.Background(), []string{topic}, &resetHandler{
...
```
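The interleaving in these steps is a classic check-then-act race. The toy model below replays it deterministically and contrasts it with a check done atomically with the write, which is roughly what the broker does for a generation `-1` commit. The `group` type, its methods, and the offset value `100` are hypothetical; `commitUnchecked` stands in for kaf joining and committing with its own valid generation, which the broker would accept.

```go
package main

import (
	"fmt"
	"sync"
)

// group is a hypothetical stand-in for broker-side group membership.
type group struct {
	mu      sync.Mutex
	members map[string]bool
	offset  int64
}

func (g *group) isEmpty() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return len(g.members) == 0
}

func (g *group) join(id string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.members[id] = true
}

// commitUnchecked writes the offset without re-checking membership, like a
// commit that relies on an earlier, separate emptiness check.
func (g *group) commitUnchecked(off int64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.offset = off
}

// commitIfEmpty checks membership and writes under one lock, so nobody can
// join between the check and the write.
func (g *group) commitIfEmpty(off int64) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if len(g.members) != 0 {
		return false
	}
	g.offset = off
	return true
}

// checkThenCommit replays the steps: check, another consumer joins, commit.
func checkThenCommit(g *group, interloper func()) bool {
	if !g.isEmpty() {
		return false
	}
	interloper() // an unrelated consumer joins between the check and the commit
	g.commitUnchecked(100)
	return true
}

func main() {
	g := &group{members: map[string]bool{}}
	committed := checkThenCommit(g, func() { g.join("unrelated-consumer") })
	fmt.Println(committed, len(g.members), g.offset) // true 1 100

	h := &group{members: map[string]bool{"unrelated-consumer": true}}
	ok := h.commitIfEmpty(100)
	fmt.Println(ok, h.offset) // false 0
}
```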
Possible Fixes
I think the right fix is to commit the offsets through an admin client API, which sarama does not currently have. I need to look into this a bit more, but wanted to check first. Alternatively, we could send the commit directly to the group's coordinator with generation ID `-1`; the broker will reject it if there are active consumers, which removes the need for the separate admin API check for an empty group.
Background
We discovered this issue while investigating the expiration of a consumer group that assigned its partitions manually (there is 1 partition and 1 member in the group). The group was frequently committing offsets, but had never "joined" or "subscribed" because that wasn't needed. The offsets eventually expired and the group was deleted. We had trouble explaining why this happened when it did, and eventually traced it to an operations ticket that used `kaf` to move the group's offsets approximately `offset.retention.ms` before the group was deleted.