Skip to content

Modify to use multiple standby_namenodes #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 45 additions & 25 deletions lib/fluent/plugin/out_webhdfs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class Fluent::Plugin::WebHDFSOutput < Fluent::Plugin::Output
config_param :namenode, :string, default: nil # host:port
desc 'Standby namenode for Namenode HA (host:port)'
config_param :standby_namenode, :string, default: nil # host:port

desc 'Ignore errors on start up'
config_param :ignore_start_check_error, :bool, default: false

Expand Down Expand Up @@ -174,16 +173,21 @@ def configure(conf)
else
raise Fluent::ConfigError, "WebHDFS host or namenode missing"
end

# If you are running three or more namenodes, list every standby namenode to use as host:port entries separated by whitespace.
if @standby_namenode
unless /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/ =~ @standby_namenode
raise Fluent::ConfigError, "Invalid config value about standby namenode: '#{@standby_namenode}', needs STANDBY_NAMENODE_HOST:PORT"
end
if @httpfs
raise Fluent::ConfigError, "Invalid configuration: specified to use both of standby_namenode and httpfs."
@standby_namenode_group = Array.new
for standby_namenode_content in @standby_namenode.split do
unless /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/ =~ standby_namenode_content
raise Fluent::ConfigError, "Invalid config value about standby namenode: '#{standby_namenode_content}', needs STANDBY_NAMENODE_HOST:PORT"
end
if @httpfs
raise Fluent::ConfigError, "Invalid configuration: specified to use both of standby_namenode and httpfs."
end
@standby_namenode_group << {host:$1 ,port:$2.to_i}
end
@standby_namenode_host = $1
@standby_namenode_port = $2.to_i
end

unless @path.index('/') == 0
raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'"
end
Expand All @@ -195,12 +199,14 @@ def configure(conf)
end
@renew_kerberos_delegation_token_interval_hour = @renew_kerberos_delegation_token_interval / 60 / 60
end

@client = prepare_client(@namenode_host, @namenode_port, @username)
if @standby_namenode_host
@client_standby = prepare_client(@standby_namenode_host, @standby_namenode_port, @username)
# Prepare one client per standby namenode; @clients_standby is used to find an available namenode.
if @standby_namenode_group
@clients_standby = @standby_namenode_group.map{ |node| prepare_client(node[:host],node[:port],@username) }
@clients_standby = @clients_standby.prepend(@client)
else
@client_standby = nil
@clients_standby = nil
end

unless @append
Expand Down Expand Up @@ -261,9 +267,14 @@ def start
log.info "webhdfs connection confirmed: #{@namenode_host}:#{@namenode_port}"
return
end
if @client_standby && namenode_available(@client_standby)
log.info "webhdfs connection confirmed: #{@standby_namenode_host}:#{@standby_namenode_port}"
return
# If standby namenodes are configured, check each of them to find an available namenode.
if @clients_standby
for client_standby_check in @clients_standby do
if namenode_available(client_standby_check)
log.info "webhdfs connection confirmed: #{client_standby_check.host}:#{client_standby_check.port}"
return
end
end
end

unless @ignore_start_check_error
Expand All @@ -275,11 +286,11 @@ def is_standby_exception(e)
e.is_a?(WebHDFS::IOError) && e.message.match(/org\.apache\.hadoop\.ipc\.StandbyException/)
end

def namenode_failover
if @standby_namenode
@client, @client_standby = @client_standby, @client
log.warn "Namenode failovered, now using #{@client.host}:#{@client.port}."
end
# Make the namenode client stored at position +index+ of @clients_standby the
# active client (@client). The chosen index is remembered in @client_ha_index
# and the switch is reported at warn level.
def namenode_replace_to_standby(index)
  @client_ha_index = index
  promoted = @clients_standby[index]
  @client = promoted
  log.warn("Namenode failovered, now using #{promoted.host}:#{promoted.port}.")
end

def send_data(path, data)
Expand Down Expand Up @@ -402,6 +413,17 @@ def format(tag, time, record)
''
end

# Fail over to another namenode after the active one has failed.
#
# Walks @clients_standby in order and switches to the first candidate, other
# than the currently active client (@client), for which namenode_available
# returns a truthy value. The switch itself is done by
# namenode_replace_to_standby, which receives the candidate's index.
#
# Returns true when a switch happened, and false when no other namenode is
# available (including when no standby namenodes are configured).
def namenode_failover
  # Array() turns a nil @clients_standby into an empty list instead of raising.
  candidates = Array(@clients_standby)

  # The active client may itself be an element of the list; re-selecting it
  # would not be a failover, so it is excluded from the search.
  idx = candidates.find_index do |candidate|
    !candidate.equal?(@client) && namenode_available(candidate)
  end
  return false unless idx

  namenode_replace_to_standby(idx)
  true
end

def write(chunk)
hdfs_path = generate_path(chunk)

Expand All @@ -413,17 +435,15 @@ def write(chunk)
rescue => e
log.warn "failed to communicate hdfs cluster, path: #{hdfs_path}"

raise e if !@client_standby || failovered
raise e if !@clients_standby || failovered

if is_standby_exception(e) && namenode_available(@client_standby)
if is_standby_exception(e) && namenode_failover
log.warn "Seems the connected host status is not active (maybe due to failovers). Gonna try another namenode immediately."
namenode_failover
failovered = true
retry
end
if @num_errors && ((@num_errors + 1) >= @failures_before_use_standby) && namenode_available(@client_standby)
if @num_errors && ((@num_errors + 1) >= @failures_before_use_standby) && namenode_failover
log.warn "Too many failures. Try to use the standby namenode instead."
namenode_failover
failovered = true
retry
end
Expand Down