From 21e0f4b2c12360cff3fc868d63c9442cf80710f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?0aKarmA=5F=E9=AA=85=E6=96=87?= Date: Mon, 7 Aug 2023 17:06:52 +0800 Subject: [PATCH 1/2] feat: add on_confirm listener when publish msg to exchange/queue --- lib/logstash/outputs/rabbitmq.rb | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/logstash/outputs/rabbitmq.rb b/lib/logstash/outputs/rabbitmq.rb index bab0fbc..78249f6 100644 --- a/lib/logstash/outputs/rabbitmq.rb +++ b/lib/logstash/outputs/rabbitmq.rb @@ -72,9 +72,14 @@ def multi_receive_encoded(events_and_data) def publish(event, message) raise ArgumentError, "No exchange set in HareInfo!!!" unless @hare_info.exchange routing_key = event.sprintf(@key) + connect_retry_interval = event.sprintf(@connect_retry_interval * 1000).to_i message_properties = @message_properties_template.build(event) @gated_executor.execute do local_exchange.publish(message, :routing_key => routing_key, :properties => message_properties) + local_got_ack = local_channel.wait_for_confirms(connect_retry_interval) + if !local_got_ack + raise MarchHare::Exception.new('Got an nack message') + end end rescue MarchHare::Exception, IOError, AlreadyClosedException, TimeoutException => e @logger.error("Error while publishing, will retry", error_details(e, backtrace: true)) @@ -96,6 +101,7 @@ def local_channel channel = @thread_local_channel.get if !channel channel = @hare_info.connection.create_channel + channel.confirm_select @thread_local_channel.set(channel) end channel From 6e80bea3f42bdc40acba0b237dbf5ccf78fe6837 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?0aKarmA=5F=E9=AA=85=E6=96=87?= Date: Mon, 7 Aug 2023 17:29:55 +0800 Subject: [PATCH 2/2] fix: fix bug with formatting connect_retry_interval --- lib/logstash/outputs/rabbitmq.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/rabbitmq.rb b/lib/logstash/outputs/rabbitmq.rb index 78249f6..b18e128 100644 --- a/lib/logstash/outputs/rabbitmq.rb +++ b/lib/logstash/outputs/rabbitmq.rb @@ -72,7 +72,7 @@ def multi_receive_encoded(events_and_data) def publish(event, message) raise ArgumentError, "No exchange set in HareInfo!!!" unless @hare_info.exchange routing_key = event.sprintf(@key) - connect_retry_interval = event.sprintf(@connect_retry_interval * 1000).to_i + connect_retry_interval = event.sprintf(@connect_retry_interval).to_i * 1000 message_properties = @message_properties_template.build(event) @gated_executor.execute do local_exchange.publish(message, :routing_key => routing_key, :properties => message_properties)