Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpanfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ on 'test' => sub {
requires 'Log::Any::Test', '>= 1.710';
requires 'Log::Any::Adapter::TAP';
requires 'File::Temp';
requires 'Test::NoLeaks';
};

on 'develop' => sub {
Expand Down
49 changes: 37 additions & 12 deletions lib/Database/Async/Engine/PostgreSQL.pm
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ Database::Async::Engine->register_class(

sub configure {
my ($self, %args) = @_;
for (qw(service encoding application_name)) {
for (qw(service encoding application_name connection_timeout)) {
$self->{$_} = delete $args{$_} if exists $args{$_};
}
return $self->next::method(%args);
Expand Down Expand Up @@ -167,9 +167,7 @@ async sub connect {
my $connected = $self->connected;
die 'We think we are already connected, and that is bad' if $connected->as_numeric;

# Initial connection is made directly through the URI
# parameters. Eventually we also want to support UNIX
# socket and other types.
# Initial connection is made directly through the URI parameters
$self->{uri} ||= $self->uri_for_service($self->service) if $self->service;
my $uri = $self->uri;
die 'bad URI' unless ref $uri;
Expand All @@ -182,8 +180,6 @@ async sub connect {
$Protocol::Database::PostgreSQL::Constants::SSL_NAME_MAP{$mode} // die 'unknown SSL mode ' . $mode;
};

# We're assuming TCP (either v4 or v6) here, but there's not really any reason we couldn't have
# UNIX sockets or other transport layers here other than lack of demand so far.
my @connect_params;
if ($uri->host and not $uri->host =~ m!^[/@]!) {
@connect_params = (
Expand All @@ -208,7 +204,19 @@ async sub connect {
}
);
}
my $sock = await $loop->connect(@connect_params);

# We have a few stages in connection, so we'll create a common timeout Future and
# set a limit on _total_ elapsed time for connection establishment, including SSL
# but _not_ including authentication. If there's no timeout, we just use a Future
# which never resolves.
my $timeout = $self->connection_timeout ? $loop->timeout_future(
after => $self->connection_timeout
) : $loop->new_future->set_label('infinite_timeout_for_postgresql_connection');

my $sock = await Future->needs_any(
$loop->connect(@connect_params),
$timeout->without_cancel, # keep this around for the next stage
);

if ($sock->sockdomain == Socket::PF_INET or $sock->sockdomain == Socket::PF_INET6) {
my $local = join ':', $sock->sockhost_service(1);
Expand All @@ -226,14 +234,20 @@ async sub connect {
on_read => sub { 0 }
)
);
Scalar::Util::weaken($self->{stream} = $stream);

# SSL is conveniently simple: a prefix exchange before the real session starts,
# and the user can decide whether SSL is mandatory or optional.
$stream = await $self->negotiate_ssl(
stream => $stream,
# TLS is conveniently simple: a prefix exchange before the real session starts,
# and the user can decide whether TLS is mandatory or optional. This may return
# the original stream, or a new TLS-wrapped one.
$stream = await Future->needs_any(
$self->negotiate_ssl(
stream => $stream,
),
$timeout, # no need for ->without_cancel since this is the last step
);

# The returned `$stream` may have changed
Scalar::Util::weaken($self->{stream} = $stream);

$self->outgoing->each(sub {
$log->tracef('Write bytes [%v02x]', $_);
$self->ready_for_query->set_string('');
Expand Down Expand Up @@ -406,8 +420,19 @@ async sub negotiate_ssl {
}

sub is_replication { shift->{is_replication} //= 0 }

sub application_name { shift->{application_name} //= 'perl' }

=head2 connection_timeout

The timeout in seconds allowed for the initial database connection, including SSL negotiation.

Defaults to no timeout (waits indefinitely).

=cut

sub connection_timeout { shift->{connection_timeout} }

=head2 uri_for_dsn

Returns a L<URI> corresponding to the given L<database source name|https://en.wikipedia.org/wiki/Data_source_name>.
Expand Down
116 changes: 116 additions & 0 deletions t/loop-removal.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#!/usr/bin/env perl
use strict;
use warnings;

no indirect qw(fatal);
use utf8;

use Test::More;
use Test::NoLeaks;
use Future::AsyncAwait;
use Syntax::Keyword::Try;

use IO::Async::Loop;
use Database::Async;
use Database::Async::Engine::PostgreSQL;

my $loop = IO::Async::Loop->new;

my $uri = URI->new('postgresql://example@127.0.0.1:5000/empty?password=example-password');
Copy link

@chylli-deriv chylli-deriv Apr 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A question: why put password as a URI parameter? I tried but seems URI::db doesn't support such format. password need to put after user:
https://metacpan.org/pod/URI::db#Format

db:engine:[//[user[:password]@][host][:port]/][dbname][?params][#fragment]


# Attempt to connect and remove from the event loop a few times in succession.
# This would also need to confirm no FDs or other leftovers.
async sub cleanup_ok {
test_noleaks(
code => sub {
(async sub {
$loop->add(
my $instance = Database::Async->new(
uri => $uri
)
);

try {
await Future->wait_any(
$loop->timeout_future(after => 0.5),
$instance->query('select 1')->void,
);
await $loop->delay_future(after => 0.05);
} catch ($e) {
note "failed - $e";
$instance->remove_from_parent;
}
})->()->get;
},
track_memory => 1,
track_fds => 1,
passes => 10,
warmup_passes => 0,
tolerate_hits => 1,
);
}

subtest 'connection refused' => sub {
# We set up a listener to grab a port, then shut it down
# immediately afterwards. This gives us a port which is
# "less" likely to be reässigned to anyone else in the short
# time that this test runs.
my $listener = $loop->listen(
service => 0,
socktype => 'stream',
on_stream => sub {
my ($stream) = @_;
$stream->configure(on_read => sub { 0 });
$loop->add($stream)
}
)->get;
my $sock = $listener->read_handle;
my $port = $sock->sockport;
note "Listening on port ", $port, " - will close and try for connection-refused handling";
$listener->configure(handle => undef);
$listener->remove_from_parent;
$sock->close;
$uri->port($port);
cleanup_ok()->get;
done_testing;
};

subtest 'connection queued' => sub {
# Here we want the incoming connections to sit in SYN_SENT state, waiting
# in the TCP accept() queue backlog: we can override the acceptor in
# IO::Async::Listener to achieve this.
my $listener = $loop->listen(
service => 0,
socktype => 'stream',
acceptor => sub {
note 'accept() called, ignoring';
},
# No stream will ever succeed, so this can be empty
on_stream => sub { },
)->get;
my $port = $listener->read_handle->sockport;
note "Listening but not accepting on port ", $port;
$uri->port($port);
cleanup_ok()->get;
done_testing;
};

subtest 'connection accepted but no response' => sub {
my $listener = $loop->listen(
service => 0,
socktype => 'stream',
on_stream => sub {
my ($stream) = @_;
# We don't want to read, and we don't want to write - just sit
# there passively after accepting the connection
$stream->configure(on_read => sub { 0 });
$loop->add($stream)
}
)->get;
my $port = $listener->read_handle->sockport;
note "Listening on port ", $port;
$uri->port($port);
cleanup_ok()->get;
done_testing;
};