diff --git a/.ci/docker-compose.override.yml b/.ci/docker-compose.override.yml new file mode 100644 index 0000000..7c3fd31 --- /dev/null +++ b/.ci/docker-compose.override.yml @@ -0,0 +1,5 @@ +version: '3' + +services: + logstash: + network_mode: host diff --git a/.ci/run.sh b/.ci/run.sh new file mode 100755 index 0000000..daf3303 --- /dev/null +++ b/.ci/run.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# This is intended to be run inside the docker container as the command of the docker-compose. + +env + +set -ex + +jruby -rbundler/setup -S rspec -fd + +jruby -rbundler/setup -S rspec -fd --tag redis diff --git a/.travis.yml b/.travis.yml index a50fc73..028f060 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,2 +1,13 @@ import: -- logstash-plugins/.ci:travis/travis.yml@1.x \ No newline at end of file +- logstash-plugins/.ci:travis/travis.yml@1.x + +addons: + apt: + sources: + - sourceline: 'ppa:chris-lea/redis-server' + packages: + - redis-server + +before_install: + - sudo service redis-server stop + - sudo service redis-server start --bind 0.0.0.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 65fc096..d6479c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 3.7.0 + - Fix: better (Redis) exception handling [#89](https://github.com/logstash-plugins/logstash-input-redis/pull/89) + - Test: start running integration specs on CI + +## 3.6.1 + - Fix: resolve crash when commands_map is set [#86](https://github.com/logstash-plugins/logstash-input-redis/pull/86) + +## 3.6.0 + - Remove ruby pipeline dependency. Starting from Logstash 8, Ruby execution engine is not available. All pipelines should use Java pipeline [#84](https://github.com/logstash-plugins/logstash-input-redis/pull/84) + ## 3.5.1 - [DOC] Reordered config option to alpha order [#79](https://github.com/logstash-plugins/logstash-input-redis/issues/79) diff --git a/README.md b/README.md index 5fd2cf6..10fa417 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Logstash Plugin Travis Build -[![Travis Build Status](https://travis-ci.org/logstash-plugins/logstash-input-redis.svg)](https://travis-ci.org/logstash-plugins/logstash-input-redis) +[![Travis Build Status](https://travis-ci.com/logstash-plugins/logstash-input-redis.svg)](https://travis-ci.com/logstash-plugins/logstash-input-redis) This is a plugin for [Logstash](https://github.com/elastic/logstash). diff --git a/batch_perf/perf_batch.rb b/batch_perf/perf_batch.rb index 3863d85..ac81e85 100644 --- a/batch_perf/perf_batch.rb +++ b/batch_perf/perf_batch.rb @@ -3,7 +3,7 @@ require "securerandom" require "logstash/event" -require "logstash/pipeline" +require "logstash/java_pipeline" require_relative "../lib/logstash/inputs/redis" class BenchOptions @@ -35,7 +35,7 @@ def cfg_batch(d) bench_options = BenchOptions.new def input(cfg, slow, &block) - pipeline = LogStash::Pipeline.new(cfg) + pipeline = LogStash::JavaPipeline.new(cfg) queue = Queue.new pipeline.instance_eval do diff --git a/docs/index.asciidoc b/docs/index.asciidoc index cc8340e..7f80126 100755 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -84,12 +84,14 @@ Redis allows for the renaming or disabling of commands in its protocol, see: ht ===== `data_type` * This is a required setting. - * Value can be any of: `list`, `channel`, `pattern_channel` + * Value can be any of: `list`, `pattern_list`, `channel`, `pattern_channel` * There is no default value for this setting. Specify either list or channel. If `data_type` is `list`, then we will BLPOP the -key. If `data_type` is `channel`, then we will SUBSCRIBE to the key. -If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. +key. If `data_type` is `pattern_list`, then we will spawn a number of worker +threads that will LPOP from keys matching that pattern. If `data_type` is +`channel`, then we will SUBSCRIBE to the key. If `data_type` is +`pattern_channel`, then we will PSUBSCRIBE to the key. [id="plugins-{type}s-{plugin}-db"] ===== `db` @@ -125,6 +127,7 @@ The unix socket path of your Redis server. The name of a Redis list or channel. + [id="plugins-{type}s-{plugin}-password"] ===== `password` @@ -133,6 +136,37 @@ The name of a Redis list or channel. Password to authenticate with. There is no authentication by default. + +[id="plugins-{type}s-{plugin}-pattern_list_max_items"] +===== `pattern_list_max_items` + + * Value type is <> + * Default value is `1000` + +Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`. +After the list is empty or this number of items have been processed, the thread will exit and a +new one will be started if there are non-empty lists matching the pattern without a consumer. + + +[id="plugins-{type}s-{plugin}-pattern_list_threadpool_sleep"] +===== `pattern_list_threadpool_sleep` + + * Value type is <> + * Default value is `0.2` + +Time to sleep in main loop after checking if more threads can/need to be spawned. +Applies to `data_type` is `pattern_list` + + +[id="plugins-{type}s-{plugin}-pattern_list_threads"] +===== `pattern_list_threads` + + * Value type is <> + * Default value is `20` + +Maximum number of worker threads to spawn when using `data_type` `pattern_list`. + + [id="plugins-{type}s-{plugin}-port"] ===== `port` @@ -141,8 +175,9 @@ Password to authenticate with. There is no authentication by default. The port to connect on. + [id="plugins-{type}s-{plugin}-ssl"] -===== `ssl` +===== `ssl` * Value type is <> * Default value is `false` @@ -157,7 +192,6 @@ Enable SSL support. * Default value is `1` - [id="plugins-{type}s-{plugin}-timeout"] ===== `timeout` @@ -166,7 +200,9 @@ Enable SSL support. Initial connection timeout in seconds. + + [id="plugins-{type}s-{plugin}-common-options"] include::{include_path}/{type}.asciidoc[] -:default_codec!: \ No newline at end of file +:default_codec!: diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index 9722d8b..e8b24a7 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -3,6 +3,9 @@ require "logstash/inputs/base" require "logstash/inputs/threadable" require 'redis' +require 'concurrent' +require 'concurrent/executors' +require "stud/interval" # This input will read events from a Redis instance; it supports both Redis channels and lists. # The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and @@ -49,9 +52,11 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable config :key, :validate => :string, :required => true # Specify either list or channel. If `data_type` is `list`, then we will BLPOP the - # key. If `data_type` is `channel`, then we will SUBSCRIBE to the key. - # If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. - config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true + # key. If `data_type` is `pattern_list`, then we will spawn a number of worker + # threads that will LPOP from keys matching that pattern. If `data_type` is + # `channel`, then we will SUBSCRIBE to the key. If `data_type` is `pattern_channel`, + # then we will PSUBSCRIBE to the key. + config :data_type, :validate => [ "list", "pattern_list", "channel", "pattern_channel" ], :required => true # The number of events to return from Redis using EVAL. config :batch_count, :validate => :number, :default => 125 @@ -59,33 +64,39 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable # Redefined Redis commands to be passed to the Redis client. config :command_map, :validate => :hash, :default => {} - public - # public API - # use to store a proc that can provide a Redis instance or mock - def add_external_redis_builder(builder) #callable - @redis_builder = builder - self - end + # Maximum number of worker threads to spawn when using `data_type` `pattern_list`. + config :pattern_list_threads, :validate => :number, :default => 20 - # use to apply an instance directly and bypass the builder - def use_redis(instance) - @redis = instance - self - end + # Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`. + # After the list is empty or this number of items have been processed, the thread will exit and a + # new one will be started if there are non-empty lists matching the pattern without a consumer. + config :pattern_list_max_items, :validate => :number, :default => 1000 - def new_redis_instance - @redis_builder.call + # Time to sleep in main loop after checking if more threads can/need to be spawned. + # Applies to `data_type` is `pattern_list` + config :pattern_list_threadpool_sleep, :validate => :number, :default => 0.2 + + public + + def init_threadpool + @threadpool ||= Concurrent::ThreadPoolExecutor.new( + min_threads: @pattern_list_threads, + max_threads: @pattern_list_threads, + max_queue: 2 * @pattern_list_threads + ) + @current_workers ||= Concurrent::Set.new end def register @redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}" - @redis_builder ||= method(:internal_redis_builder) - # just switch on data_type once if @data_type == 'list' || @data_type == 'dummy' @run_method = method(:list_runner) @stop_method = method(:list_stop) + elsif @data_type == 'pattern_list' + @run_method = method(:pattern_list_runner) + @stop_method = method(:pattern_list_stop) elsif @data_type == 'channel' @run_method = method(:channel_runner) @stop_method = method(:subscribe_stop) @@ -94,8 +105,6 @@ def register @stop_method = method(:subscribe_stop) end - @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener) - @identity = "#{@redis_url} #{@data_type}:#{@key}" @logger.info("Registering Redis", :identity => @identity) end # def register @@ -119,35 +128,30 @@ def batched? # private def is_list_type? - @data_type == 'list' + @data_type == 'list' || @data_type == 'pattern_list' end # private def redis_params + params = { + :timeout => @timeout, + :db => @db, + :password => @password.nil? ? nil : @password.value, + :ssl => @ssl + } + if @path.nil? - connectionParams = { - :host => @host, - :port => @port - } + params[:host] = @host + params[:port] = @port else @logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'") - connectionParams = { - :path => @path - } + params[:path] = @path end - baseParams = { - :timeout => @timeout, - :db => @db, - :password => @password.nil? ? nil : @password.value, - :ssl => @ssl - } - - return connectionParams.merge(baseParams) + params end - # private - def internal_redis_builder + def new_redis_instance ::Redis.new(redis_params) end @@ -156,14 +160,12 @@ def connect redis = new_redis_instance # register any renamed Redis commands - if @command_map.any? - client_command_map = redis.client.command_map - @command_map.each do |name, renamed| - client_command_map[name.to_sym] = renamed.to_sym - end + @command_map.each do |name, renamed| + redis._client.command_map[name.to_sym] = renamed.to_sym end load_batch_script(redis) if batched? && is_list_type? + redis end # def connect @@ -193,40 +195,142 @@ def queue_event(msg, output_queue, channel=nil) end # private - def list_stop - return if @redis.nil? || !@redis.connected? + def reset_redis + redis = @redis # might change during method invocation + return if redis.nil? || !redis.connected? - @redis.quit rescue nil + redis.quit rescue nil # does client.disconnect internally + # check if input retried while executing + list_stop unless redis.equal? @redis @redis = nil end + # private + def list_stop + reset_redis + end + # private def list_runner(output_queue) + @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener) while !stop? begin @redis ||= connect @list_method.call(@redis, output_queue) - rescue ::Redis::BaseError => e - @logger.warn("Redis connection problem", :exception => e) - # Reset the redis variable to trigger reconnect - @redis = nil - # this sleep does not need to be stoppable as its - # in a while !stop? loop - sleep 1 + rescue => e + log_error(e) + retry if reset_for_error_retry(e) end end end - def list_batch_listener(redis, output_queue) + #private + def reset_threadpool + return if @threadpool.nil? + @threadpool.shutdown + @threadpool.wait_for_termination + @threadpool = nil + end + + # private + def pattern_list_stop + reset_redis + reset_threadpool + end + + # private + def pattern_list_process_item(redis, output_queue, key) + if stop? + @logger.debug("Breaking from thread #{key} as it was requested to stop") + return false + end + value = redis.lpop(key) + return false if value.nil? + queue_event(value, output_queue) + true + end + + # private + def pattern_list_single_processor(redis, output_queue, key) + (0...@pattern_list_max_items).each do + break unless pattern_list_process_item(redis, output_queue, key) + end + end + + # private + def pattern_list_batch_processor(redis, output_queue, key) + items_left = @pattern_list_max_items + while items_left > 0 + limit = [items_left, @batch_count].min + processed = process_batch(redis, output_queue, key, limit, 0) + if processed.zero? || processed < limit + return + end + items_left -= processed + end + end + + # private + def pattern_list_worker_consume(output_queue, key) begin - results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]) - results.each do |item| - queue_event(item, output_queue) + redis ||= connect + @pattern_list_processor.call(redis, output_queue, key) + rescue ::Redis::BaseError => e + @logger.warn("Redis connection problem in thread for key #{key}. Sleeping a while before exiting thread.", :exception => e) + sleep 1 + return + ensure + redis.quit rescue nil + end + end + + # private + def threadpool_capacity? + @threadpool.remaining_capacity > 0 + end + + # private + def pattern_list_launch_worker(output_queue, key) + @current_workers.add(key) + @threadpool.post do + begin + pattern_list_worker_consume(output_queue, key) + ensure + @current_workers.delete(key) end + end + end - if results.size.zero? - sleep BATCH_EMPTY_SLEEP + # private + def pattern_list_ensure_workers(output_queue) + return unless threadpool_capacity? + redis_runner do + @redis.keys(@key).shuffle.each do |key| + next if @current_workers.include?(key) + pattern_list_launch_worker(output_queue, key) + break unless threadpool_capacity? end + end + end + + # private + def pattern_list_runner(output_queue) + @pattern_list_processor = batched? ? method(:pattern_list_batch_processor) : method(:pattern_list_single_processor) + while !stop? + init_threadpool if @threadpool.nil? + pattern_list_ensure_workers(output_queue) + sleep(@pattern_list_threadpool_sleep) + end + end + + def process_batch(redis, output_queue, key, batch_size, sleep_time) + begin + results = redis.evalsha(@redis_script_sha, [key], [batch_size-1]) + results.each do |item| + queue_event(item, output_queue) + end + sleep sleep_time if results.size.zero? && sleep_time > 0 + results.size # Below is a commented-out implementation of 'batch fetch' # using pipelined LPOP calls. This in practice has been observed to @@ -255,6 +359,10 @@ def list_batch_listener(redis, output_queue) end end + def list_batch_listener(redis, output_queue) + process_batch(redis, output_queue, @key, @batch_count, BATCH_EMPTY_SLEEP) + end + def list_single_listener(redis, output_queue) item = redis.blpop(@key, 0, :timeout => 1) return unless item # from timeout or other conditions @@ -266,18 +374,19 @@ def list_single_listener(redis, output_queue) # private def subscribe_stop - return if @redis.nil? || !@redis.connected? - # if its a SubscribedClient then: - # it does not have a disconnect method (yet) - if @redis.client.is_a?(::Redis::SubscribedClient) + redis = @redis # might change during method invocation + return if redis.nil? || !redis.connected? + + if redis.subscribed? if @data_type == 'pattern_channel' - @redis.client.punsubscribe + redis.punsubscribe else - @redis.client.unsubscribe + redis.unsubscribe end - else - @redis.client.disconnect end + redis.close rescue nil # does client.disconnect + # check if input retried while executing + subscribe_stop unless redis.equal? @redis @redis = nil end @@ -286,15 +395,43 @@ def redis_runner begin @redis ||= connect yield - rescue ::Redis::BaseError => e - @logger.warn("Redis connection problem", :exception => e) - # Reset the redis variable to trigger reconnect - @redis = nil - Stud.stoppable_sleep(1) { stop? } - retry if !stop? + rescue => e + log_error(e) + retry if reset_for_error_retry(e) end end + def log_error(e) + info = { message: e.message, exception: e.class } + info[:backtrace] = e.backtrace if @logger.debug? + + case e + when ::Redis::TimeoutError + # expected for channels in case no data is available + @logger.debug("Redis timeout, retrying", info) + when ::Redis::BaseConnectionError, ::Redis::ProtocolError + @logger.warn("Redis connection error", info) + when ::Redis::BaseError + @logger.error("Redis error", info) + when ::LogStash::ShutdownSignal + @logger.debug("Received shutdown signal") + else + info[:backtrace] ||= e.backtrace + @logger.error("Unexpected error", info) + end + end + + # @return [true] if operation is fine to retry + def reset_for_error_retry(e) + return if e.is_a?(::LogStash::ShutdownSignal) + + # Reset the redis variable to trigger reconnect + @redis = nil + + Stud.stoppable_sleep(1) { stop? } + !stop? # retry if not stop-ing + end + # private def channel_runner(output_queue) redis_runner do @@ -342,6 +479,4 @@ def pattern_channel_listener(output_queue) end end -# end - end end end # Redis Inputs LogStash diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec index e68f21a..520cacd 100755 --- a/logstash-input-redis.gemspec +++ b/logstash-input-redis.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-redis' - s.version = '3.5.1' + s.version = '3.7.0' s.licenses = ['Apache License (2.0)'] s.summary = "Reads events from a Redis instance" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -23,7 +23,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-json' - s.add_runtime_dependency 'redis', '~> 4' + s.add_runtime_dependency 'redis', '>= 4.0.1', '< 5' s.add_development_dependency 'logstash-devutils' end diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 6675a8e..78c3213 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -1,12 +1,12 @@ require "logstash/devutils/rspec/spec_helper" require "logstash/devutils/rspec/shared_examples" -require "redis" -require "stud/try" require 'logstash/inputs/redis' require 'securerandom' def populate(key, event_count) require "logstash/event" + require "redis" + require "stud/try" redis = Redis.new(:host => "localhost") event_count.times do |value| event = LogStash::Event.new("sequence" => value) @@ -16,12 +16,22 @@ def populate(key, event_count) end end -def process(conf, event_count) - events = input(conf) do |pipeline, queue| - event_count.times.map{queue.pop} +def wait_events(conf, event_count) + events = input(conf) do |_, queue| + sleep 0.1 until queue.size >= event_count + queue.size.times.map { queue.pop } end + expect(events.size).to eq event_count + events +end - expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a) +def process(conf, event_count) + events = wait_events(conf, event_count) + # due multiple workers we get events out-of-order in the output + events.sort! { |a, b| a.get('sequence') <=> b.get('sequence') } + expect(events[0].get('sequence')).to eq(0) + expect(events[100].get('sequence')).to eq(100) + expect(events[1000].get('sequence')).to eq(1000) end # integration tests --------------------- @@ -31,7 +41,6 @@ def process(conf, event_count) it "should read events from a list" do key = SecureRandom.hex event_count = 1000 + rand(50) - # event_count = 100 conf = <<-CONFIG input { redis { @@ -63,155 +72,245 @@ def process(conf, event_count) populate(key, event_count) process(conf, event_count) end -end -# unit tests --------------------- + it "should read events from a list pattern" do + key_base = SecureRandom.hex + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key_base}.*" + data_type => "pattern_list" + batch_count => 1 + } + } + CONFIG + total_event_count = 0 + (0..10).each do |idx| + event_count = 100 + rand(50) + total_event_count += event_count + populate("#{key_base}.#{idx}", event_count) + end + wait_events(conf, total_event_count) + end + + it "should read events from a list pattern using batch_count (default 125)" do + key_base = SecureRandom.hex + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key_base}.*" + data_type => "pattern_list" + batch_count => 125 + } + } + CONFIG + total_event_count = 0 + (0..10).each do |idx| + event_count = 100 + rand(50) + total_event_count += event_count + populate("#{key_base}.#{idx}", event_count) + end + wait_events(conf, total_event_count) + end +end describe LogStash::Inputs::Redis do - let(:redis) { double('redis') } - let(:builder) { ->{ redis } } - let(:connection) { double('redis_connection') } - let(:connected) { [true] } + let(:queue) { Queue.new } + let(:data_type) { 'list' } + let(:redis_key) { 'foo' } let(:batch_count) { 1 } - let(:cfg) { {'key' => 'foo', 'data_type' => data_type, 'batch_count' => batch_count} } + let(:config) { {'key' => redis_key, 'data_type' => data_type, 'batch_count' => batch_count} } let(:quit_calls) { [:quit] } - let(:accumulator) { [] } - let(:command_map) { {} } subject do - LogStash::Plugin.lookup("input", "redis") - .new(cfg).add_external_redis_builder(builder) + LogStash::Inputs::Redis.new(config) end context 'construction' do it 'registers the input' do - expect {subject.register}.not_to raise_error + expect { subject.register }.not_to raise_error end end context 'renamed redis commands' do - let(:cfg) { - {'key' => 'foo', - 'data_type' => data_type, - 'command_map' => - { - 'blpop' => 'testblpop', - 'evalsha' => 'testevalsha', - 'lrange' => 'testlrange', - 'ltrim' => 'testltrim', - 'script' => 'testscript', - 'subscribe' => 'testsubscribe', - 'psubscribe' => 'testpsubscribe', - }, - 'batch_count' => 2 + let(:config) do + { + 'key' => 'foo', + 'data_type' => data_type, + 'command_map' => { + 'blpop' => 'testblpop', + 'evalsha' => 'testevalsha', + 'lrange' => 'testlrange', + 'ltrim' => 'testltrim', + 'script' => 'testscript', + 'subscribe' => 'testsubscribe', + 'psubscribe' => 'testpsubscribe', + }, + 'batch_count' => 2 } - } - - before do - subject.register - allow(redis).to receive(:connected?) - allow(redis).to receive(:client).and_return(connection) - allow(connection).to receive(:command_map).and_return(command_map) end it 'sets the renamed commands in the command map' do - allow(redis).to receive(:script) - allow(redis).to receive(:evalsha).and_return([]) - - tt = Thread.new do - sleep 0.01 - subject.do_stop + allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :script + expect(command[1]).to eql 'load' end - subject.run(accumulator) - tt.join + subject.register + redis = subject.send :connect - expect(command_map[:blpop]).to eq cfg['command_map']['blpop'].to_sym - expect(command_map[:evalsha]).to eq cfg['command_map']['evalsha'].to_sym - expect(command_map[:lrange]).to eq cfg['command_map']['lrange'].to_sym - expect(command_map[:ltrim]).to eq cfg['command_map']['ltrim'].to_sym - expect(command_map[:script]).to eq cfg['command_map']['script'].to_sym - expect(command_map[:subscribe]).to eq cfg['command_map']['subscribe'].to_sym - expect(command_map[:psubscribe]).to eq cfg['command_map']['psubscribe'].to_sym + command_map = redis._client.command_map + + expect(command_map[:blpop]).to eq config['command_map']['blpop'].to_sym + expect(command_map[:evalsha]).to eq config['command_map']['evalsha'].to_sym + expect(command_map[:lrange]).to eq config['command_map']['lrange'].to_sym + expect(command_map[:ltrim]).to eq config['command_map']['ltrim'].to_sym + expect(command_map[:script]).to eq config['command_map']['script'].to_sym + expect(command_map[:subscribe]).to eq config['command_map']['subscribe'].to_sym + expect(command_map[:psubscribe]).to eq config['command_map']['psubscribe'].to_sym end it 'loads the batch script with the renamed command' do - capture = nil - allow(redis).to receive(:script) { |load, lua_script| capture = lua_script } - allow(redis).to receive(:evalsha).and_return([]) + expect_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :script + expect(command[1]).to eql 'load' - tt = Thread.new do - sleep 0.01 - subject.do_stop + script = command[2] + expect(script).to include "redis.call('#{config['command_map']['lrange']}', KEYS[1], 0, batchsize)" + expect(script).to include "redis.call('#{config['command_map']['ltrim']}', KEYS[1], batchsize + 1, -1)" end - subject.run(accumulator) - tt.join - - expect(capture).to include "redis.call('#{cfg['command_map']['lrange']}', KEYS[1], 0, batchsize)" - expect(capture).to include "redis.call('#{cfg['command_map']['ltrim']}', KEYS[1], batchsize + 1, -1)" + subject.register + subject.send :connect end end - context 'runtime for list data_type' do + before do subject.register + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true + allow_any_instance_of( Redis::Client ).to receive(:disconnect) + allow_any_instance_of( Redis ).to receive(:quit) + end + + after do + subject.stop end context 'close when redis is unset' do - let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] } it 'does not attempt to quit' do - allow(redis).to receive(:nil?).and_return(true) - quit_calls.each do |call| - expect(redis).not_to receive(call) - end - expect {subject.do_stop}.not_to raise_error + expect_any_instance_of( Redis::Client ).to_not receive(:call) + expect_any_instance_of( Redis::Client ).to_not receive(:disconnect) + + expect { subject.do_stop }.not_to raise_error end end it 'calling the run method, adds events to the queue' do - expect(redis).to receive(:blpop).at_least(:once).and_return(['foo', 'l1']) + allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block| + expect(command[0]).to eql :blpop + expect(command[1]).to eql ['foo', 0] + end.and_return ['foo', "{\"foo1\":\"bar\""], nil - allow(redis).to receive(:connected?).and_return(connected.last) - allow(redis).to receive(:quit) + tt = Thread.new do + sleep 0.25 + subject.do_stop + end + + subject.run(queue) + + tt.join + + expect( queue.size ).to be > 0 + end + + it 'keeps running when a connection error occurs' do + raised = false + allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block| + expect(command[0]).to eql :blpop + unless raised + raised = true + raise Redis::CannotConnectError.new('test') + end + ['foo', "{\"after\":\"raise\"}"] + end + + expect(subject.logger).to receive(:warn).with('Redis connection error', + hash_including(:message=>"test", :exception=>Redis::CannotConnectError) + ).and_call_original tt = Thread.new do - sleep 0.01 + sleep 2.0 # allow for retry (sleep) after handle_error subject.do_stop end - subject.run(accumulator) + subject.run(queue) tt.join - expect(accumulator.size).to be > 0 + try(3) { expect( queue.size ).to be > 0 } end - context "when the batch size is greater than 1" do - let(:batch_count) { 10 } - let(:rates) { [] } + context 'error handling' do + + let(:config) do + super().merge 'batch_count' => 2 + end + + it 'keeps running when a (non-Redis) io error occurs' do + raised = false + allow(subject).to receive(:connect).and_return redis = double('redis') + allow(redis).to receive(:blpop).and_return nil + expect(redis).to receive(:evalsha) do + unless raised + raised = true + raise IOError.new('closed stream') + end + [] + end.at_least(1) + redis + allow(subject).to receive(:stop) + + expect(subject.logger).to receive(:error).with('Unexpected error', + hash_including(:message=>'closed stream', :exception=>IOError) + ).and_call_original - before do - allow(redis).to receive(:connected?).and_return(connected.last) - allow(redis).to receive(:script) - allow(redis).to receive(:quit) + tt = Thread.new do + sleep 2.0 # allow for retry (sleep) after handle_error + subject.do_stop + end + + subject.run(queue) + + tt.join end + end + + context "when the batch size is greater than 1" do + let(:batch_count) { 10 } + it 'calling the run method, adds events to the queue' do - expect(redis).to receive(:evalsha).at_least(:once).and_return(['a', 'b']) + allow_any_instance_of( Redis ).to receive(:script) + allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :evalsha + end.and_return ['{"a": 1}', '{"b":'], [] tt = Thread.new do - sleep 0.01 + sleep 0.25 subject.do_stop end - subject.run(accumulator) + subject.run(queue) tt.join - expect(accumulator.size).to be > 0 + + expect( queue.size ).to be > 0 end end @@ -220,20 +319,18 @@ def process(conf, event_count) let(:rates) { [] } it 'will throttle the loop' do - allow(redis).to receive(:evalsha) do + allow_any_instance_of( Redis ).to receive(:script) + allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :evalsha rates.unshift Time.now.to_f - [] - end - allow(redis).to receive(:connected?).and_return(connected.last) - allow(redis).to receive(:script) - allow(redis).to receive(:quit) + end.and_return [] tt = Thread.new do - sleep 1 + sleep 0.25 subject.do_stop end - subject.run(accumulator) + subject.run(queue) tt.join @@ -242,7 +339,7 @@ def process(conf, event_count) inters << x - y end - expect(accumulator.size).to eq(0) + expect( queue.size ).to eq(0) inters.each do |delta| expect(delta).to be_within(0.01).of(LogStash::Inputs::Redis::BATCH_EMPTY_SLEEP) end @@ -250,24 +347,84 @@ def process(conf, event_count) end it 'multiple close calls, calls to redis once' do - subject.use_redis(redis) - allow(redis).to receive(:blpop).and_return(['foo', 'l1']) - expect(redis).to receive(:connected?).and_return(connected.last) + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false + # allow_any_instance_of( Redis::Client ).to receive(:disconnect) quit_calls.each do |call| - expect(redis).to receive(call).at_most(:once) + allow_any_instance_of( Redis ).to receive(call).at_most(:once) + end + + subject.do_stop + expect { subject.do_stop }.not_to raise_error + subject.do_stop + end + end + + context 'runtime for pattern_list data_type' do + let(:data_type) { 'pattern_list' } + let(:redis_key) { 'foo.*' } + + before do + subject.register + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true + allow_any_instance_of( Redis::Client ).to receive(:disconnect) + allow_any_instance_of( Redis ).to receive(:quit) + subject.init_threadpool + end + + after do + subject.stop + end + + context 'close when redis is unset' do + let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] } + + it 'does not attempt to quit' do + allow_any_instance_of( Redis::Client ).to receive(:nil?).and_return(true) + quit_calls.each do |call| + expect_any_instance_of( Redis::Client ).not_to receive(call) + end + expect {subject.do_stop}.not_to raise_error + end + end + + it 'calling the run method, adds events to the queue' do + expect_any_instance_of( Redis ).to receive(:keys).at_least(:once).with(redis_key).and_return ['foo.bar'] + expect_any_instance_of( Redis ).to receive(:lpop).at_least(:once).with('foo.bar').and_return 'l1' + + tt = Thread.new do + end_by = Time.now + 3 + sleep 0.1 until queue.size > 0 or Time.now > end_by + subject.do_stop + end + + subject.run(queue) + + tt.join + + expect(queue.size).to be > 0 + end + + it 'multiple close calls, calls to redis once' do + allow_any_instance_of( Redis ).to receive(:keys).with(redis_key).and_return(['foo.bar']) + allow_any_instance_of( Redis ).to receive(:lpop).with('foo.bar').and_return('l1') + + quit_calls.each do |call| + allow_any_instance_of( Redis ).to receive(call).at_most(:once) end subject.do_stop - connected.push(false) #can't use let block here so push to array expect {subject.do_stop}.not_to raise_error subject.do_stop end end context 'for the subscribe data_types' do + + before { subject.register } + def run_it_thread(inst) Thread.new(inst) do |subj| - subj.run(accumulator) + subj.run(queue) end end @@ -283,35 +440,21 @@ def publish_thread(new_redis, prefix) def close_thread(inst, rt) Thread.new(inst, rt) do |subj, runner| # block for the messages - e1 = accumulator.pop - e2 = accumulator.pop + e1 = queue.pop + e2 = queue.pop # put em back for the tests - accumulator.push(e1) - accumulator.push(e2) + queue.push(e1) + queue.push(e2) runner.raise(LogStash::ShutdownSignal) subj.close end end - let(:accumulator) { Queue.new } - - let(:instance) do - inst = described_class.new(cfg) - inst.register - inst - end - before(:example, type: :mocked) do subject.register - subject.use_redis(redis) - allow(connection).to receive(:is_a?).and_return(true) - allow(redis).to receive(:client).and_return(connection) - expect(redis).to receive(:connected?).and_return(connected.last) - allow(connection).to receive(:unsubscribe) - allow(connection).to receive(:punsubscribe) - + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false quit_calls.each do |call| - expect(redis).to receive(call).at_most(:once) + allow_any_instance_of( Redis ).to receive(call).at_most(:once) end end @@ -319,11 +462,12 @@ def close_thread(inst, rt) let(:data_type) { 'channel' } let(:quit_calls) { [:unsubscribe, :connection] } + before { subject.register } + context 'mocked redis' do it 'multiple stop calls, calls to redis once', type: :mocked do subject.do_stop - connected.push(false) #can't use let block here so push to array - expect {subject.do_stop}.not_to raise_error + expect { subject.do_stop }.not_to raise_error subject.do_stop end end @@ -331,23 +475,23 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'c').join + publish_thread(subject.send(:new_redis_instance), 'c').join #simulate the pipeline thread - close_thread(instance, rt).join + close_thread(subject, rt).join - expect(accumulator.size).to eq(2) + expect(queue.size).to eq(2) end it 'events had redis_channel' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'c').join + publish_thread(subject.send(:new_redis_instance), 'c').join #simulate the pipeline thread - close_thread(instance, rt).join - e1 = accumulator.pop - e2 = accumulator.pop + close_thread(subject, rt).join + e1 = queue.pop + e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo') expect(e2.get('[@metadata][redis_channel]')).to eq('foo') end @@ -361,8 +505,7 @@ def close_thread(inst, rt) context 'mocked redis' do it 'multiple stop calls, calls to redis once', type: :mocked do subject.do_stop - connected.push(false) #can't use let block here so push to array - expect {subject.do_stop}.not_to raise_error + expect { subject.do_stop }.not_to raise_error subject.do_stop end end @@ -370,23 +513,24 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'pc').join + publish_thread(subject.send(:new_redis_instance), 'pc').join #simulate the pipeline thread - close_thread(instance, rt).join + close_thread(subject, rt).join - expect(accumulator.size).to eq(2) + expect(queue.size).to eq(2) end + it 'events had redis_channel' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'pc').join + publish_thread(subject.send(:new_redis_instance), 'pc').join #simulate the pipeline thread - close_thread(instance, rt).join - e1 = accumulator.pop - e2 = accumulator.pop + close_thread(subject, rt).join + e1 = queue.pop + e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo') expect(e2.get('[@metadata][redis_channel]')).to eq('foo') end @@ -394,15 +538,19 @@ def close_thread(inst, rt) end end - describe LogStash::Inputs::Redis do - context "when using data type" do - ["list", "channel", "pattern_channel"].each do |data_type| - context data_type do - it_behaves_like "an interruptible input plugin" do - let(:config) { {'key' => 'foo', 'data_type' => data_type } } - end + context "when using data type" do + + ["list", "channel", "pattern_channel", "pattern_list"].each do |data_type| + context data_type do + # TODO pending + # redis-rb ends up in a read wait loop since we do not use subscribe_with_timeout + next unless data_type == 'list' + + it_behaves_like "an interruptible input plugin", :redis => true do + let(:config) { { 'key' => 'foo', 'data_type' => data_type } } end end end + end end