diff --git a/lib/logstash/outputs/rabbitmq.rb b/lib/logstash/outputs/rabbitmq.rb index bab0fbc..feb6aa7 100644 --- a/lib/logstash/outputs/rabbitmq.rb +++ b/lib/logstash/outputs/rabbitmq.rb @@ -138,13 +138,24 @@ def back_pressure_provider_for_connection(march_hare_connection) class MessagePropertiesTemplate ## # Creates a new `MessagePropertiesTemplate` from the provided `template` - # @param template [Hash{Symbol=>Object}] + # @param template [Hash{Symbol=>Object}] def initialize(template) - constant_properties = template.reject { |_,v| templated?(v) } - variable_properties = template.select { |_,v| templated?(v) } - - @constant_properties = normalize(constant_properties).freeze + constant_properties = template.reject { |_,v| templated?(v) } # ein hash von nicht-templated (reject) 'values' wobei templated heißt strings mit {%... + variable_properties = template.select { |_,v| templated?(v) } # der hash von templated 'values @variable_properties = variable_properties + @constant_properties = normalize(constant_properties) + + @variable_headers = nil + + if @constant_properties[:headers] + constant_headers = constant_properties[:headers].reject { |_, v| templated?(v) } + @variable_headers = constant_properties[:headers].select { |_, v| templated?(v) } + @constant_properties[:headers] = constant_headers # overwrite headers with the constant ones + @constant_properties[:headers].freeze + end + @constant_properties.freeze + + end ## @@ -153,17 +164,31 @@ def initialize(template) # @param event [LogStash::Event]: the event with which to populated templated values, if any. # @return [Hash{Symbol=>Object}] a possibly-frozen properties hash for the provided `event`. def build(event) - return @constant_properties if @variable_properties.empty? + return @constant_properties if only_constant? properties = @variable_properties.each_with_object(@constant_properties.dup) do |(k,v), memo| memo.store(k, event.sprintf(v)) end + if !@variable_headers.nil? + variable_headers_transformed = @variable_headers.transform_values {|value| event.sprintf(value)} + properties[:headers] = properties[:headers].merge(variable_headers_transformed) + end + return normalize(properties) end private + ## + # Check wether template contains variable content that needs to be expanded. + # + # @api private + # @return [boolean] + def only_constant?() + return (@variable_properties.empty? & @variable_headers.nil?) + end + ## # Normalize the provided property mapping with respect to the value types the underlying # client expects. diff --git a/spec/outputs/rabbitmq_spec.rb b/spec/outputs/rabbitmq_spec.rb index edec9e7..fde8608 100644 --- a/spec/outputs/rabbitmq_spec.rb +++ b/spec/outputs/rabbitmq_spec.rb @@ -22,6 +22,7 @@ } let(:instance) { klass.new(rabbitmq_settings) } let(:hare_info) { instance.instance_variable_get(:@hare_info) } + let(:headers) { Hash.new } shared_examples 'recovers from exception gracefully' do it 'should execute publish twice due to a retry' do @@ -104,8 +105,40 @@ context 'with message_properties' do let(:rabbitmq_settings) { super().merge("message_properties" => message_properties) } let(:message_properties) { Hash.new } + + context 'with headers' do + + let(:message_properties) { super().merge("headers" => headers) } + let(:headers) { Hash.new } + let(:headers) { super().merge("myheader" => myheader_value) } + + context 'with constant value' do + let(:myheader_value) { "asdf" } + it 'publishes headers with constant-value' do + instance.send(:publish, event, encoded_event) + expect(exchange).to have_received(:publish).with(anything, hash_including(:properties => hash_including(:headers => hash_including("myheader" => "asdf")))) + end + end + + context 'with templated value' do + let(:myheader_value) { "%{[@metadata][priority]}" } + context 'when event expands template value' do + before do + expect(event).to receive(:sprintf).with(myheader_value).and_return("another_value") + end + + it 'publishes with the value extracted from the event' do + instance.send(:publish, event, encoded_event) + expect(exchange).to have_received(:publish).with(anything, hash_including(:properties => hash_including(:headers => hash_including("myheader" => "another_value")))) + end + end + + end + + end + context 'priority' do - let(:message_properties) { super().merge("priority" => priority) } + let(:message_properties) { super().merge("priority" => priority) } context 'as literal Integer value' do let(:priority) { 3 } it 'publishes with the constant-value priority' do @@ -121,7 +154,6 @@ expect(exchange).to have_received(:publish).with(anything, hash_including(:properties => hash_including(:priority => 7))) end end - context 'as template value' do let(:priority) { "%{[@metadata][priority]}" } context 'when event expands template value' do