diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 4c323bb0ab..fe9c253ff0 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -620,7 +620,17 @@ def verify_connection end def establish_connection(sock, ri) + start_time = Fluent::Clock.now + timeout = @sender.hard_timeout + while ri.state != :established + # Check for timeout to prevent infinite loop + if Fluent::Clock.now - start_time > timeout + @log.warn "handshake timeout after #{timeout}s", host: @host, port: @port + disable! + break + end + begin # TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly. # We need rewrite around here using new socket/server plugin helper. diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 3ec2f321f1..0c44e1d74e 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -1406,4 +1406,27 @@ def plugin_id_for_test? assert_equal 0, @d.instance.healthy_nodes_count assert_equal 0, @d.instance.registered_nodes_count end + + test 'establish_connection_timeout' do + @d = d = create_driver(%[ + hard_timeout 1 + + host #{TARGET_HOST} + port #{@target_port} + + ]) + + node = d.instance.nodes.first + mock_sock = flexmock('socket') + mock_sock.should_receive(:read_nonblock).with(512).and_return('').at_least.once + + ri = Fluent::Plugin::ForwardOutput::ConnectionManager::RequestInfo.new(:helo) + + assert_true node.available? + node.establish_connection(mock_sock, ri) + assert_false node.available? + + logs = d.logs + assert{ logs.any?{|log| log.include?('handshake timeout after 1.0s') } } + end end