Skip to content

Conversation

@ukdave
Copy link
Collaborator

@ukdave ukdave commented Jun 26, 2025

Currently if the Consumer#each method is called it will loop indefinitely with no way of gracefully stopping it. Calling Consumer#close closes and destroys the kafka handle, but then causes the app to crash an "Invalid memory access" inside the each loop the next time it calls poll.

This PR adds an internal flag to keep track of whether the each loop is running, and adds a Consumer#stop method to stop it. This can be called from, for example, a signal handler to gracefully stop the consumer.

This PR also adds some extra safety by raising an error if an attempt is made to use the consumer after it has been closed instead of crashing with an "Invalid memory access" error.

require "crafka"

consumer = Kafka::Consumer.new({
  "bootstrap.servers"     => "localhost:9092",
  "client.id"             => "test",
  "group.id"              => "test",
  "broker.address.family" => "v4",
})

Signal::INT.trap do
  Log.info { "Received SIGINT" }
  consumer.stop
end

Log.info { "Subscribing to topic..." }
consumer.subscribe("test-topic")

Log.info { "Starting consumer loop..." }
consumer.each do |kafka_event|
  payload = String.new(kafka_event.payload)
  Log.info { "Consumed event: #{payload}" }
end
Log.info { "Exited consumer loop" }

Log.info { "Closing consumer..." }
consumer.close

Log.info { "Bye" }

@ukdave ukdave force-pushed the consumer-each-stop branch from 024ae23 to 13e6580 Compare June 26, 2025 13:56
ukdave added 3 commits June 26, 2025 14:58
Previously if the `Consumer#each` method was called it would loop
indefinitely with no way of gracefully stopping it. Calling
`Consumer#close` would close and destroy the kafka handle, but would
then crash the app with an "Invalid memory access" inside the each loop
the next time it called `poll`.

This commit adds an internal flag to keep track of whether the each loop
is running, and adds a `Consumer#stop` method to stop it. This can be
called from, for example, a signal handler to gracefully stop the
consumer. E.g.:

```
require "crafka"

consumer = Kafka::Consumer.new({
  "bootstrap.servers"     => "localhost:9092",
  "client.id"             => "test",
  "group.id"              => "test",
  "broker.address.family" => "v4",
})

Signal::INT.trap do
  Log.info { "Received SIGINT" }
  consumer.stop
end

Log.info { "Subscribing to topic..." }
consumer.subscribe("test-topic")

Log.info { "Starting consumer loop..." }
consumer.each do |kafka_event|
  payload = String.new(kafka_event.payload)
  Log.info { "Consumed event: #{payload}" }
end
Log.info { "Exited consumer loop" }

Log.info { "Closing consumer..." }
consumer.close

Log.info { "Bye" }
```
@ukdave ukdave force-pushed the consumer-each-stop branch from 13e6580 to 5bd1221 Compare June 26, 2025 13:58
@jphaward jphaward merged commit 5c9a821 into main Jun 26, 2025
12 checks passed
@ukdave ukdave deleted the consumer-each-stop branch June 26, 2025 15:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants