From ce6c5f7d6ccd5944ebe7cc19a41db28e23232aed Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 23 May 2017 16:49:37 +0900 Subject: [PATCH 1/8] Make mantadory parameters config_params that do not set default value become mandatory parameters. --- lib/fluent/plugin/in_gcloud_pubsub.rb | 7 ++----- lib/fluent/plugin/out_gcloud_pubsub.rb | 4 +--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/lib/fluent/plugin/in_gcloud_pubsub.rb b/lib/fluent/plugin/in_gcloud_pubsub.rb index 80a4d8c..a97abc0 100644 --- a/lib/fluent/plugin/in_gcloud_pubsub.rb +++ b/lib/fluent/plugin/in_gcloud_pubsub.rb @@ -8,8 +8,8 @@ class GcloudPubSubInput < Input config_param :tag, :string config_param :project, :string, :default => nil - config_param :topic, :string, :default => nil - config_param :subscription, :string, :default => nil + config_param :topic, :string + config_param :subscription, :string config_param :key, :string, :default => nil config_param :pull_interval, :integer, :default => 5 config_param :max_messages, :integer, :default => 100 @@ -26,9 +26,6 @@ class GcloudPubSubInput < Input def configure(conf) super - raise Fluent::ConfigError, "'topic' must be specified." unless @topic - raise Fluent::ConfigError, "'subscription' must be specified." unless @subscription - configure_parser(conf) end diff --git a/lib/fluent/plugin/out_gcloud_pubsub.rb b/lib/fluent/plugin/out_gcloud_pubsub.rb index d80d0bf..3290a9b 100644 --- a/lib/fluent/plugin/out_gcloud_pubsub.rb +++ b/lib/fluent/plugin/out_gcloud_pubsub.rb @@ -13,7 +13,7 @@ class GcloudPubSubOutput < BufferedOutput config_set_default :buffer_queue_limit, 64 config_param :project, :string, :default => nil - config_param :topic, :string, :default => nil + config_param :topic, :string config_param :key, :string, :default => nil config_param :autocreate_topic, :bool, :default => false @@ -27,8 +27,6 @@ class GcloudPubSubOutput < BufferedOutput def configure(conf) super - - raise Fluent::ConfigError, "'topic' must be specified." unless @topic end def start From 6ad8195719a0d56ccd1784118dae895e369afcb1 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 16 Jun 2017 22:32:31 +0900 Subject: [PATCH 2/8] in_gcloud_pubsub: Migrate to use v0.14 API --- lib/fluent/plugin/in_gcloud_pubsub.rb | 23 ++++++++++++----------- test/plugin/test_in_gcloud_pubsub.rb | 4 ++-- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/lib/fluent/plugin/in_gcloud_pubsub.rb b/lib/fluent/plugin/in_gcloud_pubsub.rb index a97abc0..d35b3a4 100644 --- a/lib/fluent/plugin/in_gcloud_pubsub.rb +++ b/lib/fluent/plugin/in_gcloud_pubsub.rb @@ -1,11 +1,13 @@ require 'gcloud' -require 'fluent/input' -require 'fluent/parser' +require 'fluent/plugin/input' +require 'fluent/plugin/parser' -module Fluent +module Fluent::Plugin class GcloudPubSubInput < Input Fluent::Plugin.register_input('gcloud_pubsub', self) + helpers :parser, :thread + config_param :tag, :string config_param :project, :string, :default => nil config_param :topic, :string @@ -15,6 +17,10 @@ class GcloudPubSubInput < Input config_param :max_messages, :integer, :default => 100 config_param :return_immediately, :bool, :default => true + config_section :parse do + config_set_default :@type, 'json' + end + unless method_defined?(:log) define_method("log") { $log } end @@ -30,8 +36,7 @@ def configure(conf) end def configure_parser(conf) - @parser = Fluent::TextParser.new - @parser.configure(conf) + @parser = parser_create end def start @@ -41,7 +46,7 @@ def start topic = pubsub.topic @topic @client = topic.subscription @subscription @stop_subscribing = false - @subscribe_thread = Thread.new(&method(:subscribe)) + @subscribe_thread = thread_create(:in_gcloud_pubsub_input, &method(:subscribe)) end def shutdown @@ -52,10 +57,6 @@ def shutdown end private - def configure_parser(conf) - @parser = Fluent::TextParser.new - @parser.configure(conf) - end def subscribe until @stop_subscribing @@ -84,7 +85,7 @@ def subscribe end def parse_messages(messages) - es = MultiEventStream.new + es = Fluent::MultiEventStream.new messages.each do |m| convert_line_to_event(m.message.data, es) end diff --git a/test/plugin/test_in_gcloud_pubsub.rb b/test/plugin/test_in_gcloud_pubsub.rb index c185ee9..1f9d688 100644 --- a/test/plugin/test_in_gcloud_pubsub.rb +++ b/test/plugin/test_in_gcloud_pubsub.rb @@ -1,5 +1,5 @@ require_relative "../test_helper" - +require 'fluent/test/driver/input' class GcloudPubSubInputTest < Test::Unit::TestCase def setup @@ -7,7 +7,7 @@ def setup end def create_driver(conf=CONFIG) - Fluent::Test::InputTestDriver.new(Fluent::GcloudPubSubInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::GcloudPubSubInput).configure(conf) end def test_configure From 4ff00367169eae33cff2fe057a309cbb7fc62a0a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 16 Jun 2017 22:33:03 +0900 Subject: [PATCH 3/8] Remove ensuring backward compatibility code --- lib/fluent/plugin/in_gcloud_pubsub.rb | 8 -------- 1 file changed, 8 deletions(-) diff --git a/lib/fluent/plugin/in_gcloud_pubsub.rb b/lib/fluent/plugin/in_gcloud_pubsub.rb index d35b3a4..cc92f48 100644 --- a/lib/fluent/plugin/in_gcloud_pubsub.rb +++ b/lib/fluent/plugin/in_gcloud_pubsub.rb @@ -21,14 +21,6 @@ class GcloudPubSubInput < Input config_set_default :@type, 'json' end - unless method_defined?(:log) - define_method("log") { $log } - end - - unless method_defined?(:router) - define_method("router") { Fluent::Engine } - end - def configure(conf) super From e4807380d0b723c98eac65bbecff7264bd8c6674 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 16 Jun 2017 22:46:26 +0900 Subject: [PATCH 4/8] out_gcloud_pubsub: Migrate to use v0.14 API --- lib/fluent/plugin/out_gcloud_pubsub.rb | 35 +++++++++++++++++++------- test/plugin/test_out_gcloud_pubsub.rb | 18 +++++++++---- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/lib/fluent/plugin/out_gcloud_pubsub.rb b/lib/fluent/plugin/out_gcloud_pubsub.rb index 3290a9b..40443f5 100644 --- a/lib/fluent/plugin/out_gcloud_pubsub.rb +++ b/lib/fluent/plugin/out_gcloud_pubsub.rb @@ -1,22 +1,30 @@ require 'gcloud' -require 'fluent/output' +require 'fluent/plugin/output' -module Fluent - class GcloudPubSubOutput < BufferedOutput +module Fluent::Plugin + class GcloudPubSubOutput < Output Fluent::Plugin.register_output('gcloud_pubsub', self) - config_set_default :buffer_type, 'lightening' - config_set_default :flush_interval, 1 - config_set_default :try_flush_interval, 0.05 - config_set_default :buffer_chunk_records_limit, 900 - config_set_default :buffer_chunk_limit, 9437184 - config_set_default :buffer_queue_limit, 64 + helpers :compat_parameters + + DEFAULT_BUFFER_TYPE = "memory" config_param :project, :string, :default => nil config_param :topic, :string config_param :key, :string, :default => nil config_param :autocreate_topic, :bool, :default => false + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE + # In v0.14, buffer configurations are renamed. + # see: https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/buffer.rb + config_set_default :flush_interval, 1 + config_set_default :try_flush_interval, 0.05 + config_set_default :chunk_limit_records, 900 + config_set_default :chunk_limit_size, 9437184 + config_set_default :queue_limit_length, 64 + end + unless method_defined?(:log) define_method("log") { $log } end @@ -26,6 +34,7 @@ class GcloudPubSubOutput < BufferedOutput end def configure(conf) + compat_parameters_convert(conf, :buffer) super end @@ -40,6 +49,14 @@ def format(tag, time, record) [tag, time, record].to_msgpack end + def formatted_to_msgpack_binary? + true + end + + def multi_workers_ready? + true + end + def write(chunk) messages = [] diff --git a/test/plugin/test_out_gcloud_pubsub.rb b/test/plugin/test_out_gcloud_pubsub.rb index d128fd9..45d1749 100644 --- a/test/plugin/test_out_gcloud_pubsub.rb +++ b/test/plugin/test_out_gcloud_pubsub.rb @@ -1,5 +1,5 @@ require_relative "../test_helper" - +require 'fluent/test/driver/output' class GcloudPubSubOutputTest < Test::Unit::TestCase DEFAULT_CONFIG = <<-EOC @@ -16,7 +16,7 @@ def setup end def create_driver(conf) - Fluent::Test::BufferedOutputTestDriver.new(Fluent::GcloudPubSubOutput).configure(conf) + Fluent::Test::Driver::Output.new(Fluent::Plugin::GcloudPubSubOutput).configure(conf) end def test_configure @@ -32,7 +32,7 @@ def test_configure assert_equal('topic-test', d.instance.topic) assert_equal('key-test', d.instance.key) assert_equal(false, d.instance.autocreate_topic) - assert_equal(1, d.instance.flush_interval) + assert_equal(1, d.instance.buffer_config.flush_interval) end def test_autocreate_topic @@ -47,7 +47,9 @@ def test_autocreate_topic assert_equal(true, d.instance.autocreate_topic) - chunk = Fluent::MemoryBufferChunk.new('key', 'data') + tag, time, record = "tag", Fluent::Engine.now, {"a" => "b"} + metadata = d.instance.metadata_for_test(tag, time, record) + chunk = d.instance.buffer.generate_chunk(metadata) client = mock! client.topic("topic-test", autocreate: true).once @@ -61,7 +63,13 @@ def test_autocreate_topic def test_re_raise_errors d = create_driver(DEFAULT_CONFIG) - chunk = Fluent::MemoryBufferChunk.new('key', 'data') + tag, time, record = "tag", Fluent::Engine.now, {"a" => "b"} + metadata = d.instance.metadata_for_test(tag, time, record) + chunk = d.instance.buffer.generate_chunk(metadata).tap do |c| + c.append([d.instance.format(tag, time, record)]) + end + chunk.extend Fluent::ChunkMessagePackEventStreamer + client = Object.new def client.publish raise ReRaisedError From d51fdf71d61694b5295b547fe5cb12b3502ef677 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 16 Jun 2017 22:47:18 +0900 Subject: [PATCH 5/8] Remove obsoleted lines --- lib/fluent/plugin/out_gcloud_pubsub.rb | 8 -------- 1 file changed, 8 deletions(-) diff --git a/lib/fluent/plugin/out_gcloud_pubsub.rb b/lib/fluent/plugin/out_gcloud_pubsub.rb index 40443f5..95eb86e 100644 --- a/lib/fluent/plugin/out_gcloud_pubsub.rb +++ b/lib/fluent/plugin/out_gcloud_pubsub.rb @@ -25,14 +25,6 @@ class GcloudPubSubOutput < Output config_set_default :queue_limit_length, 64 end - unless method_defined?(:log) - define_method("log") { $log } - end - - unless method_defined?(:router) - define_method("router") { Fluent::Engine } - end - def configure(conf) compat_parameters_convert(conf, :buffer) super From f27cde2a5ebafa1f78450b44bc19f0dadda85d03 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 16 Jun 2017 22:48:56 +0900 Subject: [PATCH 6/8] Depends on v0.14.15 or later --- fluent-plugin-gcloud-pubsub.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluent-plugin-gcloud-pubsub.gemspec b/fluent-plugin-gcloud-pubsub.gemspec index d97910a..ea9a173 100644 --- a/fluent-plugin-gcloud-pubsub.gemspec +++ b/fluent-plugin-gcloud-pubsub.gemspec @@ -16,7 +16,7 @@ Gem::Specification.new do |gem| gem.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) } gem.require_paths = ['lib'] - gem.add_runtime_dependency "fluentd", "~> 0.12.0" + gem.add_runtime_dependency "fluentd", [">= 0.14.15", "< 2"] gem.add_runtime_dependency "gcloud", "= 0.6.3" gem.add_runtime_dependency "fluent-plugin-buffer-lightening", ">= 0.0.2" From e385272a1ed48e6ae9777d30800921c603356f53 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 16 Jun 2017 23:01:48 +0900 Subject: [PATCH 7/8] Add comment for chunk#msgpack_each implementation in test case --- test/plugin/test_out_gcloud_pubsub.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/test/plugin/test_out_gcloud_pubsub.rb b/test/plugin/test_out_gcloud_pubsub.rb index 45d1749..7d71795 100644 --- a/test/plugin/test_out_gcloud_pubsub.rb +++ b/test/plugin/test_out_gcloud_pubsub.rb @@ -68,6 +68,7 @@ def test_re_raise_errors chunk = d.instance.buffer.generate_chunk(metadata).tap do |c| c.append([d.instance.format(tag, time, record)]) end + # For chunk#msgpack_each chunk.extend Fluent::ChunkMessagePackEventStreamer client = Object.new From cf8b49e2b1a329fa79d6111218c10827a15b8d57 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 16 Jun 2017 23:05:10 +0900 Subject: [PATCH 8/8] travis: Tweak for Rubies --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index ef3807c..c1be87b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,10 @@ language: ruby rvm: - - 1.9.3 - - 2.0 - 2.1 - 2.2.2 + - 2.3.4 + - 2.4.1 gemfile: - Gemfile