diff --git a/cpanfile b/cpanfile index 5e80f29..70aa3dd 100644 --- a/cpanfile +++ b/cpanfile @@ -30,6 +30,7 @@ on 'test' => sub { requires 'Log::Any::Test', '>= 1.710'; requires 'Log::Any::Adapter::TAP'; requires 'File::Temp'; + requires 'Test::NoLeaks'; }; on 'develop' => sub { diff --git a/lib/Database/Async/Engine/PostgreSQL.pm b/lib/Database/Async/Engine/PostgreSQL.pm index fedeaf2..7b14bf7 100644 --- a/lib/Database/Async/Engine/PostgreSQL.pm +++ b/lib/Database/Async/Engine/PostgreSQL.pm @@ -90,7 +90,7 @@ Database::Async::Engine->register_class( sub configure { my ($self, %args) = @_; - for (qw(service encoding application_name)) { + for (qw(service encoding application_name connection_timeout)) { $self->{$_} = delete $args{$_} if exists $args{$_}; } return $self->next::method(%args); @@ -167,9 +167,7 @@ async sub connect { my $connected = $self->connected; die 'We think we are already connected, and that is bad' if $connected->as_numeric; - # Initial connection is made directly through the URI - # parameters. Eventually we also want to support UNIX - # socket and other types. + # Initial connection is made directly through the URI parameters $self->{uri} ||= $self->uri_for_service($self->service) if $self->service; my $uri = $self->uri; die 'bad URI' unless ref $uri; @@ -182,8 +180,6 @@ async sub connect { $Protocol::Database::PostgreSQL::Constants::SSL_NAME_MAP{$mode} // die 'unknown SSL mode ' . $mode; }; - # We're assuming TCP (either v4 or v6) here, but there's not really any reason we couldn't have - # UNIX sockets or other transport layers here other than lack of demand so far. my @connect_params; if ($uri->host and not $uri->host =~ m!^[/@]!) { @connect_params = ( @@ -208,7 +204,19 @@ async sub connect { } ); } - my $sock = await $loop->connect(@connect_params); + + # We have a few stages in connection, so we'll create a common timeout Future and + # set a limit on _total_ elapsed time for connection establishment, including SSL + # but _not_ including authentication. If there's no timeout, we just use a Future + # which never resolves. + my $timeout = $self->connection_timeout ? $loop->timeout_future( + after => $self->connection_timeout + ) : $loop->new_future->set_label('infinite_timeout_for_postgresql_connection'); + + my $sock = await Future->needs_any( + $loop->connect(@connect_params), + $timeout->without_cancel, # keep this around for the next stage + ); if ($sock->sockdomain == Socket::PF_INET or $sock->sockdomain == Socket::PF_INET6) { my $local = join ':', $sock->sockhost_service(1); @@ -226,14 +234,20 @@ async sub connect { on_read => sub { 0 } ) ); + Scalar::Util::weaken($self->{stream} = $stream); - # SSL is conveniently simple: a prefix exchange before the real session starts, - # and the user can decide whether SSL is mandatory or optional. - $stream = await $self->negotiate_ssl( - stream => $stream, + # TLS is conveniently simple: a prefix exchange before the real session starts, + # and the user can decide whether TLS is mandatory or optional. This may return + # the original stream, or a new TLS-wrapped one. + $stream = await Future->needs_any( + $self->negotiate_ssl( + stream => $stream, + ), + $timeout, # no need for ->without_cancel since this is the last step ); - + # The returned `$stream` may have changed Scalar::Util::weaken($self->{stream} = $stream); + $self->outgoing->each(sub { $log->tracef('Write bytes [%v02x]', $_); $self->ready_for_query->set_string(''); @@ -406,8 +420,19 @@ async sub negotiate_ssl { } sub is_replication { shift->{is_replication} //= 0 } + sub application_name { shift->{application_name} //= 'perl' } +=head2 connection_timeout + +The timeout in seconds allowed for the initial database connection, including SSL negotiation. + +Defaults to no timeout (waits indefinitely). + +=cut + +sub connection_timeout { shift->{connection_timeout} } + =head2 uri_for_dsn Returns a L corresponding to the given L. diff --git a/t/loop-removal.t b/t/loop-removal.t new file mode 100644 index 0000000..61c88c0 --- /dev/null +++ b/t/loop-removal.t @@ -0,0 +1,116 @@ +#!/usr/bin/env perl +use strict; +use warnings; + +no indirect qw(fatal); +use utf8; + +use Test::More; +use Test::NoLeaks; +use Future::AsyncAwait; +use Syntax::Keyword::Try; + +use IO::Async::Loop; +use Database::Async; +use Database::Async::Engine::PostgreSQL; + +my $loop = IO::Async::Loop->new; + +my $uri = URI->new('postgresql://example@127.0.0.1:5000/empty?password=example-password'); + +# Attempt to connect and remove from the event loop a few times in succession. +# This would also need to confirm no FDs or other leftovers. +async sub cleanup_ok { + test_noleaks( + code => sub { + (async sub { + $loop->add( + my $instance = Database::Async->new( + uri => $uri + ) + ); + + try { + await Future->wait_any( + $loop->timeout_future(after => 0.5), + $instance->query('select 1')->void, + ); + await $loop->delay_future(after => 0.05); + } catch ($e) { + note "failed - $e"; + $instance->remove_from_parent; + } + })->()->get; + }, + track_memory => 1, + track_fds => 1, + passes => 10, + warmup_passes => 0, + tolerate_hits => 1, + ); +} + +subtest 'connection refused' => sub { + # We set up a listener to grab a port, then shut it down + # immediately afterwards. This gives us a port which is + # "less" likely to be reƤssigned to anyone else in the short + # time that this test runs. + my $listener = $loop->listen( + service => 0, + socktype => 'stream', + on_stream => sub { + my ($stream) = @_; + $stream->configure(on_read => sub { 0 }); + $loop->add($stream) + } + )->get; + my $sock = $listener->read_handle; + my $port = $sock->sockport; + note "Listening on port ", $port, " - will close and try for connection-refused handling"; + $listener->configure(handle => undef); + $listener->remove_from_parent; + $sock->close; + $uri->port($port); + cleanup_ok()->get; + done_testing; +}; + +subtest 'connection queued' => sub { + # Here we want the incoming connections to sit in SYN_SENT state, waiting + # in the TCP accept() queue backlog: we can override the acceptor in + # IO::Async::Listener to achieve this. + my $listener = $loop->listen( + service => 0, + socktype => 'stream', + acceptor => sub { + note 'accept() called, ignoring'; + }, + # No stream will ever succeed, so this can be empty + on_stream => sub { }, + )->get; + my $port = $listener->read_handle->sockport; + note "Listening but not accepting on port ", $port; + $uri->port($port); + cleanup_ok()->get; + done_testing; +}; + +subtest 'connection accepted but no response' => sub { + my $listener = $loop->listen( + service => 0, + socktype => 'stream', + on_stream => sub { + my ($stream) = @_; + # We don't want to read, and we don't want to write - just sit + # there passively after accepting the connection + $stream->configure(on_read => sub { 0 }); + $loop->add($stream) + } + )->get; + my $port = $listener->read_handle->sockport; + note "Listening on port ", $port; + $uri->port($port); + cleanup_ok()->get; + done_testing; +}; +