diff --git a/lib/fluent/plugin/out_webhdfs.rb b/lib/fluent/plugin/out_webhdfs.rb index 3f35f69..2aa2885 100644 --- a/lib/fluent/plugin/out_webhdfs.rb +++ b/lib/fluent/plugin/out_webhdfs.rb @@ -20,7 +20,6 @@ class Fluent::Plugin::WebHDFSOutput < Fluent::Plugin::Output config_param :namenode, :string, default: nil # host:port desc 'Standby namenode for Namenode HA (host:port)' config_param :standby_namenode, :string, default: nil # host:port - desc 'Ignore errors on start up' config_param :ignore_start_check_error, :bool, default: false @@ -174,16 +173,21 @@ def configure(conf) else raise Fluent::ConfigError, "WebHDFS host or namenode missing" end + + # If you're running three or more name nodes, Write down the standby name node to be used, separated by spaces if @standby_namenode - unless /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/ =~ @standby_namenode - raise Fluent::ConfigError, "Invalid config value about standby namenode: '#{@standby_namenode}', needs STANDBY_NAMENODE_HOST:PORT" - end - if @httpfs - raise Fluent::ConfigError, "Invalid configuration: specified to use both of standby_namenode and httpfs." + @standby_namenode_group = Array.new + for standby_namenode_content in @standby_namenode.split do + unless /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/ =~ standby_namenode_content + raise Fluent::ConfigError, "Invalid config value about standby namenode: '#{standby_namenode_content}', needs STANDBY_NAMENODE_HOST:PORT" + end + if @httpfs + raise Fluent::ConfigError, "Invalid configuration: specified to use both of standby_namenode and httpfs." + end + @standby_namenode_group << {host:$1 ,port:$2.to_i} end - @standby_namenode_host = $1 - @standby_namenode_port = $2.to_i end + unless @path.index('/') == 0 raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'" end @@ -195,12 +199,14 @@ def configure(conf) end @renew_kerberos_delegation_token_interval_hour = @renew_kerberos_delegation_token_interval / 60 / 60 end - + @client = prepare_client(@namenode_host, @namenode_port, @username) - if @standby_namenode_host - @client_standby = prepare_client(@standby_namenode_host, @standby_namenode_port, @username) + # Use clients_standby to find available names. + if @standby_namenode_group + @clients_standby = @standby_namenode_group.map{ |node| prepare_client(node[:host],node[:port],@username) } + @clients_standby = @clients_standby.prepend(@client) else - @client_standby = nil + @clients_standby = nil end unless @append @@ -261,9 +267,14 @@ def start log.info "webhdfs connection confirmed: #{@namenode_host}:#{@namenode_port}" return end - if @client_standby && namenode_available(@client_standby) - log.info "webhdfs connection confirmed: #{@standby_namenode_host}:#{@standby_namenode_port}" - return + # If you use standby_namenode, search all standby_namenodes to find available names. + if @clients_standby + for client_standby_check in @clients_standby do + if namenode_available(client_standby_check) + log.info "webhdfs connection confirmed: #{client_standby_check.host}:#{client_standby_check.port}" + return + end + end end unless @ignore_start_check_error @@ -275,11 +286,11 @@ def is_standby_exception(e) e.is_a?(WebHDFS::IOError) && e.message.match(/org\.apache\.hadoop\.ipc\.StandbyException/) end - def namenode_failover - if @standby_namenode - @client, @client_standby = @client_standby, @client - log.warn "Namenode failovered, now using #{@client.host}:#{@client.port}." - end + # Once you find the available standby_namenode, switch to the information for the namenode you are using. + def namenode_replace_to_standby(index) + @client_ha_index = index + @client = @clients_standby[@client_ha_index] + log.warn "Namenode failovered, now using #{@client.host}:#{@client.port}." end def send_data(path, data) @@ -402,6 +413,17 @@ def format(tag, time, record) '' end + # if occured the failover, check standby_namenodes and change available standby namenode to main namenode + def namenode_failover + @clients_standby.each_with_index do |client_standby, idx| + if namenode_available(client_standby) + namenode_replace_to_standby(idx) + return true + end + end + return false + end + def write(chunk) hdfs_path = generate_path(chunk) @@ -413,17 +435,15 @@ def write(chunk) rescue => e log.warn "failed to communicate hdfs cluster, path: #{hdfs_path}" - raise e if !@client_standby || failovered + raise e if !@clients_standby || failovered - if is_standby_exception(e) && namenode_available(@client_standby) + if is_standby_exception(e) && namenode_failover log.warn "Seems the connected host status is not active (maybe due to failovers). Gonna try another namenode immediately." - namenode_failover failovered = true retry end - if @num_errors && ((@num_errors + 1) >= @failures_before_use_standby) && namenode_available(@client_standby) + if @num_errors && ((@num_errors + 1) >= @failures_before_use_standby) && namenode_failover log.warn "Too many failures. Try to use the standby namenode instead." - namenode_failover failovered = true retry end