From b584c3ce3ee813156e39c8fc8f51cebb85dc7936 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Thu, 28 Jan 2016 14:14:14 -0500 Subject: [PATCH 1/4] cleanup code --- lib/logstash/codecs/multiline.rb | 94 +++++++++++++++----------------- 1 file changed, 45 insertions(+), 49 deletions(-) diff --git a/lib/logstash/codecs/multiline.rb b/lib/logstash/codecs/multiline.rb index 5a28b90..7eb76d3 100644 --- a/lib/logstash/codecs/multiline.rb +++ b/lib/logstash/codecs/multiline.rb @@ -4,6 +4,10 @@ require "logstash/timestamp" require "logstash/codecs/auto_flush" +require "grok-pure" +require 'logstash/patterns/core' +require "logstash/util/buftok" + # The multiline codec will collapse multiline messages and merge them into a # single event. # @@ -132,19 +136,9 @@ module LogStash module Codecs class Multiline < LogStash::Codecs::Base # auto_flush_interval. No default. If unset, no auto_flush. Units: seconds config :auto_flush_interval, :validate => :number - public - def register - require "grok-pure" # rubygem 'jls-grok' - require 'logstash/patterns/core' - - # Detect if we are running from a jarfile, pick the right path. - patterns_path = [] - patterns_path += [LogStash::Patterns::Core.path] - @grok = Grok.new - - @patterns_dir = patterns_path.to_a + @patterns_dir + @patterns_dir = [LogStash::Patterns::Core.path] + @patterns_dir @patterns_dir.each do |path| if ::File.directory?(path) path = ::File.join(path, "*") @@ -169,33 +163,23 @@ def register # will start on first decode @auto_flush_runner = AutoFlush.new(self, @auto_flush_interval) end - end # def register - - def accept(listener) - # memoize references to listener that holds upstream state - @previous_listener = @last_seen_listener || listener - @last_seen_listener = listener - decode(listener.data) do |event| - what_based_listener.process_event(event) - end end def decode(text, &block) text = @converter.convert(text) text.split("\n").each do |line| match = @grok.match(line) - @logger.debug("Multiline", :pattern => @pattern, :text => line, - :match => !match.nil?, :negate => @negate) + @logger.debug("Multiline", :pattern => @pattern, :text => line, :match => !match.nil?, :negate => @negate) # Add negate option match = (match and !@negate) || (!match and @negate) @handler.call(line, match, &block) end - end # def decode + end - def buffer(text) - @buffer_bytes += text.bytesize - @buffer.push(text) + def encode(event) + # Nothing to do. + @on_event.call(event, event) end def flush(&block) @@ -215,12 +199,46 @@ def flush(&block) end end + def close + if auto_flush_runner.pending? + #will cancel task if necessary + auto_flush_runner.stop + end + auto_flush + end + + def accept(listener) + # memoize references to listener that holds upstream state + @previous_listener = @last_seen_listener || listener + @last_seen_listener = listener + decode(listener.data) do |event| + what_based_listener.process_event(event) + end + end + + def buffer(text) + @buffer_bytes += text.bytesize + @buffer.push(text) + end + + def reset_buffer + @buffer = [] + @buffer_bytes = 0 + end + def auto_flush flush do |event| @last_seen_listener.process_event(event) end end + # TODO: (colin) auto_flush_active? doesn't seem to be used anywhere. any reason to keep this api? + def auto_flush_active? + !@auto_flush_interval.nil? + end + + private + def merge_events event = LogStash::Event.new(LogStash::Event::TIMESTAMP => @time, "message" => @buffer.join(NL)) event.tag @multiline_tag if @multiline_tag && @buffer.size > 1 @@ -229,11 +247,6 @@ def merge_events event end - def reset_buffer - @buffer = [] - @buffer_bytes = 0 - end - def doing_previous? @what == "previous" end @@ -266,24 +279,7 @@ def buffer_over_limits? over_maximum_lines? || over_maximum_bytes? end - def encode(event) - # Nothing to do. - @on_event.call(event, event) - end # def encode - - def close - if auto_flush_runner.pending? - #will cancel task if necessary - auto_flush_runner.stop - end - auto_flush - end - - def auto_flush_active? - !@auto_flush_interval.nil? - end - def auto_flush_runner @auto_flush_runner || AutoFlushUnset.new(nil, nil) end -end end end # class LogStash::Codecs::Multiline +end end end From e9d6d304f826e9e2c59887a1786f77b832054562 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Thu, 28 Jan 2016 14:14:53 -0500 Subject: [PATCH 2/4] add partial reads support spec --- spec/codecs/multiline_spec.rb | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/spec/codecs/multiline_spec.rb b/spec/codecs/multiline_spec.rb index a046f49..a79828f 100644 --- a/spec/codecs/multiline_spec.rb +++ b/spec/codecs/multiline_spec.rb @@ -58,6 +58,18 @@ expect(events[1]["message"]).to eq "0987654321" end + it "should handle message continuation across decode calls (i.e. use buftok)" do + config.update("pattern" => '\D', "what" => "previous") + lineio = StringIO.new("1234567890\nA234567890\nB234567890\n0987654321\n") + until lineio.eof + line = lineio.read(5) + codec.decode(line) {|evt| events.push(evt)} + end + codec.flush { |e| events << e } + expect(events[0]["message"]).to eq "1234567890\nA234567890\nB234567890" + expect(events[1]["message"]).to eq "0987654321" + end + it "should allow grok patterns to be used" do config.update( "pattern" => "^%{NUMBER} %{TIME}", From 4a4dbbeb219f7cbf2f753a67feff2a39d094ca53 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Thu, 28 Jan 2016 17:31:11 -0500 Subject: [PATCH 3/4] support buftok for proper delimited input extraction --- lib/logstash/codecs/multiline.rb | 122 ++++++++++++++++++++++--------- spec/codecs/multiline_spec.rb | 33 ++++++--- 2 files changed, 110 insertions(+), 45 deletions(-) diff --git a/lib/logstash/codecs/multiline.rb b/lib/logstash/codecs/multiline.rb index 7eb76d3..875bd10 100644 --- a/lib/logstash/codecs/multiline.rb +++ b/lib/logstash/codecs/multiline.rb @@ -115,6 +115,9 @@ module LogStash module Codecs class Multiline < LogStash::Codecs::Base # This only affects "plain" format logs since JSON is `UTF-8` already. config :charset, :validate => ::Encoding.name_list, :default => "UTF-8" + # Change the delimiter that separates lines + config :delimiter, :validate => :string, :default => "\n" + # Tag multiline events with a given tag. This tag will only be added # to events that actually have multiple lines in them. config :multiline_tag, :validate => :string, :default => "multiline" @@ -137,6 +140,7 @@ module LogStash module Codecs class Multiline < LogStash::Codecs::Base config :auto_flush_interval, :validate => :number def register + @tokenizer = FileWatch::BufferedTokenizer.new(@delimiter) @grok = Grok.new @patterns_dir = [LogStash::Patterns::Core.path] + @patterns_dir @patterns_dir.each do |path| @@ -159,21 +163,28 @@ def register @converter = LogStash::Util::Charset.new(@charset) @converter.logger = @logger + + # TODO: (colin) I don't really understand this @last_seen_listener poutine. I needed to create + # this lamda to DRY across the close & auto_flush methods to pass the closure to the new + # @tokenizer which I figured needs to be flushed upon close. there does not seem to be + # explicit tests for this closing logic. + # what is not clear here is the initialization of @last_seen_listener which gets initialized + # in the accept method but the close method systematically call auto_flush which assumes the + # existence of @last_seen_listener. this whole logic is confusing and should be either made + # more explicit and self documenting OR documentation should be prodided. + @auto_flush_block = lambda do |event| + @last_seen_listener.process_event(event) + end + if @auto_flush_interval # will start on first decode @auto_flush_runner = AutoFlush.new(self, @auto_flush_interval) end end - def decode(text, &block) - text = @converter.convert(text) - text.split("\n").each do |line| - match = @grok.match(line) - @logger.debug("Multiline", :pattern => @pattern, :text => line, :match => !match.nil?, :negate => @negate) - - # Add negate option - match = (match and !@negate) || (!match and @negate) - @handler.call(line, match, &block) + def decode(data, &block) + @tokenizer.extract(data.force_encoding("ASCII-8BIT")).each do |line| + match_line(@converter.convert(line), &block) end end @@ -182,31 +193,35 @@ def encode(event) @on_event.call(event, event) end + # this is the termination or final flush API + # + # @param block [Proc] the closure that will be called for all events that need to be flushed. def flush(&block) - if block_given? && @buffer.any? - no_error = true - events = merge_events - begin - yield events - rescue ::Exception => e - # need to rescue everything - # likliest cause: backpressure or timeout by exception - # can't really do anything but leave the data in the buffer for next time if there is one - @logger.error("Multiline: flush downstream error", :exception => e) - no_error = false - end - reset_buffer if no_error - end + remainder = @tokenizer.flush + match_line(@converter.convert(remainder), &block) unless remainder.empty? + flush_buffered_events(&block) end + # TODO: (colin) I believe there is a problem here in calling auto_flush. auto_flush depends on + # @auto_flush_block which references @last_seen_listener which is only initialized in the context of + # IdentityMapCodec which in turn I believe cannot by assumed. the multiline codec could run without + # IdentityMapCodec. def close if auto_flush_runner.pending? #will cancel task if necessary auto_flush_runner.stop end + + remainder = @tokenizer.flush + match_line(@converter.convert(remainder), &@auto_flush_block) unless remainder.empty? + auto_flush end + # TODO: (colin) what is the pupose of this accept method? AFAICT it is only used if this codec is used + # within the IdentityMapCodec. it is not clear when & in which context this method is used. + # I believe the codec should still be able to live outside the context of an IdentityMapCodec but there + # are usage of ivars like @last_seen_listener only inititalized int the context of IdentityMapCodec. def accept(listener) # memoize references to listener that holds upstream state @previous_listener = @last_seen_listener || listener @@ -216,9 +231,9 @@ def accept(listener) end end - def buffer(text) - @buffer_bytes += text.bytesize - @buffer.push(text) + def buffer(line) + @buffer_bytes += line.bytesize + @buffer.push(line) end def reset_buffer @@ -226,10 +241,11 @@ def reset_buffer @buffer_bytes = 0 end + # TODO: (colin) this method is not clearly documented as being required by the AutoFlush class & tasks. + # I belive there is a problem here with the usage of @auto_flush_block which assumes to be in the + # context of an IdentityMapCodec but the multiline codec could run without IdentityMapCodec def auto_flush - flush do |event| - @last_seen_listener.process_event(event) - end + flush_buffered_events(&@auto_flush_block) end # TODO: (colin) auto_flush_active? doesn't seem to be used anywhere. any reason to keep this api? @@ -237,7 +253,41 @@ def auto_flush_active? !@auto_flush_interval.nil? end - private + # private + + # merge all currently bufferred events and call the passed block for the resulting merged event + # + # @param block [Proc] the closure that will be called for the resulting merged event + def flush_buffered_events(&block) + if block_given? && @buffer.any? + no_error = true + event = merge_events + begin + yield event + rescue ::Exception => e + # need to rescue everything + # likliest cause: backpressure or timeout by exception + # can't really do anything but leave the data in the buffer for next time if there is one + @logger.error("Multiline: buffered events flush downstream error", :exception => e) + no_error = false + end + reset_buffer if no_error + end + end + + # evalutate if a given line matches the configured pattern and call the appropriate do_next or do_previous + # handler given the match state. + # + # @param line [String] the string to match against the pattern + # @param block [Proc] the closure that will be called for each event that might result after processing this line + def match_line(line, &block) + match = @grok.match(line) + @logger.debug? && @logger.debug("Multiline", :pattern => @pattern, :line => line, :match => !match.nil?, :negate => @negate) + + # Add negate option + match = (match and !@negate) || (!match and @negate) + @handler.call(line, match, &block) + end def merge_events event = LogStash::Event.new(LogStash::Event::TIMESTAMP => @time, "message" => @buffer.join(NL)) @@ -255,16 +305,16 @@ def what_based_listener doing_previous? ? @previous_listener : @last_seen_listener end - def do_next(text, matched, &block) - buffer(text) + def do_next(line, matched, &block) + buffer(line) auto_flush_runner.start - flush(&block) if !matched || buffer_over_limits? + flush_buffered_events(&block) if !matched || buffer_over_limits? end - def do_previous(text, matched, &block) - flush(&block) if !matched || buffer_over_limits? + def do_previous(line, matched, &block) + flush_buffered_events(&block) if !matched || buffer_over_limits? auto_flush_runner.start - buffer(text) + buffer(line) end def over_maximum_lines? diff --git a/spec/codecs/multiline_spec.rb b/spec/codecs/multiline_spec.rb index a79828f..43c0782 100644 --- a/spec/codecs/multiline_spec.rb +++ b/spec/codecs/multiline_spec.rb @@ -15,6 +15,7 @@ let(:line_producer) do lambda do |lines| lines.each do |line| + line = "#{line}\n" codec.decode(line) do |event| events << event end @@ -92,6 +93,7 @@ config.update("pattern" => "^\\s", "what" => "previous") lines = [ "foobar", "κόσμε" ] lines.each do |line| + line = "#{line}\n" expect(line.encoding.name).to eq "UTF-8" expect(line.valid_encoding?).to be_truthy codec.decode(line) { |event| events << event } @@ -109,6 +111,7 @@ config.update("pattern" => "^\\s", "what" => "previous") lines = [ "foo \xED\xB9\x81\xC3", "bar \xAD" ] lines.each do |line| + line = "#{line}\n" expect(line.encoding.name).to eq "UTF-8" expect(line.valid_encoding?).to eq false @@ -136,6 +139,7 @@ # lines = [ "foo \xED\xB9\x81\xC3", "bar \xAD" ] samples.map{|(a, b)| a.force_encoding("ISO-8859-1")}.each do |line| + line = "#{line}\n".force_encoding("ISO-8859-1") expect(line.encoding.name).to eq "ISO-8859-1" expect(line.valid_encoding?).to eq true @@ -162,6 +166,8 @@ ] events = [] samples.map{|(a, b)| a.force_encoding("ASCII-8BIT")}.each do |line| + line = "#{line}\n" + line.force_encoding("ASCII-8BIT") expect(line.encoding.name).to eq "ASCII-8BIT" expect(line.valid_encoding?).to eq true @@ -182,7 +188,7 @@ context "with non closed multiline events" do let(:random_number_of_events) { rand(300..1000) } - let(:sample_event) { "- Sample event" } + let(:sample_event) { "- Sample event\n" } let(:events) { decode_events } let(:unmerged_events_count) { events.collect { |event| event["message"].split(LogStash::Codecs::Multiline::NL).size }.inject(&:+) } @@ -232,9 +238,11 @@ let(:codec) { Mlc::MultilineRspec.new(config).tap {|c| c.register} } let(:events) { [] } let(:lines) do - { "en.log" => ["hello world", " second line", " third line"], + { + "en.log" => ["hello world", " second line", " third line"], "fr.log" => ["Salut le Monde", " deuxième ligne", " troisième ligne"], - "de.log" => ["Hallo Welt"] } + "de.log" => ["Hallo Welt"] + } end let(:listener_class) { Mlc::LineListener } let(:auto_flush_interval) { 0.5 } @@ -244,7 +252,7 @@ #create a listener that holds upstream state listener = listener_class.new(events, codec, path) lines[path].each do |data| - listener.accept(data) + listener.accept("#{data}\n") end end end @@ -264,13 +272,17 @@ let(:listener_class) { Mlc::LineErrorListener } it "does not build any events, logs an error and the buffer data remains" do - config.update("pattern" => "^\\s", "what" => "previous", - "auto_flush_interval" => auto_flush_interval) + config.update( + "pattern" => "^\\s", + "what" => "previous", + "auto_flush_interval" => auto_flush_interval + ) + codec.logger = Mlc::MultilineLogTracer.new line_producer.call("en.log") sleep(auto_flush_interval + 0.1) msg, args = codec.logger.trace_for(:error) - expect(msg).to eq("Multiline: flush downstream error") + expect(msg).to eq("Multiline: buffered events flush downstream error") expect(args[:exception].message).to eq(errmsg) expect(events.size).to eq(0) expect(codec.buffer_size).to eq(3) @@ -286,8 +298,11 @@ def assert_produced_events(key, sleeping) context "mode: previous, when there are pauses between multiline file writes" do it "auto-flushes events from the accumulated lines to the queue" do - config.update("pattern" => "^\\s", "what" => "previous", - "auto_flush_interval" => auto_flush_interval) + config.update( + "pattern" => "^\\s", + "what" => "previous", + "auto_flush_interval" => auto_flush_interval + ) assert_produced_events("en.log", auto_flush_interval + 0.1) do expect(events[0]).to match_path_and_line("en.log", lines["en.log"]) From 70ea5c335a6b58001563fa424998db4e74aeebf6 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Mon, 1 Feb 2016 20:00:09 -0500 Subject: [PATCH 4/4] revert to original assumption of line-based input and make that optional --- lib/logstash/codecs/multiline.rb | 6 ++++++ spec/codecs/multiline_spec.rb | 16 +++++++--------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/logstash/codecs/multiline.rb b/lib/logstash/codecs/multiline.rb index 875bd10..503b351 100644 --- a/lib/logstash/codecs/multiline.rb +++ b/lib/logstash/codecs/multiline.rb @@ -118,6 +118,10 @@ module LogStash module Codecs class Multiline < LogStash::Codecs::Base # Change the delimiter that separates lines config :delimiter, :validate => :string, :default => "\n" + # Assume data received from input plugin as line based. For some input plugins + # like stdin or tcp/udp data is not line based and this option should be set to false. + config :line_based_input, :validate => :boolean, :default => true + # Tag multiline events with a given tag. This tag will only be added # to events that actually have multiple lines in them. config :multiline_tag, :validate => :string, :default => "multiline" @@ -183,6 +187,8 @@ def register end def decode(data, &block) + data = data + @delimiter if @line_based_input + @tokenizer.extract(data.force_encoding("ASCII-8BIT")).each do |line| match_line(@converter.convert(line), &block) end diff --git a/spec/codecs/multiline_spec.rb b/spec/codecs/multiline_spec.rb index 43c0782..5058743 100644 --- a/spec/codecs/multiline_spec.rb +++ b/spec/codecs/multiline_spec.rb @@ -15,7 +15,6 @@ let(:line_producer) do lambda do |lines| lines.each do |line| - line = "#{line}\n" codec.decode(line) do |event| events << event end @@ -60,7 +59,11 @@ end it "should handle message continuation across decode calls (i.e. use buftok)" do - config.update("pattern" => '\D', "what" => "previous") + config.update( + "pattern" => '\D', + "what" => "previous", + "line_based_input" => false, + ) lineio = StringIO.new("1234567890\nA234567890\nB234567890\n0987654321\n") until lineio.eof line = lineio.read(5) @@ -93,7 +96,6 @@ config.update("pattern" => "^\\s", "what" => "previous") lines = [ "foobar", "κόσμε" ] lines.each do |line| - line = "#{line}\n" expect(line.encoding.name).to eq "UTF-8" expect(line.valid_encoding?).to be_truthy codec.decode(line) { |event| events << event } @@ -111,7 +113,6 @@ config.update("pattern" => "^\\s", "what" => "previous") lines = [ "foo \xED\xB9\x81\xC3", "bar \xAD" ] lines.each do |line| - line = "#{line}\n" expect(line.encoding.name).to eq "UTF-8" expect(line.valid_encoding?).to eq false @@ -139,7 +140,6 @@ # lines = [ "foo \xED\xB9\x81\xC3", "bar \xAD" ] samples.map{|(a, b)| a.force_encoding("ISO-8859-1")}.each do |line| - line = "#{line}\n".force_encoding("ISO-8859-1") expect(line.encoding.name).to eq "ISO-8859-1" expect(line.valid_encoding?).to eq true @@ -166,8 +166,6 @@ ] events = [] samples.map{|(a, b)| a.force_encoding("ASCII-8BIT")}.each do |line| - line = "#{line}\n" - line.force_encoding("ASCII-8BIT") expect(line.encoding.name).to eq "ASCII-8BIT" expect(line.valid_encoding?).to eq true @@ -188,7 +186,7 @@ context "with non closed multiline events" do let(:random_number_of_events) { rand(300..1000) } - let(:sample_event) { "- Sample event\n" } + let(:sample_event) { "- Sample event" } let(:events) { decode_events } let(:unmerged_events_count) { events.collect { |event| event["message"].split(LogStash::Codecs::Multiline::NL).size }.inject(&:+) } @@ -252,7 +250,7 @@ #create a listener that holds upstream state listener = listener_class.new(events, codec, path) lines[path].each do |data| - listener.accept("#{data}\n") + listener.accept(data) end end end