From 477ae71a7ac897dffb88bf21377309eca07eade5 Mon Sep 17 00:00:00 2001
From: piavlo
Date: Wed, 8 Feb 2017 21:11:59 +0200
Subject: [PATCH 1/4] new kafka gem + consul discovery

---
 bin/tailf2kafka            | 59 ++++++++++++++++++++++++++++----------
 lib/tailf2kafka/version.rb |  2 +-
 tailf2kafka.gemspec        |  3 +-
 3 files changed, 47 insertions(+), 17 deletions(-)

diff --git a/bin/tailf2kafka b/bin/tailf2kafka
index e5fce4a..b387cc0 100755
--- a/bin/tailf2kafka
+++ b/bin/tailf2kafka
@@ -1,7 +1,8 @@
 #!/usr/bin/env ruby
 
 require 'optparse'
-require 'poseidon'
+require 'kafka'
+require 'diplomat'
 require 'yaml'
 require 'hash_symbolizer'
 require 'schash'
@@ -63,8 +64,11 @@ validator = Schash::Validator.new do
       post_delete_command: optional(string),
     },
     kafka: {
-      brokers: array_of(string),
-      producer_type: match(/^(sync|async)$/),
+      brokers: optional(string),
+      consul_service: optional(string),
+      consul_tag: optional(string),
+      connect_timeout: optional(integer),
+      socket_timeout: optional(integer),
       produce: optional(boolean),
     }
   }
@@ -93,10 +97,29 @@ end
 @max_batches = @settings[:tailf].has_key?(:max_batches) ? @settings[:tailf][:max_batches] : 10
 @from_begining = @settings[:tailf][:from_begining]
 @delete_old_tailed_files = @settings[:tailf].has_key?(:delete_old_tailed_files) ? @settings[:tailf][:delete_old_tailed_files] : false
-@brokers = @settings[:kafka][:brokers]
-@producer_type = @settings[:kafka][:producer_type].to_sym
 @produce = @settings[:kafka].has_key?(:produce) ? @settings[:kafka][:produce] : true
 
+def kafka_connect
+  if @settings[:kafka].has_key?(:brokers)
+    kafka_nodes = @settings[:kafka][:brokers]
+  else
+    kafka_nodes = Diplomat::Service.get(@settings[:kafka][:consul_service], :all, {:tag => @settings[:kafka][:consul_tag]}).inject([]) {|hosts,h| hosts << "#{h.Node}:#{h.ServicePort}"; hosts}
+  end
+  @kafka = Kafka.new(
+    seed_brokers: kafka_nodes,
+    logger: @logger,
+    connect_timeout: @settings[:kafka].has_key?(:connect_timeout) ? @settings[:kafka][:connect_timeout] : 10,
+    socket_timeout: @settings[:kafka].has_key?(:socket_timeout) ? @settings[:kafka][:socket_timeout] : 60
+  )
+  @producer = @kafka.producer(
+    required_acks: 0,
+    max_retries: 2,
+    retry_backoff: 1,
+    compression_codec: :snappy,
+    max_buffer_size: (@settings[:tailf].has_key?(:max_batch_lines) ? @settings[:tailf][:max_batch_lines] : 1024) + 1
+  )
+end
+
 def write_position_file
   @mutex.synchronize do
     File.open(@position_file, 'w') do |file|
@@ -123,19 +146,25 @@ end
 load_position_file
 
 @topics = @settings[:tailf][:files].map{|tailf_file| tailf_file[:topic]}
-@producer = Poseidon::Producer.new(@brokers, "#{Socket.gethostname}", :type => @producer_type, :compression_codec => :snappy, :compressed_topics => @topics) if @produce
+
+kafka_connect if @produce
 
 @producer_queue = SizedQueue.new(@max_batches * 10)
 
 @producer_thread = Thread.new do
   loop do
     batch = @producer_queue.pop
-    begin
-      @producer.send_messages(batch[:messages]) if @produce
-    rescue Poseidon::Errors::UnableToFetchMetadata
-      @logger.warn("Got Poseidon::Errors::UnableToFetchMetadata while trying to produce kafka messages, retrying in 1 second ...")
-      sleep 1
-      retry
+    if @produce
+      begin
+        batch[:messages].each do |msg|
+          @producer.produce(msg, topic: batch[:topic])
+        end
+        @producer.deliver_messages
+      rescue Kafka::DeliveryFailed
+        @logger.warn("Producer failed to deliver messages to brokers, retrying in 1 second ...")
+        sleep 1
+        retry
+      end
     end
     @files[batch[:path]][:offset] = batch[:offset]
   end
@@ -151,13 +180,13 @@ def kafka_produce(path, buffer, offset)
         truncated = msg
       else
         msg = msg + buffer.shift
-        messages << Poseidon::MessageToSend.new(@files[path][:topic], msg.strip)
+        messages << msg.strip
       end
     else
-      messages << Poseidon::MessageToSend.new(@files[path][:topic], msg.strip)
+      messages << msg.strip
     end
   end
-  @producer_queue.push({ :path => path, :messages => messages, :offset => offset})
+  @producer_queue.push({ :topic => @files[path][:topic], :path => path, :messages => messages, :offset => offset })
   truncated
 end
 
diff --git a/lib/tailf2kafka/version.rb b/lib/tailf2kafka/version.rb
index df1642a..e84c404 100644
--- a/lib/tailf2kafka/version.rb
+++ b/lib/tailf2kafka/version.rb
@@ -1,3 +1,3 @@
 module Tailf2Kafka
-  VERSION ||= '0.1.10'
+  VERSION ||= '0.2.0'
 end
diff --git a/tailf2kafka.gemspec b/tailf2kafka.gemspec
index c605e4e..7235d63 100644
--- a/tailf2kafka.gemspec
+++ b/tailf2kafka.gemspec
@@ -15,7 +15,7 @@ Gem::Specification.new do |s|
   s.license = 'MIT'
   s.has_rdoc = false
 
-  s.add_dependency('poseidon')
+  s.add_dependency('ruby-kafka')
   s.add_dependency('snappy')
   s.add_dependency('hash_symbolizer')
   s.add_dependency('schash')
@@ -23,6 +23,7 @@ Gem::Specification.new do |s|
   s.add_dependency('timers')
   s.add_dependency('mixlib-shellout')
   s.add_dependency('activesupport', '~> 4.2.6')
+  s.add_dependency('diplomat')
 
   s.add_development_dependency('rake')
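Note on the new producer path: broker discovery and delivery reduce to a few calls into diplomat and ruby-kafka. The standalone sketch below is illustrative rather than part of the patch — the service name, tag, and topic are placeholders, and it assumes a Consul agent is reachable on its default local address:

    # Sketch only: 'kafka', 'broker' and 'test' are assumed names.
    require 'kafka'
    require 'diplomat'
    require 'logger'

    # Resolve seed brokers from Consul the same way kafka_connect does:
    # every node advertising the service becomes a "host:port" string.
    brokers = Diplomat::Service.get('kafka', :all, {:tag => 'broker'}).map do |s|
      "#{s.Node}:#{s.ServicePort}"
    end

    kafka = Kafka.new(seed_brokers: brokers, logger: Logger.new(STDOUT))
    producer = kafka.producer(required_acks: 0, compression_codec: :snappy)

    producer.produce('hello from tailf2kafka', topic: 'test')
    producer.deliver_messages  # messages are buffered until this call
    producer.shutdown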
From a32c4d074353f522f499cf2ce768f003ef3331af Mon Sep 17 00:00:00 2001
From: piavlo
Date: Wed, 8 Feb 2017 21:16:44 +0200
Subject: [PATCH 2/4] 111

---
 bin/tailf2kafka | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bin/tailf2kafka b/bin/tailf2kafka
index b387cc0..169385d 100755
--- a/bin/tailf2kafka
+++ b/bin/tailf2kafka
@@ -116,7 +116,7 @@ def kafka_connect
     max_retries: 2,
     retry_backoff: 1,
     compression_codec: :snappy,
-    max_buffer_size: (@settings[:tailf].has_key?(:max_batch_lines) ? @settings[:tailf][:max_batch_lines] : 1024) + 1
+    max_buffer_size: @max_batch_lines + 1
  )
 end
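The extra slot in max_buffer_size matters because ruby-kafka's produce raises Kafka::BufferOverflow once the internal buffer is full, and a batch can hold up to @max_batch_lines messages. A sketch of that behavior, assuming current ruby-kafka semantics plus a placeholder broker and topic:

    require 'kafka'

    kafka = Kafka.new(seed_brokers: ['localhost:9092'])
    producer = kafka.producer(max_buffer_size: 3)

    begin
      4.times { |i| producer.produce("line #{i}", topic: 'test') }
    rescue Kafka::BufferOverflow
      # The fourth produce overflows a 3-message buffer: flush, then retry.
      producer.deliver_messages
      producer.produce('line 3', topic: 'test')
    end
    producer.deliver_messages
    producer.shutdown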
From 25c8fde19440056b788daa4199cdc8e4e0849346 Mon Sep 17 00:00:00 2001
From: piavlo
Date: Wed, 8 Feb 2017 22:54:42 +0200
Subject: [PATCH 3/4] fix loglevel and validation

---
 bin/tailf2kafka | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/bin/tailf2kafka b/bin/tailf2kafka
index 169385d..ce5c761 100755
--- a/bin/tailf2kafka
+++ b/bin/tailf2kafka
@@ -43,6 +43,7 @@ unless @config
 end
 
 @logger = Logger.new(STDOUT)
+@logger.level = @loglevel
 
 @settings = YAML.load_file(@config).symbolize_keys(true)
 
@@ -64,7 +65,7 @@ validator = Schash::Validator.new do
       post_delete_command: optional(string),
     },
     kafka: {
-      brokers: optional(string),
+      brokers: optional(array_of(string)),
       consul_service: optional(string),
       consul_tag: optional(string),
       connect_timeout: optional(integer),
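One gap the schema cannot express is that exactly one of brokers or consul_service must be present — kafka_connect would otherwise hand nil to Diplomat. A small guard along these lines (a sketch, not in the patch) would fail fast right after validation:

    unless @settings[:kafka].has_key?(:brokers) || @settings[:kafka].has_key?(:consul_service)
      @logger.error('kafka section needs either brokers or consul_service')
      exit 1
    end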
From c7902e3624da5764a708a10ece17a034bd43e63b Mon Sep 17 00:00:00 2001
From: piavlo
Date: Wed, 8 Feb 2017 23:58:09 +0200
Subject: [PATCH 4/4] send each batch to same partition + throttle processing batches by flush interval

---
 bin/tailf2kafka | 32 ++++++++++++++++++++++----------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/bin/tailf2kafka b/bin/tailf2kafka
index ce5c761..f154e9e 100755
--- a/bin/tailf2kafka
+++ b/bin/tailf2kafka
@@ -54,7 +54,7 @@ validator = Schash::Validator.new do
       topic: string,
       prefix: string,
       suffix: optional(string),
-      time_pattern: string,
+      time_pattern: string
     }),
     position_file: string,
     flush_interval: optional(integer),
@@ -62,7 +62,7 @@ validator = Schash::Validator.new do
     max_batches: optional(integer),
     from_begining: boolean,
     delete_old_tailed_files: optional(boolean),
-    post_delete_command: optional(string),
+    post_delete_command: optional(string)
   },
   kafka: {
     brokers: optional(array_of(string)),
@@ -70,7 +70,7 @@ validator = Schash::Validator.new do
     consul_tag: optional(string),
     connect_timeout: optional(integer),
     socket_timeout: optional(integer),
-    produce: optional(boolean),
+    produce: optional(boolean)
   }
 }
 end
@@ -87,7 +87,6 @@ end
 
 @create_notifier = INotify::Notifier.new
 @delete_notifier = INotify::Notifier.new
-@tailf_notifier = INotify::Notifier.new
 
 @dirs = {}
 @files = {}
@@ -152,14 +151,20 @@
 kafka_connect if @produce
 
 @producer_queue = SizedQueue.new(@max_batches * 10)
 
+@partition_key = 0
+
 @producer_thread = Thread.new do
   loop do
     batch = @producer_queue.pop
     if @produce
+      @partition_key += 1
+      partition = @partition_key.to_s
+      @logger.debug("producing batch of #{batch[:messages].size} messages")
       begin
         batch[:messages].each do |msg|
-          @producer.produce(msg, topic: batch[:topic])
+          @producer.produce(msg, topic: batch[:topic], partition_key: partition)
         end
+        batch[:messages] = []
         @producer.deliver_messages
       rescue Kafka::DeliveryFailed
@@ -205,8 +210,10 @@ def tailf(path)
       truncated = kafka_produce(path, batch, file.pos)
     end
 
+    notifier = INotify::Notifier.new
+
     mutex = Mutex.new
-    @tailf_notifier.watch(path, :modify) do |event|
+    notifier.watch(path, :modify) do |event|
       mutex.synchronize do
         unless file.closed?
           (1..@max_batches).each do |i|
@@ -220,6 +227,14 @@ def tailf(path)
       end
     end
   end
+
+  loop do
+    time_before = Time.now
+    notifier.process
+    time_after = Time.now
+    time_left = @settings[:tailf][:flush_interval] - (time_after - time_before)
+    sleep time_left if time_left > 0
+  end
 end
 
 @time_regexp_hash = {
@@ -328,7 +343,4 @@
   Thread.new { loop { @timers.wait } }
 end
 
-Thread.new { @create_notifier.run }
-Thread.new { @delete_notifier.run }
-
-@tailf_notifier.run
+[ Thread.new { @create_notifier.run }, Thread.new { @delete_notifier.run } ].each{|t| t.join}
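Why partition_key rather than a fixed partition: ruby-kafka hashes the key to pick a partition, so every message sharing one key lands on the same partition and keeps its order, while the incrementing counter spreads successive batches across partitions. A sketch of the idea, with placeholder broker and topic:

    require 'kafka'

    kafka = Kafka.new(seed_brokers: ['localhost:9092'])
    producer = kafka.producer

    [%w[a1 a2 a3], %w[b1 b2 b3]].each_with_index do |batch, key|
      # One key per batch: the whole batch hashes to a single partition.
      batch.each { |line| producer.produce(line, topic: 'test', partition_key: key.to_s) }
      producer.deliver_messages
    end
    producer.shutdown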