diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 9a07acbe22..a0aca082a8 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -610,7 +610,17 @@ def verify_connection end def establish_connection(sock, ri) + start_time = Fluent::Clock.now + timeout = @sender.hard_timeout + while ri.state != :established + # Check for timeout to prevent infinite loop + if Fluent::Clock.now - start_time > timeout + @log.warn "handshake timeout after #{timeout}s", host: @host, port: @port + disable! + break + end + begin # TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly. # We need rewrite around here using new socket/server plugin helper. diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index cb35a31743..6209a3b1ae 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -1347,4 +1347,27 @@ def plugin_id_for_test? end end end + + test 'establish_connection_timeout' do + @d = d = create_driver(%[ + hard_timeout 1 + + host #{TARGET_HOST} + port #{@target_port} + + ]) + + node = d.instance.nodes.first + mock_sock = flexmock('socket') + mock_sock.should_receive(:read_nonblock).with(512).and_return('').at_least.once + + ri = Fluent::Plugin::ForwardOutput::ConnectionManager::RequestInfo.new(:helo) + + assert_true node.available? + node.establish_connection(mock_sock, ri) + assert_false node.available? + + logs = d.logs + assert{ logs.any?{|log| log.include?('handshake timeout after 1.0s') } } + end end