diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index fbbe03aa6..cec0a7579 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -5,23 +5,26 @@ package Anvil::Tools::Database; use strict; use warnings; +use Data::Dumper; use DBI; use Scalar::Util qw(weaken isweak); -use Data::Dumper; +use Proc::Simple; use Text::Diff; -use Time::HiRes qw(gettimeofday tv_interval); +use Time::HiRes qw(gettimeofday sleep tv_interval); use XML::LibXML; our $VERSION = "3.0.0"; my $THIS_FILE = "Database.pm"; ### Methods; +# add_listener # archive_database # backup_database # check_file_locations # check_hosts # check_lock_age # check_for_schema +# clone_connection # configure_pgsql # connect # disconnect @@ -101,7 +104,9 @@ my $THIS_FILE = "Database.pm"; # read # read_variable # refresh_timestamp +# remove_listener # resync_databases +# run_listener # shutdown # track_files # update_host_status @@ -146,7 +151,9 @@ sub new { my $class = shift; my $self = {}; - + + $self->{listeners} = {}; + bless $self, $class; return ($self); @@ -176,6 +183,68 @@ sub parent ############################################################################################################# +=head2 add_listener + +This method adds 1 listener to the primary database. The listener runs when the primary database executes a pg_notify with the matching name. + +It takes the same parameters as run_listener, with the following additions. + +Parameters; + +=head3 name (required) + +The name of the events to listen for. This must match with the name provided to 1 pg_notify call in procedures called by database triggers. 
+
+=cut
+sub add_listener
+{
+	my ($self, $parameters) = @_;
+
+	my $anvil = $self->parent;
+	my $debug = $parameters->{debug} // 3;
+
+	$anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->add_listener()" } });
+
+	# The pg_notify name the new listener is registered under.
+	my $channel = $parameters->{name};
+
+	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { notify_name => $channel } });
+
+	# Guard clause: refuse to fork a listener that has no name to listen for.
+	unless ($channel)
+	{
+		$anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0116", variables => {
+			method    => "Database->add_listener()",
+			parameter => "name",
+			value     => $channel,
+		} });
+
+		return (1, undef, "missing name");
+	}
+
+	# Listeners are only referenced from $self->{listeners}. With kill_on_destroy set, a listener is killed
+	# once the last reference to it goes away (for example, when $self is destroyed).
+	my $process = Proc::Simple->new();
+	$process->kill_on_destroy(1);
+
+	# Hand run_listener() to Proc::Simple with the same object and parameters this method received.
+	# A false return means the process could not be started; report $! to the caller.
+	$process->start(\&run_listener, $self, $parameters) or return (2, undef, $!);
+
+	# File the listener under its name and PID; the per-name hash autovivifies on first use.
+	$self->{listeners}{$channel}{$process->pid} = $process;
+
+	return (0, $process);
+}
+
+
 =head2 archive_database
 
 This method takes an array reference of database tables and check each to see if their history schema version needs to be archived or not.
@@ -870,6 +939,112 @@ sub check_for_schema
 }
 
 
+=head2 clone_connection
+
+This method attempts to clone and replace the primary database connection in the cache.
+
+When fork is called in a process where there is a database connection, the connection is not duplicated like other variables in the fork (or child process).
+
+This method is designed to be used in a fork to avoid multiple forks from sharing the parent process's connection.
+
+Although not recommended, it should still be safe to use in a non-fork process because the original connection should be released and garbage collected.
+
+Returns a list; the first element is the return code and, on success only, the second element is the cloned database handle.
+
+ 0 = Cloned; the cache now points at the clone.
+ 1 = No database UUID was given and no primary database is set.
+ 2 = No cached handle exists for the UUID, or the handle failed to ping.
+ 3 = The clone attempt failed.
+
+Parameters;
+
+=head3 disconnect_base (optional, default 0)
+
+If set, the disconnect() will be called on the original connection when the clone succeeds.
+
+=head3 uuid (optional)
+
+If set, connection to the database identified by the provided UUID will be cloned instead of the primary database.
+
+=cut
+sub clone_connection
+{
+	my $self = shift;
+	my $parameters = shift;
+	my $anvil = $self->parent;
+	my $debug = $parameters->{debug} // 3;
+
+	$anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->clone_connection()" } });
+
+	my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db};
+	my $disconnect_base = $parameters->{disconnect_base} // 0;
+
+	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => {
+		db_uuid => $db_uuid,
+		disconnect_base => $disconnect_base,
+	} });
+
+	my $return_code = 0;
+
+	if (not $db_uuid)
+	{
+		$return_code = 1;
+
+		$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { return_code => $return_code }, prefix => "Database->clone_connection()" });
+		return ($return_code);
+	}
+
+	# Get the copied parent's database handle, which was made when fork()
+	my $dbh = $anvil->data->{cache}{database_handle}{$db_uuid};
+
+	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { dbh => $dbh } });
+
+	# Only a live handle can be cloned.
+	if ((not $dbh) or (not $dbh->ping()))
+	{
+		$return_code = 2;
+
+		$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { return_code => $return_code }, prefix => "Database->clone_connection()" });
+		return ($return_code);
+	}
+
+	# Clear the error container before the clone.
+	local $@;
+
+	# Clone the parent's database handle for child use. The eval keeps a failed clone from killing the
+	# caller; the failure is reported through the return code instead.
+	my $clone = eval { $dbh->clone({
+		RaiseError => 1,
+		AutoCommit => 1,
+		AutoInactiveDestroy => 1,
+		pg_enable_utf8 => 1,
+	}); };
+
+	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => {
+		'$@' => $@,
+		base => $dbh,
+		clone => $clone,
+	}, prefix => "dbh" });
+
+	if (not $clone)
+	{
+		# Failed to clone the parent's database handle
+		$return_code = 3;
+
+		$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { return_code => $return_code }, prefix => "Database->clone_connection()" });
+		return ($return_code);
+	}
+
+	if ($disconnect_base)
+	{
+		# $dbh is a raw DBI handle, not this module; DBI's disconnect() takes no arguments.
+		$dbh->disconnect();
+	}
+
+	# Release the reference to the copied parent's dbh; this will not close the parent's original database handle when auto_inactive_destroy is set
+	undef $anvil->data->{cache}{database_handle}{$db_uuid};
+
+	# Add the cloned child's database handle
+	$anvil->data->{cache}{database_handle}{$db_uuid} = $clone;
+
+	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => {
+		"cache::database_handle::${db_uuid}" => $anvil->data->{cache}{database_handle}{$db_uuid},
+	} });
+
+	return ($return_code, $clone);
+}
+
+
 =head2 configure_pgsql
 
 This configures the local database server. Specifically, it checks to make sure the daemon is running and starts it if not. It also checks the C<< pg_hba.conf >> configuration to make sure it is set properly to listen on this machine's IP addresses and interfaces.
@@ -18910,6 +19085,75 @@ sub refresh_timestamp
 }
 
 
+=head2 remove_listener
+
+This method removes listener(s) registered with add_listener.
+
+Parameters;
+
+=head3 name (required)
+
+This is used to match the listeners registered under the given pg_notify name.
+
+=head3 pid (optional)
+
+If set, only the listener with the given PID will be killed, instead of all listeners registered under the given pg_notify name.
+
+Returns C<< 1 >> when C<< name >> is missing and C<< 0 >> otherwise, including when nothing matched the given name or PID.
+
+=cut
+sub remove_listener
+{
+	my $self = shift;
+	my $parameters = shift;
+	my $anvil = $self->parent;
+	my $debug = $parameters->{debug} // 3;
+
+	$anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->remove_listener()" } });
+
+	my $listener_pid = $parameters->{pid};
+	my $notify_name = $parameters->{name};
+
+	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => {
+		listener_pid => $listener_pid,
+		notify_name => $notify_name,
+	} });
+
+	if (not $notify_name)
+	{
+		$anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0116", variables => {
+			method => "Database->remove_listener()",
+			parameter => "name",
+			value => $notify_name,
+		} });
+
+		return (1);
+	}
+
+	# Look the name up without autovivifying an entry for names that were never registered.
+	my $registered = exists $self->{listeners}{$notify_name} ? $self->{listeners}{$notify_name} : undef;
+
+	if (ref($registered) ne "HASH")
+	{
+		# Nothing is registered under this name, so there is nothing to remove.
+		return (0);
+	}
+
+	my @listeners;
+
+	if ((defined $listener_pid) and ($listener_pid =~ /^\d+$/))
+	{
+		# Get only the listener with the given PID; an unknown PID selects nothing instead of
+		# putting an undefined value in the list (which would die on the method calls below).
+		@listeners = grep { defined } ($registered->{$listener_pid});
+	}
+	else
+	{
+		# Include all listeners under the notify name to the scope.
+		@listeners = values(%{$registered});
+	}
+
+	foreach my $listener (@listeners)
+	{
+		# Forget the to-be killed listener.
+		delete $registered->{$listener->pid};
+
+		# Make certain the listener is dead and stays dead.
+		$listener->kill() or $listener->kill("SIGKILL");
+	}
+
+	return (0);
+}
+
+
 =head2 resync_databases
 
 This will resync the database data on this and peer database(s) if needed. It takes no arguments and will immediately return unless C<< sys::database::resync_needed >> was set.
@@ -19518,6 +19762,180 @@ sub resync_databases
 }
 
 
+=head2 run_listener
+
+This method is an indefinite loop that listens for notify calls from the primary database.
+
+Parameters;
+
+=head3 blocking (optional, default 1)
+
+If set, the listener process will listen to database notifications indefinitely.
+ +=head3 name (required) + +The name of the events to listen for. This must match with the name provided to 1 pg_notify call in procedures called by database triggers. + +=head3 on_child_forked (optional) + +If set, runs the referenced subroutine in the listener process after it is created. + +=head3 on_failed_to_clone (optional) + +If set, runs the referenced subroutine when the call to clone_connection() fails with a non-zero code. + +=head3 on_failed_to_ping (optional) + +If set, runs the referenced subroutine when a ping to database fails during the blocking check for notify. Likely used for clean up before terminating. + +=head3 on_notify (optional) + +If set, runs the referenced subroutine when a database notification is received. + +=head3 ping_interval (optional, default 60) + +Determines the interval in B<< seconds >> between pinging the database to ensure the connection is healthy. The listener will terminate if a ping fails. + +=head3 uuid (optional) + +If set, the listener will be added to the connection of the database identified with the provided UUID. 
+
+This method does not return; the process always ends through C<< nice_exit >>. Exit codes: 1 = missing name, 2 = invalid ping_interval, 3 = unknown database UUID, 4 = failed to clone the connection, 5 = LISTEN failed, 6 = ping failed, 7 = the listen loop ended (or C<< blocking >> was not set).
+
+=cut
+sub run_listener
+{
+	my $self = shift;
+	my $parameters = shift;
+	my $anvil = $self->parent;
+	my $debug = $parameters->{debug} // 3;
+
+	$anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->run_listener()" } });
+
+	my $blocking = $parameters->{blocking} // 1;
+	my $db_ping_interval = $parameters->{ping_interval} // 60;
+	my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db};
+	my $notify_name = $parameters->{name};
+	my $on_child_forked = $parameters->{on_child_forked};
+	my $on_failed_to_clone = $parameters->{on_failed_to_clone};
+	my $on_failed_to_ping = $parameters->{on_failed_to_ping};
+	my $on_notify = $parameters->{on_notify};
+
+	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => {
+		blocking => $blocking,
+		db_ping_interval => $db_ping_interval,
+		db_uuid => $db_uuid,
+		notify_name => $notify_name,
+		on_child_forked => $on_child_forked,
+		on_failed_to_clone => $on_failed_to_clone,
+		on_failed_to_ping => $on_failed_to_ping,
+		on_notify => $on_notify,
+	} });
+
+	# Don't disconnect on exit until we've cloned the (probably primary) database connection.
+
+	if (not $notify_name)
+	{
+		$anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0116", variables => {
+			method => "Database->run_listener()",
+			parameter => "name",
+			value => $notify_name,
+		} });
+
+		$anvil->nice_exit({ db_disconnect => 0, exit_code => 1 });
+	}
+
+	if ($db_ping_interval !~ /^\d+$/)
+	{
+		$anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0116", variables => {
+			method => "Database->run_listener()",
+			parameter => "ping_interval",
+			value => $db_ping_interval,
+		} });
+
+		$anvil->nice_exit({ db_disconnect => 0, exit_code => 2 });
+	}
+
+	if ((not $db_uuid) or (not exists $anvil->data->{database}{$db_uuid}))
+	{
+		$anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0116", variables => {
+			method => "Database->run_listener()",
+			parameter => "uuid",
+			value => $db_uuid,
+		} });
+
+		$anvil->nice_exit({ db_disconnect => 0, exit_code => 3 });
+	}
+
+	# Let the caller adjust the freshly started process before it touches the database.
+	$on_child_forked->({ anvil => $anvil }) if (ref($on_child_forked) eq "CODE");
+
+	my ($clone_connection_code, $dbh) = $self->clone_connection({ uuid => $db_uuid });
+
+	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => {
+		clone_connection_code => $clone_connection_code,
+		dbh => $dbh,
+	} });
+
+	if (not $dbh)
+	{
+		$on_failed_to_clone->({ anvil => $anvil }) if (ref($on_failed_to_clone) eq "CODE");
+
+		$anvil->nice_exit({ db_disconnect => 0, exit_code => 4 });
+	}
+
+	# We've cloned the database connection, remember to disconnect on exit after this point.
+
+	# Quote the name as an identifier: it is interpolated into the statement, and pg_notify() matches the
+	# channel name exactly, whereas an unquoted identifier is case-folded by the server.
+	my $listen = eval { $dbh->do("LISTEN ".$dbh->quote_identifier($notify_name)); };
+
+	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { listen => $listen } });
+
+	if (not $listen)
+	{
+		$anvil->nice_exit({ exit_code => 5 });
+	}
+
+	# Poll every $step seconds; $count accumulates the slept time until the next ping is due.
+	my $step = 0.1;
+	my $count = 0;
+
+	while ($blocking)
+	{
+		# Drain every notification that is already queued on the connection.
+		while (my $notify = eval { $dbh->pg_notifies(); })
+		{
+			my ($name, $pid, $payload) = @$notify;
+
+			$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => {
+				name => $name,
+				pid => $pid,
+				payload => $payload,
+			}, prefix => "pg_notify" });
+
+			$on_notify->({ anvil => $anvil, notify => $notify }) if (ref($on_notify) eq "CODE");
+		}
+
+		if ($count >= $db_ping_interval)
+		{
+			my $ping = eval { $dbh->ping(); };
+
+			$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { ping => $ping } });
+
+			if (not $ping)
+			{
+				$on_failed_to_ping->({ anvil => $anvil }) if (ref($on_failed_to_ping) eq "CODE");
+
+				$anvil->nice_exit({ exit_code => 6 });
+			}
+
+			# Reset the counter
+			$count = 0;
+		}
+
+		$count += $step;
+
+		sleep($step);
+	}
+
+	# The listener process should never reach here.
+	$anvil->nice_exit({ exit_code => 7 });
+}
+
+
 =head2 shutdown
 
 This gracefully shuts down the local database, waiting for active connections to exit before doing so. This call only works on a Striker dashboard. It creates a dump file of the database as part of the shutdown. It always returns C<< 0 >>.
diff --git a/share/anvil.sql b/share/anvil.sql index f0028313f..4d6c7bc31 100644 --- a/share/anvil.sql +++ b/share/anvil.sql @@ -806,12 +806,12 @@ CREATE TABLE history.jobs ( ); ALTER TABLE history.jobs OWNER TO admin; -CREATE FUNCTION history_jobs() RETURNS trigger +CREATE OR REPLACE FUNCTION history_jobs() RETURNS trigger AS $$ DECLARE history_jobs RECORD; BEGIN - SELECT INTO history_jobs * FROM jobs WHERE job_uuid = new.job_uuid; + SELECT INTO history_jobs * FROM jobs WHERE job_uuid = NEW.job_uuid; INSERT INTO history.jobs (job_uuid, job_host_uuid, @@ -840,6 +840,7 @@ BEGIN history_jobs.job_description, history_jobs.job_status, history_jobs.modified_date); + PERFORM pg_notify('after_insert_or_update_job', row_to_json(NEW)::text); RETURN NULL; END; $$ diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 5b63a2232..5a728573e 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -154,6 +154,7 @@ use File::Spec::Functions; use File::Temp qw(tempdir); use IO::Socket::UNIX; use JSON; +use Proc::Simple; use Text::ParseWords; $| = 1; @@ -176,10 +177,10 @@ main(); sub access_chain { my $parameters = shift; - # required: + # Required: my $anvil = $parameters->{anvil}; my $chain_str = $parameters->{chain}; - # optional: + # Optional: my $chain_args = $parameters->{chain_args} // []; my @chain = split(/->|[.]/, $chain_str); @@ -262,37 +263,13 @@ sub access_chain return (@results); } -# only used by child processes to clone the parent's database handles -sub clone_database_handles -{ - my $parameters = shift; - my $anvil = $parameters->{anvil}; - - foreach my $uuid (sort { $a cmp $b } keys %{$anvil->data->{database}}) - { - if ((not exists $anvil->data->{cache}{database_handle}{$uuid}) or (not $anvil->data->{cache}{database_handle}{$uuid})) - { - # Useless handle, skip it. 
- next; - } - # get the copied parent's database handle, which was made when fork() - my $dbh = $anvil->data->{cache}{database_handle}{$uuid}; - # clone the parent's database handle for child use - my $child_dbh = $dbh->clone(); - # destroy the copied parent's dbh; this will not close the parent's original database handle because auto_inactive_destroy is set - undef $anvil->data->{cache}{database_handle}{$uuid}; - # add the cloned child's dbh - $anvil->data->{cache}{database_handle}{$uuid} = $child_dbh; - } -} - sub db_access { my $parameters = shift; - # required: + # Required: my $anvil = $parameters->{anvil}; my $sql = $parameters->{sql}; - # optional: + # Optional: my $mode = $parameters->{db_access_mode} // ""; my $uuid = $parameters->{db_uuid}; @@ -334,11 +311,11 @@ sub emit sub get_scmd_args { my $parameters = shift; - # required: + # Required: my $anvil = $parameters->{anvil}; my $input = $parameters->{input}; my $get_values = $parameters->{get_values}; - # optional: + # Optional: my $cmd = $parameters->{cmd}; my $arg_names = $parameters->{names} // []; @@ -363,35 +340,69 @@ sub get_scmd_args return $args; } -sub handle_connections +sub handle_clone_database_connection_failure { my $parameters = shift; + # Required: my $anvil = $parameters->{anvil}; - my $server = $parameters->{server}; + my $responder = $parameters->{responder}; - local $SIG{INT} = sub { - emit("sigint"); + pstderr("failed to clone primary database handle"); - close($server); + $responder->shutdown(SHUT_RDWR); +} - emit("exit"); +sub handle_responder_output_setup +{ + my $parameters = shift; + # Required: + my $anvil = $parameters->{anvil}; + my $responder = $parameters->{responder}; - $anvil->catch_sig({ signal => "INT" }); - }; + # Disconnect from the parent's outputs to avoid interference - local $SIG{TERM} = sub { - emit("sigterm"); + close(STDOUT); + close(STDERR); - close($server); + # Redirect outputs to the responder for transport - emit("exit"); + open(STDOUT, ">&", $responder) or 
do { + print $responder "failed to open STDOUT; cause: ".$!."\n"; - $anvil->catch_sig({ signal => "TERM" }); + $responder->shutdown(SHUT_RDWR); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + }; + + open(STDERR, ">&", $responder) or do { + print $responder "failed to open STDERR; cause: ".$!."\n"; + + $responder->shutdown(SHUT_RDWR); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); }; +} + +sub handle_connections +{ + my $parameters = shift; + # Required: + my $anvil = $parameters->{anvil}; + my $server = $parameters->{server}; emit("listening"); + my $emit_listener_event = sub { emit("dbl:".($_[1] // $$)."-".$_[0]); }; + my $emit_responder_event = sub { emit("responder:".($_[1] // $$)."-".$_[0]); }; + while (my $responder = $server->accept() or do { + # + # Ignore interrupt caused by auto SIGCHLD in the reaper of Proc::Simple. + # + # accept() will fail with $! set to "Interrupted system call" without ignoring the interrupt caused by SIGCHLD. + # + next if $!{EINTR}; + pstderr("failed to accept connections; cause: ".$!); close($server); @@ -401,112 +412,52 @@ sub handle_connections { my $request_line = <$responder>; - # responder is done reading + # Responder is done reading $responder->shutdown(SHUT_RD); - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { request_line => $request_line } }); + chomp($request_line); - last if ($request_line =~ /^(?:q|quit)\s*$/); - - my $pid = fork; + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { request_line => $request_line } }); - if (not defined $pid) + if ($request_line =~ /^(?:q|quit)\s*$/) { - pstderr("failed to fork on receive; cause: ".$!); + $responder->shutdown(SHUT_WR); - next; + last; } - if ($pid) - { - emit("responder:".$pid."-forked"); + manage_database_listeners({ + anvil => $anvil, + emit => $emit_listener_event, + input => $request_line, + responder => $responder, + server => $server, + }) and do { + # 
Don't close the responder because it's being used by the listener to send data. next; - } - - ############# - ### BEGIN ### responder block - ############# - - # close the server because the child doesn't need it - close($server); - - # disconnect from the parent's outputs to avoid interference - - close(STDOUT); - close(STDERR); - - # redirect outputs to the responder for transport - - open(STDOUT, ">&", $responder) or do { - print $responder "failed to open STDOUT; cause: ".$!."\n"; - - $responder->shutdown(SHUT_RDWR); - - $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); }; - open(STDERR, ">&", $responder) or do { - print $responder "failed to open STDERR; cause: ".$!."\n"; - - $responder->shutdown(SHUT_RDWR); - - $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); - }; - - # clone the database handle for this child to avoid interfering with another process that needs to use the database - clone_database_handles({ anvil => $anvil }); - - my @cmd_lines = split(/;;/, $request_line); - - foreach my $cmd_line (@cmd_lines) - { - $cmd_line =~ s/^\s+//; - $cmd_line =~ s/\s+$//; - - my $cmd_line_id; - - if ($cmd_line =~ s/^([[:xdigit:]]{8}-(?:[[:xdigit:]]{4}-){3}[[:xdigit:]]{12})\s+//) - { - $cmd_line_id = $1; - } - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { - cmd_line => $cmd_line, - cmd_line_id => $cmd_line_id, - } }); - - if ($cmd_line =~ /^$scmd_db_read\s+/) - { - process_scmd_db({ anvil => $anvil, cmd => $scmd_db_read, input => $cmd_line, lid => $cmd_line_id }); - } - elsif ($cmd_line =~ /^$scmd_db_write\s+/) - { - process_scmd_db({ anvil => $anvil, cmd => $scmd_db_write, input => $cmd_line, lid => $cmd_line_id }); - } - elsif ($cmd_line =~ /^$scmd_execute\s+/) - { - process_scmd_execute({ anvil => $anvil, input => $cmd_line, lid => $cmd_line_id }); - } - } + my $response_process = Proc::Simple->new(); - # responder is done writing - $responder->shutdown(SHUT_WR); + 
$response_process->start(\&run_responder, { + anvil => $anvil, + emit => $emit_responder_event, + line => $request_line, + responder => $responder, + server => $server, + }) or do { + pstderr("failed to fork on receive; cause: ".$!); - emit("responder:".$$."-exit"); + $responder->shutdown(SHUT_WR); - $anvil->nice_exit({ db_disconnect => 0, exit_code => 0 }); + next; + }; - ############# - #### END #### responder block - ############# + $emit_responder_event->("forked", $response_process->pid); } - close($server); - - emit("exit"); - - $anvil->nice_exit({ exit_code => 0 }); + return 1; } sub is_array @@ -526,9 +477,9 @@ sub main $anvil->Get->switches; my $daemonize = $anvil->data->{switches}{'daemonize'} // 0; - my $script_file = $anvil->data->{switches}{'script'} // "-"; my $working_dir = $anvil->data->{switches}{'working-dir'} // cwd(); + emit("pid:".$$); emit("initialized"); $anvil->Database->connect({ auto_inactive_destroy => 1 }); @@ -570,181 +521,187 @@ sub main $anvil->nice_exit({ exit_code => 1 }); }; - # make child processes start-and-forget because we don't need to wait for them - local $SIG{CHLD} = "IGNORE"; - - if ($daemonize) + if (not $daemonize) { - return handle_connections({ anvil => $anvil, server => $server }); - } + my $emit_interface_event = sub { emit("interface:".($_[1] // $$)."-".$_[0]); }; - # make 1 child to interact on stdio - my $interface_pid = fork; + my $interface_process = Proc::Simple->new(); - if (not defined $interface_pid) - { - pstderr("failed to fork IO interface; cause: ".$!); - - close($server); + $interface_process->start(\&run_interface, { + anvil => $anvil, + emit => $emit_interface_event, + server => $server, + socket_path => $socket_path, + }) or do { + pstderr("failed to fork IO interface; cause: ". 
$!); - $anvil->nice_exit({ exit_code => 1 }); - } + close($server); - if ($interface_pid) - { - emit("interface:".$interface_pid."-forked"); + $anvil->nice_exit({ exit_code => 1 }); + }; - handle_connections({ anvil => $anvil, server => $server }); + $emit_interface_event->("forked", $interface_process->pid); } - ############# - ### BEGIN ### interface block - ############# - - # close the server because the child doesn't need it - close($server); - - my $script_file_handle; - local $SIG{INT} = sub { - emit("interface:".$$."-sigint"); + emit("sigint"); - close($script_file_handle); + close($server); - emit("interface:".$$."-exit"); + emit("exit"); $anvil->catch_sig({ signal => "INT" }); }; local $SIG{TERM} = sub { - emit("interface:".$$."-sigterm"); + emit("sigterm"); - close($script_file_handle); + close($server); - emit("interface:".$$."-exit"); + emit("exit"); $anvil->catch_sig({ signal => "TERM" }); }; - eval { - # TODO: make this script read piped input + # The main process shouldn't do anything other than handling connections. + handle_connections({ anvil => $anvil, server => $server }); - $script_file = "-" if ($script_file =~ /^#!SET!#$/); + close($server); - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { script_file => $script_file } }); + emit("exit"); - if ($script_file =~ /^-$/) - { - open($script_file_handle, "-"); - } - else - { - open($script_file_handle, "< :encoding(UTF-8)", $script_file); - } - } or do { - # open() sets $! upon error, different from the database module failure (which sets $@) - pstderr("failed to open ".$script_file." 
as script input; cause: ".$!);
+	$anvil->nice_exit({ exit_code => 0 });
+}
 
-		$anvil->nice_exit({ exit_code => 1 });
-	};
+# Handles the "dbl::" (database listener) requests: "dbl::list", "dbl::remove <name> [pid]" and
+# "dbl::add <name>". Returns 1 when the request was handled here, and 0 when the input is not a listener
+# request (or could not be handled) so the caller falls back to the regular responder.
+sub manage_database_listeners
+{
+	my $parameters = shift;
+	# Required:
+	my $anvil = $parameters->{anvil};
+	my $emit = $parameters->{emit};
+	my $input = $parameters->{input};
+	my $responder = $parameters->{responder};
+	my $server = $parameters->{server};
 
-	emit("interface:".$$."-ready");
+	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { input => $input }, prefix => "manage_database_listeners" });
 
-	while (my $script_line = <$script_file_handle>)
+	chomp($input);
+
+	# Ignore because it's not a database listener action
+	if (not ($input =~ s/^dbl:://))
 	{
-		$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { script_line => $script_line } });
+		return 0;
+	}
 
-		if ($script_line =~ /^(?:q|quit)\s*$/)
+	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { input => $input } });
+
+	if ($input =~ s/^list//)
+	{
+		my $listeners = $anvil->Database->{listeners};
+
+		foreach my $name (sort { $a cmp $b } keys %{$listeners})
 		{
-			my $quitter = IO::Socket::UNIX->new(
-				Peer => $socket_path,
-				Type => SOCK_STREAM,
-			) or do {
-				pstderr("failed to create QUIT connection to ".$socket_path."; cause: $@");
+			foreach my $pid (sort { $a cmp $b } keys %{$listeners->{$name}})
+			{
+				pstdout($name.":".$pid);
+			}
+		}
 
-				$anvil->nice_exit({ exit_code => 1 });
-			};
+		return 1;
+	}
 
-			print $quitter $script_line;
+	if ($input =~ s/^remove\s+(\w+)//)
+	{
+		my $name = $1;
 
-			$quitter->shutdown(SHUT_RDWR);
+		# The optional PID follows the name after whitespace; skip the separator before matching digits.
+		$input =~ s/^\s+//;
 
-			last;
+		my $pid;
+
+		if ($input =~ s/^(\d+)//)
+		{
+			# The pattern has exactly one capture group.
+			$pid = $1;
 		}
 
-		my $pid = fork; # the child process starts here when spawned successfully:
+		$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { name => $name, pid => $pid } });
 
-		# - fork returns `undef` when it fails to spawn the child
-		# - fork returns the child's pid to the parent
-		# - fork returns `0` to the child
+		$anvil->Database->remove_listener({ name => $name, pid => $pid });
 
-		if (not defined $pid) {
-			pstderr("failed to fork on send; cause: ".$!);
+		return 1;
+	}
 
-			next;
-		}
+	if ($input =~ s/^add\s+(\w+)//)
+	{
+		my $name = $1;
 
-		if ($pid)
-		{
-			emit("requester:".$pid."-forked");
+		$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { name => $name } });
 
-			next;
-		}
+		my $handle_child_forked = sub {
+			# Close the server because the child doesn't need it
+			close($server);
 
-		#############
-		### BEGIN ### requester block
-		#############
+			handle_responder_output_setup({ anvil => $anvil, responder => $responder })
+		};
 
-		# the child doesn't need to process input
-		close($script_file_handle);
+		my $handle_failed_to_clone = sub {
+			handle_clone_database_connection_failure({ anvil => $anvil, responder => $responder });
+		};
 
-		my $requester = IO::Socket::UNIX->new(
-			Peer => $socket_path,
-			Type => SOCK_STREAM,
-		) or do {
-			pstderr("failed to create connection to ".$socket_path."; cause: ".$@);
+		my $handle_failed_to_ping = sub {
+			pstderr("failed to ping database");
 
-			$anvil->nice_exit({ db_disconnect => 0, exit_code => 1 });
+			$responder->shutdown(SHUT_RDWR);
 		};
 
-		print $requester $script_line;
+		my $handle_notify = sub {
+			my $parameters = shift;
+			my $notify = $parameters->{notify};
 
-		$requester->shutdown(SHUT_WR);
+			my ($name, $pid, $payload) = @$notify;
 
-		while (my $line = <$requester>)
-		{
-			chomp($line);
+			$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => {
+				name => $name,
+				pid => $pid,
+				payload => $payload,
+			}, prefix => "handle_notify" });
 
-			eval {
-				my $decoded = decode_json($line);
-				my $encoded = JSON->new->utf8->allow_blessed->pretty->encode($decoded);
+			$emit->($name.":".$payload);
+		};
 
-				pstdout($encoded);
-			} or do {
-				pstdout($line);
-			}
-		}
+		# These handlers are installed before add_listener() forks; "local" restores the previous
+		# handlers in this process when the block is left.
+		local $SIG{INT} = sub {
+			$emit->("sigint");
+			$emit->("exit");
 
-		$requester->shutdown(SHUT_RD);
+			$anvil->catch_sig({ signal => "INT" });
+		};
 
-		emit("requester:".$$."-exit");
+		local $SIG{TERM} = sub {
+			$emit->("sigterm");
+			$emit->("exit");
 
-		$anvil->nice_exit({ db_disconnect => 0, exit_code => 0 });
+			$anvil->catch_sig({ signal => "TERM" });
+		};
 
-		#############
-		#### END #### requester block
-		#############
-	}
+		my ($add_listener_code, $listener, $error) = $anvil->Database->add_listener({
+			debug => 2,
+			name => $name,
+			on_child_forked => $handle_child_forked,
+			on_failed_to_clone => $handle_failed_to_clone,
+			on_failed_to_ping => $handle_failed_to_ping,
+			on_notify => $handle_notify,
+		});
 
-	close($script_file_handle);
+		if ($add_listener_code != 0)
+		{
+			pstderr("failed to fork on receive; cause: ".$error);
 
-	emit("interface:".$$."-exit");
+			return 0;
+		}
 
-	$anvil->nice_exit({ db_disconnect => 0, exit_code => 0 });
+		$emit->("forked", $listener->pid);
 
-	#############
-	#### END #### interface block
-	#############
+		return 1;
+	}
+
+	# Not a recognized listener action; report "not handled" explicitly.
+	return 0;
 }
 
 sub prettify
@@ -762,11 +719,11 @@ sub prettify
 sub process_scmd_db
 {
 	my $parameters = shift;
-	# required:
+	# Required:
 	my $anvil = $parameters->{anvil};
 	my $cmd = $parameters->{cmd};
 	my $input = $parameters->{input};
-	# optional:
+	# Optional:
 	my $lid = $parameters->{lid} // "";
 
 	my $mode;
@@ -811,10 +768,10 @@ sub process_scmd_db
 sub process_scmd_execute
 {
 	my $parameters = shift;
-	# required:
+	# Required:
 	my $anvil = $parameters->{anvil};
 	my $input = $parameters->{input};
-	# optional:
+	# Optional:
 	my $lid = $parameters->{lid} // "";
 
 	$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => {
@@ -868,3 +825,234 @@ sub pstderr
 {
 	print STDERR "error: ".$_[0]."\n" if defined $_[0];
 }
+
+# Runs in the forked interface process: reads script lines from the "script" switch (or STDIN when it is
+# "-") and starts one requester process per line. A "q"/"quit" line is forwarded to the server socket and
+# ends the loop. This sub does not return; it exits through nice_exit.
+#
+# NOTE(review): this process never clones the database connection, so its cache still points at the main
+# process's connection(s). Every exit below therefore passes db_disconnect => 0, as run_requester and the
+# code this sub replaced do - confirm against nice_exit's default.
+sub run_interface
+{
+	my $parameters = shift;
+	# Required:
+	my $anvil = $parameters->{anvil};
+	my $emit = $parameters->{emit};
+	my $server = $parameters->{server};
+	my $socket_path = $parameters->{socket_path};
+
+	my $script_file = $anvil->data->{switches}{'script'} // "-";
+
+	# Close the server because the child doesn't need it.
+	close($server);
+
+	my $script_file_handle;
+
+	# Setup custom signal handlers for the child process.
+
+	local $SIG{INT} = sub {
+		$emit->("sigint");
+
+		close($script_file_handle);
+
+		$emit->("exit");
+
+		$anvil->catch_sig({ signal => "INT" });
+	};
+
+	local $SIG{TERM} = sub {
+		$emit->("sigterm");
+
+		close($script_file_handle);
+
+		$emit->("exit");
+
+		$anvil->catch_sig({ signal => "TERM" });
+	};
+
+	eval {
+		# TODO: make this script read piped input.
+
+		$script_file = "-" if ($script_file =~ /^#!SET!#$/);
+
+		$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { script_file => $script_file } });
+
+		if ($script_file eq "-")
+		{
+			# Two-argument open of "-" is deliberate here: it opens STDIN.
+			open($script_file_handle, $script_file);
+		}
+		else
+		{
+			open($script_file_handle, "< :encoding(UTF-8)", $script_file);
+		}
+	} or do {
+		# open() sets $! upon error, different from the database module failure (which sets $@).
+		pstderr("failed to open ".$script_file." as script input; cause: ".$!);
+
+		$anvil->nice_exit({ db_disconnect => 0, exit_code => 1 });
+	};
+
+	$emit->("ready");
+
+	while (my $script_line = <$script_file_handle>)
+	{
+		chomp($script_line);
+
+		$anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { script_line => $script_line } });
+
+		if ($script_line =~ /^(?:q|quit)\s*$/)
+		{
+			my $quitter = IO::Socket::UNIX->new(
+				Peer => $socket_path,
+				Type => SOCK_STREAM,
+			) or do {
+				pstderr("failed to create QUIT connection to ".$socket_path."; cause: $@");
+
+				$anvil->nice_exit({ db_disconnect => 0, exit_code => 1 });
+			};
+
+			print $quitter $script_line;
+
+			$quitter->shutdown(SHUT_RDWR);
+
+			last;
+		}
+
+		my $emit_requester_event = sub { emit("requester:".($_[1] // $$)."-".$_[0]); };
+
+		my $request_process = Proc::Simple->new();
+
+		$request_process->start(\&run_requester, {
+			anvil => $anvil,
+			emit => $emit_requester_event,
+			script_fh => $script_file_handle,
+			script_line => $script_line,
+			socket_path => $socket_path,
+		}) or do {
+			pstderr("failed to fork requester; cause: ". $!);
+
+			next;
+		};
+
+		$emit_requester_event->("forked", $request_process->pid);
+	}
+
+	close($script_file_handle);
+
+	$emit->("exit");
+
+	$anvil->nice_exit({ db_disconnect => 0, exit_code => 0 });
+}
+
+sub run_requester
+{
+	my $parameters = shift;
+	# Required:
+	my $anvil = $parameters->{anvil};
+	my $emit = $parameters->{emit};
+	my $script_fh = $parameters->{script_fh};
+	my $script_line = $parameters->{script_line};
+	my $socket_path = $parameters->{socket_path};
+
+	# The child doesn't need to process input.
+	close($script_fh);
+
+	my $requester = IO::Socket::UNIX->new(
+		Peer => $socket_path,
+		Type => SOCK_STREAM,
+	) or do {
+		pstderr("failed to create connection to ".$socket_path."; cause: ".$@);
+
+		# We're not using the database connection(s) here; it's not cloned.
+		# The cache is pointing to the main process's connection(s); don't close it.
+ $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + }; + + print $requester $script_line; + + $requester->shutdown(SHUT_WR); + + while (my $line = <$requester>) + { + chomp($line); + + eval { + my $decoded = decode_json($line); + my $encoded = JSON->new->utf8->allow_blessed->pretty->encode($decoded); + + pstdout($encoded); + } or do { + pstdout($line); + } + } + + $requester->shutdown(SHUT_RD); + + $emit->("exit"); + + # Same; don't disconnect because the cache is pointing to the original connection(s). + $anvil->nice_exit({ db_disconnect => 0, exit_code => 0 }); +} + +sub run_responder +{ + my $parameters = shift; + # Required: + my $anvil = $parameters->{anvil}; + my $emit = $parameters->{emit}; + my $line = $parameters->{line}; + my $responder = $parameters->{responder}; + my $server = $parameters->{server}; + + # Close the server because the child doesn't need it + close($server); + + handle_responder_output_setup({ anvil => $anvil, responder => $responder }); + + # Clone the database handle for this child to avoid interfering with another process that needs to use the database + # + # The database handle variable only holds a reference to the parent's database handle before clone completes, therefore we shouldn't call disconnect unless clone succeeds + + my ($clone_connection_code, $clone) = $anvil->Database->clone_connection({ debug => 2 }); + + if (not $clone) + { + handle_clone_database_connection_failure({ anvil => $anvil, responder => $responder }); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + } + + my @cmd_lines = split(/;;/, $line); + + foreach my $cmd_line (@cmd_lines) + { + $cmd_line =~ s/^\s+//; + $cmd_line =~ s/\s+$//; + + my $cmd_line_id; + + if ($cmd_line =~ s/^([[:xdigit:]]{8}-(?:[[:xdigit:]]{4}-){3}[[:xdigit:]]{12})\s+//) + { + $cmd_line_id = $1; + } + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { + cmd_line => $cmd_line, + cmd_line_id => $cmd_line_id, + } }); + + if 
($cmd_line =~ /^$scmd_db_read\s+/) + { + process_scmd_db({ anvil => $anvil, cmd => $scmd_db_read, input => $cmd_line, lid => $cmd_line_id }); + } + elsif ($cmd_line =~ /^$scmd_db_write\s+/) + { + process_scmd_db({ anvil => $anvil, cmd => $scmd_db_write, input => $cmd_line, lid => $cmd_line_id }); + } + elsif ($cmd_line =~ /^$scmd_execute\s+/) + { + process_scmd_execute({ anvil => $anvil, input => $cmd_line, lid => $cmd_line_id }); + } + } + + $emit->("exit"); + + # Responder is done writing + $responder->shutdown(SHUT_WR); + + $anvil->nice_exit({ exit_code => 0 }); +} diff --git a/tools/anvil-daemon b/tools/anvil-daemon index 25279be2b..e8f4169c6 100755 --- a/tools/anvil-daemon +++ b/tools/anvil-daemon @@ -514,7 +514,11 @@ sub handle_periodic_tasks # This checks to see if the strikers in the DB are also in anvil.conf check_strikers($anvil); - + + # Check if the job listener is alive, try spawning it if not. + # Let the next minute try again if the spawn fails. + spawn_job_listener($anvil) if (not check_job_listener($anvil)); + # Do Striker-specific minute tasks $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { host_type => $host_type }}); if ($host_type eq "striker") @@ -1422,7 +1426,10 @@ sub run_once # Make sure we don't spam the hell out of the console with network manager messages on boot check_sysctl($anvil); - + + # Start listening for jobs. + spawn_job_listener($anvil); + if ($anvil->data->{switches}{'startup-only'}) { $anvil->nice_exit({exit_code => 0}); @@ -2105,7 +2112,10 @@ sub keep_running sub run_jobs { my ($anvil, $startup) = @_; + $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { startup => $startup }}); + + $startup = $startup // 0; # Don't start jobs for 30 seconds after startup. 
if (not $startup) @@ -2181,7 +2191,7 @@ sub run_jobs 's13:started_seconds_ago' => $started_seconds_ago, 's14:updated_seconds_ago' => $updated_seconds_ago, }}); - + # If we're not configured, we will only run the 'anvil-configure-host' job my $configured = $anvil->System->check_if_configured; $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { configured => $configured }}); @@ -2241,13 +2251,13 @@ sub run_jobs $anvil->data->{sys}{jobs_running} = 1; $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "sys::jobs_running" => $anvil->data->{sys}{jobs_running} }}); } - + + # Get the list of processes on this machine. + $anvil->System->pids({ignore_me => 1}); + # See if the job was picked up by a now-dead instance. if ($job_picked_up_by) { - # Check if the PID is still active. - $anvil->System->pids({ignore_me => 1}); - ### TODO: Add a check to verify the job isn't hung. # Skip if this job is in progress. if (not exists $anvil->data->{pids}{$job_picked_up_by}) @@ -2316,6 +2326,44 @@ sub run_jobs } } } + elsif ($job_progress == 0) # The race condition handling should only look at not-started jobs. + { + # Maybe the database doesn't know about the job yet, try to find it in the processes. + # + # This should prevent duplicates caused by a race condition between the daemon's main process and its database listener. + + my $found = 0; + + foreach my $pid (keys %{$anvil->data->{pids}}) + { + my $process = $anvil->data->{pids}{$pid}; + $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { pid => $pid, %$process }}); + + # Only look at processes with the '--job-uuid' flag; ignore the ones started on CLI. 
+ if ($process->{command} =~ /--job-uuid(?:=|\s*)(\w{8}-(?:\w{4}-){3}\w{12})/) + { + my $process_flag_job_uuid = $1; + $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { + job_uuid => $job_uuid, + process_flag_job_uuid => $process_flag_job_uuid, + }}); + + if ($process_flag_job_uuid eq $job_uuid) + { + # The job was already started by another process but is not recorded in the database yet. + # + # Mark to skip it. + $found = 1; + $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { found => $found }}); + + last; + } + } + } + + # Don't run the job when it's already running. + next if ($found); + } # Convert the double-banged strings into a proper message. my $say_title = $job_title ? $anvil->Words->parse_banged_string({key_string => $job_title}) : ""; @@ -2590,3 +2638,84 @@ sub check_files return(0); } + +sub check_job_listener +{ + my $anvil = shift; + + my $listeners = $anvil->Database->{listeners}{"after_insert_or_update_job"}; + + my $count = 0; + + # Loop through job listeners only, there should only be 1. + foreach my $pid (keys %{$listeners}) + { + my $listener = $listeners->{$pid}; + + if ($listener->poll()) + { + # Adjust the return code, any greater than 0 is a pass. + $count += 1; + } + else + { + # The job listener was found dead during a periodic check, dispose of the remains. + delete $listeners->{$pid}; + # Beat it just to make sure it's actually dead. 
+ $listener->kill() or $listener->kill("SIGKILL"); + } + } + + return ($count); +} + +sub spawn_job_listener +{ + my $anvil = shift; + + my $handle_notify = sub { + my $params = shift; + my $notify = $params->{notify}; + + my ($name, $pid, $payload) = @$notify; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { + name => $name, + pid => $pid, + payload => $payload, + }, prefix => "handle_notify" }); + + my $job = eval { decode_json($payload); }; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 3, list => { job => $job } }); + + if (not $job) + { + # Ignore job(s) with malformed payload. + return (0); + } + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => $job, prefix => "notify" }); + + if ($job->{job_progress} != 0) + { + # Ignore running or finished job(s). + return (0); + } + + run_jobs($anvil, 0); + }; + + my ($add_listener_code, $listener, $error) = $anvil->Database->add_listener({ + debug => 2, + name => "after_insert_or_update_job", + on_notify => $handle_notify, + }); + + if ($add_listener_code != 0) + { + return (0, undef, $error); + } + + return (1, $listener); +} diff --git a/tools/anvil-version-changes b/tools/anvil-version-changes index 27663ee3b..8256172de 100755 --- a/tools/anvil-version-changes +++ b/tools/anvil-version-changes @@ -108,7 +108,10 @@ sub striker_checks # This adds notes to storage_group_members update_host_variables($anvil); - + + # Change the trigger of the jobs table to include the notify call. 
+ update_jobs_trigger($anvil); + return(0); } @@ -862,6 +865,58 @@ CREATE TRIGGER trigger_host_variables return(0); } +sub update_jobs_trigger +{ + my ($anvil) = @_; + + foreach my $uuid (sort {$a cmp $b} keys %{$anvil->data->{cache}{database_handle}}) + { + my $sql = q|CREATE OR REPLACE FUNCTION history_jobs() RETURNS trigger +AS $$ +DECLARE + history_jobs RECORD; +BEGIN + SELECT INTO history_jobs * FROM jobs WHERE job_uuid = NEW.job_uuid; + INSERT INTO history.jobs + (job_uuid, + job_host_uuid, + job_command, + job_data, + job_picked_up_by, + job_picked_up_at, + job_updated, + job_name, + job_progress, + job_title, + job_description, + job_status, + modified_date) + VALUES + (history_jobs.job_uuid, + history_jobs.job_host_uuid, + history_jobs.job_command, + history_jobs.job_data, + history_jobs.job_picked_up_by, + history_jobs.job_picked_up_at, + history_jobs.job_updated, + history_jobs.job_name, + history_jobs.job_progress, + history_jobs.job_title, + history_jobs.job_description, + history_jobs.job_status, + history_jobs.modified_date); + PERFORM pg_notify('after_insert_or_update_job', row_to_json(NEW)::text); + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; +|; + $anvil->Database->write({debug => 2, uuid => $uuid, query => $sql, source => $THIS_FILE, line => __LINE__}); + } + + return(0); +} + sub update_storage_group_members { my ($anvil) = @_;