diff --git a/lib/logstash/outputs/rabbitmq.rb b/lib/logstash/outputs/rabbitmq.rb index bab0fbc..b18e128 100644 --- a/lib/logstash/outputs/rabbitmq.rb +++ b/lib/logstash/outputs/rabbitmq.rb @@ -72,9 +72,14 @@ def multi_receive_encoded(events_and_data) def publish(event, message) raise ArgumentError, "No exchange set in HareInfo!!!" unless @hare_info.exchange routing_key = event.sprintf(@key) + connect_retry_interval = event.sprintf(@connect_retry_interval).to_i * 1000 message_properties = @message_properties_template.build(event) @gated_executor.execute do local_exchange.publish(message, :routing_key => routing_key, :properties => message_properties) + local_got_ack = local_channel.wait_for_confirms(connect_retry_interval) + if !local_got_ack + raise MarchHare::Exception.new('Got an nack message') + end end rescue MarchHare::Exception, IOError, AlreadyClosedException, TimeoutException => e @logger.error("Error while publishing, will retry", error_details(e, backtrace: true)) @@ -96,6 +101,7 @@ def local_channel channel = @thread_local_channel.get if !channel channel = @hare_info.connection.create_channel + channel.confirm_select @thread_local_channel.set(channel) end channel