From f578a8af40460a35d8afcdfb11ba811f1f9c5a19 Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Wed, 7 Sep 2016 16:21:07 +0800 Subject: [PATCH 1/2] Redis supports failover through Sentinel --- lib/logstash/inputs/redis.rb | 37 ++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index 79bc51e..ad7825c 100644 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -49,6 +49,10 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable # The number of events to return from Redis using EVAL. config :batch_count, :validate => :number, :default => 125 + config :sentinel_hosts, :validate => :array + + config :master, :validate => :string, :default => "mymaster" + public # public API # use to store a proc that can provide a redis instance or mock @@ -68,8 +72,6 @@ def new_redis_instance end def register - @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}" - @redis_builder ||= method(:internal_redis_builder) # just switch on data_type once @@ -86,10 +88,17 @@ def register @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener) - @identity = "#{@redis_url} #{@data_type}:#{@key}" + @identity = identity @logger.info("Registering Redis", :identity => @identity) end # def register + def identity + if @sentinel_hosts + return "redis-sentinel://#{@password} #{$sentinel_hosts} #{@db} #{@data_type}:#{@key}" + end + "redis://#{@password}@#{@host}:#{@port}/#{@db} #{@data_type}:#{@key}" + end + def run(output_queue) @run_method.call(output_queue) rescue LogStash::ShutdownSignal @@ -115,8 +124,6 @@ def is_list_type? # private def redis_params { - :host => @host, - :port => @port, :timeout => @timeout, :db => @db, :password => @password.nil? ? nil : @password.value @@ -125,7 +132,25 @@ def redis_params # private def internal_redis_builder - ::Redis.new(redis_params) + if @sentinel_hosts + params = redis_params + @logger.info('Connecting to sentinel') + hosts = [] + for sentinel_host in @sentinel_hosts + host, port = sentinel_host.split(":") + unless port + port = @sentinel_port + end + hosts.push({:host => host, :port => port}) + end + params[:url] = 'redis://'+@master + params[:sentinels] = hosts + params[:role] = :master + else + params[:host] = @current_host + params[:port] = @current_port + end + ::Redis.new(params) end # private From 7be4e4adde0ffde909b875b3f380041aa375fc91 Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Sat, 10 Feb 2018 19:51:09 +0800 Subject: [PATCH 2/2] Redis supports failover through Sentinel SyntaxError --- lib/logstash/inputs/redis.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index e4575b9..b74e4ea 100644 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -129,7 +129,6 @@ def is_list_type? # private def redis_params - { if @path.nil? if @sentinel_hosts.nil? connectionParams = {