-
Notifications
You must be signed in to change notification settings - Fork 0
Add channel error callback to Subscription #106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Introduced an on_error callback in Twingly::AMQP::Subscription to handle channel errors, with a default behavior of raising an exception if not set. Updated tests to cover the new callback and changed the Bunny dependency to require version >= 2.23.0. In 2.23.0 https://github.com/ruby-amqp/bunny/releases/tag/2.23.0 `#on_error` is invoked when channel closes because of a delivery acknowledgement timeout
|
I used the following code to experiment locally. Steps:
Diffdiff --git a/delivery_ack_test.rb b/delivery_ack_test.rb
new file mode 100644
index 0000000..ad9a42f
--- /dev/null
+++ b/delivery_ack_test.rb
@@ -0,0 +1,46 @@
+require "logger"
+
+require_relative "lib/twingly/amqp"
+
+class Subscriber
+ def initialize # rubocop:disable Metrics/MethodLength
+ @logger = Logger.new(STDOUT)
+ @subscription = Twingly::AMQP::Subscription.new(queue_name: "jobs", consumer_threads: 2)
+
+ @subscription.on_exception do |exception|
+ @logger.error("on_exception callback: #{exception}")
+ end
+
+ @subscription.on_error do |channel, channel_error|
+ @logger.error("on_error callback: channel: #{channel}")
+ @logger.error("on_error callback: channel_error: #{channel_error.inspect}")
+ end
+ end
+
+ def start(blocking: true) # rubocop:disable Metrics/MethodLength
+ @subscription.each_message(blocking:) do |message|
+ job = message.payload
+
+ @logger.info("Received job: #{job.inspect}")
+ handle_job(job)
+ @logger.info("Finished processing job: #{job.inspect}")
+
+ ack(message)
+ rescue StandardError => e
+ @logger.error("rescue StandardError: #{e}")
+
+ ack(message)
+ end
+ end
+
+ def handle_job(job)
+ raise "Error in job" if job[:type] == "fail"
+ sleep 120 if job[:type] == "sleep"
+ end
+
+ def ack(message)
+ message.ack
+ end
+end
+
+Subscriber.new.start
diff --git a/docker-compose.yml b/docker-compose.yml
index 7cb578f..0461233 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -2,6 +2,9 @@ version: "3.8"
services:
rabbitmq:
- image: rabbitmq:3.10
+ image: rabbitmq:4.0.8-management
ports:
- 5672:5672
+ - 15672:15672
+ volumes:
+ - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
diff --git a/rabbitmq.conf b/rabbitmq.conf
new file mode 100644
index 0000000..cc7bf05
--- /dev/null
+++ b/rabbitmq.conf
@@ -0,0 +1,2 @@
+# One minute in milliseconds
+consumer_timeout = 60000 |
https: //rspec.info/features/3-13/rspec-expectations/built-in-matchers/yield/ Co-Authored-By: Mattias Roback <2803721+roback@users.noreply.github.com>
roback
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Too bad it's a bit hard to test this for real. The tests here are at least better than no tests at all, and I couldn't come up with anything better either.
Yeah that's unfortunate, I spent a lot of time trying to mock and send Close frames/methods to the Channel and Session but I settled with this 😞 |
cequele
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! 🐰
Because of the new feature that was added in #106
Introduced an on_error callback in Twingly::AMQP::Subscription to handle channel errors, with a default behaviour of raising an exception if not set. Updated tests to cover the new callback and changed the Bunny dependency to require version >= 2.23.0.
In 2.23.0 https://github.com/ruby-amqp/bunny/releases/tag/2.23.0
#on_erroris invoked when channel closes because of a delivery acknowledgement timeout