From 9682a20d6315368a1ec098ddb1b01b749ad887f8 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 4 Mar 2025 01:16:44 -0500 Subject: [PATCH 01/47] fix(tools): don't fork when there's no usable DB connection in access module --- tools/anvil-access-module | 69 ++++++++++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 9 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 5b63a2232..a8cb5bb9b 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -163,9 +163,10 @@ my $running_directory = ($0 =~ /^(.*?)\/$THIS_FILE$/)[0]; $running_directory =~ s/^\./$ENV{PWD}/ if $running_directory =~ /^\./ && $ENV{PWD}; -my $scmd_db_read = "r"; -my $scmd_db_write = "w"; -my $scmd_execute = "x"; +my $scmd_db_listen = "listen"; +my $scmd_db_read = "r"; +my $scmd_db_write = "w"; +my $scmd_execute = "x"; main(); @@ -262,28 +263,62 @@ sub access_chain return (@results); } -# only used by child processes to clone the parent's database handles -sub clone_database_handles +sub check_database_handles { my $parameters = shift; my $anvil = $parameters->{anvil}; + my $success_count = 0; + foreach my $uuid (sort { $a cmp $b } keys %{$anvil->data->{database}}) { - if ((not exists $anvil->data->{cache}{database_handle}{$uuid}) or (not $anvil->data->{cache}{database_handle}{$uuid})) + my $dbh = $anvil->data->{cache}{database_handle}{$uuid}; + + if ((not exists $dbh) or (not $dbh) or (not $dbh->ping)) { - # Useless handle, skip it. + # ignore broken database handle(s) next; } + + $success_count += 1; + } + + return $success_count; +} + +# only used by child processes to clone the parent's database handles +sub clone_database_handles +{ + my $parameters = shift; + my $anvil = $parameters->{anvil}; + + my $success_count = 0; + + foreach my $uuid (sort { $a cmp $b } keys %{$anvil->data->{database}}) + { # get the copied parent's database handle, which was made when fork() my $dbh = $anvil->data->{cache}{database_handle}{$uuid}; + + my $child_dbh; + # clone the parent's database handle for child use - my $child_dbh = $dbh->clone(); + my $success = eval { $child_dbh = $dbh->clone(); }; + + if (not $success) + { + # failed to clone the parent's database handle; skip it + next; + } + # destroy the copied parent's dbh; this will not close the parent's original database handle because auto_inactive_destroy is set undef $anvil->data->{cache}{database_handle}{$uuid}; # add the cloned child's dbh $anvil->data->{cache}{database_handle}{$uuid} = $child_dbh; + + $success_count += 1; } + + return $success_count; } sub db_access @@ -408,6 +443,12 @@ sub handle_connections last if ($request_line =~ /^(?:q|quit)\s*$/); + check_database_handles({ anvil => $anvil }) or do { + pstderr("no usable database handle"); + + next; + } + my $pid = fork; if (not defined $pid) @@ -455,7 +496,13 @@ sub handle_connections }; # clone the database handle for this child to avoid interfering with another process that needs to use the database - clone_database_handles({ anvil => $anvil }); + clone_database_handles({ anvil => $anvil }) or do { + print $responder "failed to clone database handle(s)\n"; + + $responder->shutdown(SHUT_RDWR); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + } my @cmd_lines = split(/;;/, $request_line); @@ -488,6 +535,10 @@ sub handle_connections { process_scmd_execute({ anvil => $anvil, input => $cmd_line, lid => $cmd_line_id }); } + elsif ($cmd_line =~ /^$scmd_db_listen\s+/) + { + # it's a request to register a listener to a database trigger + } } # responder is done writing From 2eeedef6faac9297004f1ebbfbe97db06b167376 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Wed, 5 Mar 2025 14:49:13 -0500 Subject: [PATCH 02/47] fix(tools): disconnect cloned DB connection in access module --- tools/anvil-access-module | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index a8cb5bb9b..b5149cb4e 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -496,6 +496,9 @@ sub handle_connections }; # clone the database handle for this child to avoid interfering with another process that needs to use the database + # + # the database handle variable only holds a reference to the parent's database handle before clone completes, therefore we shouldn't call disconnect unless clone succeeds + clone_database_handles({ anvil => $anvil }) or do { print $responder "failed to clone database handle(s)\n"; @@ -546,7 +549,7 @@ sub handle_connections emit("responder:".$$."-exit"); - $anvil->nice_exit({ db_disconnect => 0, exit_code => 0 }); + $anvil->nice_exit({ exit_code => 0 }); ############# #### END #### responder block From 6cd626b2347e7ae75d7a9e3bc3c91813b0709510 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Wed, 5 Mar 2025 15:39:08 -0500 Subject: [PATCH 03/47] fix(tools): improve DB check, clone logging in access module --- tools/anvil-access-module | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index b5149cb4e..1454b7592 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -268,22 +268,29 @@ sub check_database_handles my $parameters = shift; my $anvil = $parameters->{anvil}; - my $success_count = 0; + my $success = 0; foreach my $uuid (sort { $a cmp $b } keys %{$anvil->data->{database}}) { my $dbh = $anvil->data->{cache}{database_handle}{$uuid}; + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { + database_uuid => $uuid, + dbh_local => $dbh, + } }); + if ((not exists $dbh) or (not $dbh) or (not $dbh->ping)) { # ignore broken database handle(s) next; } - $success_count += 1; + $success += 1; } - return $success_count; + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { success => $success }, prefix => "check_database_handles" }); + + return $success; } # only used by child processes to clone the parent's database handles @@ -292,33 +299,44 @@ sub clone_database_handles my $parameters = shift; my $anvil = $parameters->{anvil}; - my $success_count = 0; + my $success = 0; foreach my $uuid (sort { $a cmp $b } keys %{$anvil->data->{database}}) { # get the copied parent's database handle, which was made when fork() my $dbh = $anvil->data->{cache}{database_handle}{$uuid}; + # release the reference to the copied parent's dbh; this will not close the parent's original database handle because auto_inactive_destroy is set + undef $anvil->data->{cache}{database_handle}{$uuid}; + my $child_dbh; # clone the parent's database handle for child use - my $success = eval { $child_dbh = $dbh->clone(); }; + my $cloned = eval { $child_dbh = $dbh->clone(); }; - if (not $success) + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { + database_uuid => $uuid, + dbh_local => $dbh, + dbh_cache => $anvil->data->{cache}{database_handle}{$uuid}, + cloned => $cloned, + dbh_child => $child_dbh, + } }); + + if (not $cloned) { # failed to clone the parent's database handle; skip it next; } - # destroy the copied parent's dbh; this will not close the parent's original database handle because auto_inactive_destroy is set - undef $anvil->data->{cache}{database_handle}{$uuid}; # add the cloned child's dbh $anvil->data->{cache}{database_handle}{$uuid} = $child_dbh; - $success_count += 1; + $success += 1; } - return $success_count; + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { success => $success }, prefix => "clone_database_handles" }); + + return $success; } sub db_access From 212cfa1ad51c0ec25c411a68a7a1bbb481f27d8e Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 11 Mar 2025 19:42:00 -0400 Subject: [PATCH 04/47] fix(share): notify in insert or update job trigger function --- share/anvil.sql | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/share/anvil.sql b/share/anvil.sql index f0028313f..4d6c7bc31 100644 --- a/share/anvil.sql +++ b/share/anvil.sql @@ -806,12 +806,12 @@ CREATE TABLE history.jobs ( ); ALTER TABLE history.jobs OWNER TO admin; -CREATE FUNCTION history_jobs() RETURNS trigger +CREATE OR REPLACE FUNCTION history_jobs() RETURNS trigger AS $$ DECLARE history_jobs RECORD; BEGIN - SELECT INTO history_jobs * FROM jobs WHERE job_uuid = new.job_uuid; + SELECT INTO history_jobs * FROM jobs WHERE job_uuid = NEW.job_uuid; INSERT INTO history.jobs (job_uuid, job_host_uuid, @@ -840,6 +840,7 @@ BEGIN history_jobs.job_description, history_jobs.job_status, history_jobs.modified_date); + PERFORM pg_notify('after_insert_or_update_job', row_to_json(NEW)::text); RETURN NULL; END; $$ From 50ddec61e688a23fd5efab1c21fb7843a2e33a01 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 11 Mar 2025 19:55:37 -0400 Subject: [PATCH 05/47] fix(tools): rm listen scmd, comment after handle conn in access module --- tools/anvil-access-module | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 1454b7592..9d2fe795e 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -163,7 +163,6 @@ my $running_directory = ($0 =~ /^(.*?)\/$THIS_FILE$/)[0]; $running_directory =~ s/^\./$ENV{PWD}/ if $running_directory =~ /^\./ && $ENV{PWD}; -my $scmd_db_listen = "listen"; my $scmd_db_read = "r"; my $scmd_db_write = "w"; my $scmd_execute = "x"; @@ -281,7 +280,6 @@ sub check_database_handles if ((not exists $dbh) or (not $dbh) or (not $dbh->ping)) { - # ignore broken database handle(s) next; } @@ -419,6 +417,7 @@ sub get_scmd_args sub handle_connections { my $parameters = shift; + # required: my $anvil = $parameters->{anvil}; my $server = $parameters->{server}; @@ -556,10 +555,6 @@ sub handle_connections { process_scmd_execute({ anvil => $anvil, input => $cmd_line, lid => $cmd_line_id }); } - elsif ($cmd_line =~ /^$scmd_db_listen\s+/) - { - # it's a request to register a listener to a database trigger - } } # responder is done writing @@ -647,7 +642,9 @@ sub main if ($daemonize) { - return handle_connections({ anvil => $anvil, server => $server }); + handle_connections({ anvil => $anvil, server => $server }); + # when running as daemon, the main process shouldn't do anything other than handling connections + return; } # make 1 child to interact on stdio @@ -667,6 +664,8 @@ sub main emit("interface:".$interface_pid."-forked"); handle_connections({ anvil => $anvil, server => $server }); + # after starting a child process to handle the stdin interactions, the main process should only handle connections + return; } ############# From 541a07c67fcfb1a634500625f24f8b5838fa204d Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 11 Mar 2025 19:57:33 -0400 Subject: [PATCH 06/47] fix(tools): add database notification listener management in access module --- tools/anvil-access-module | 233 +++++++++++++++++++++++++++++--------- 1 file changed, 178 insertions(+), 55 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 9d2fe795e..406d53c00 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -382,6 +382,84 @@ sub emit pstdout("event=".$_[0]); } +# +# this subroutine should only run in the main process +# +sub fork_responder +{ + my $parameters = shift; + # required: + my $anvil = $parameters->{anvil}; + my $responder = $parameters->{responder}; + my $server = $parameters->{server}; + + check_database_handles({ anvil => $anvil }) or do { + pstderr("no usable database handle"); + + return 0; + } + + my $pid = fork; + + if (not defined $pid) + { + pstderr("failed to fork on receive; cause: ".$!); + + return 0; + } + + if ($pid) + { + emit("responder:".$pid."-forked"); + + return 0; + } + + ############# + ### BEGIN ### responder block + ############# + + # close the server because the child doesn't need it + close($server); + + # disconnect from the parent's outputs to avoid interference + + close(STDOUT); + close(STDERR); + + # redirect outputs to the responder for transport + + open(STDOUT, ">&", $responder) or do { + print $responder "failed to open STDOUT; cause: ".$!."\n"; + + $responder->shutdown(SHUT_RDWR); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + }; + + open(STDERR, ">&", $responder) or do { + print $responder "failed to open STDERR; cause: ".$!."\n"; + + $responder->shutdown(SHUT_RDWR); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + }; + + # clone the database handle for this child to avoid interfering with another process that needs to use the database + # + # the database handle variable only holds a reference to the parent's database handle before clone completes, therefore we shouldn't call disconnect unless clone succeeds + + clone_database_handles({ anvil => $anvil }) or do { + print $responder "failed to clone database handle(s)\n"; + + $responder->shutdown(SHUT_RDWR); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + } + + return 1; +} + sub get_scmd_args { my $parameters = shift; @@ -414,6 +492,87 @@ sub get_scmd_args return $args; } +# +# this subroutine should only run in the main process +# +sub manage_database_listeners +{ + my $parameters = shift; + # required: + my $anvil = $parameters->{anvil}; + my $db_listeners = $parameters->{db_listeners}; + my $input = $parameters->{input}; + my $responder = $parameters->{responder}; + my $server = $parameters->{server}; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { input => $input }, prefix => "manage_database_listeners" }); + + # ignore because it's not a database listener action + if (not ($input =~ s/^\s*dbl:://)) + { + return 0; + } + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { input => $input } }); + + if ($input =~ s/^list//) + { + foreach my $trigger (sort { $a cmp $b } keys %{$db_listeners}) + { + my $pid = $db_listeners->{$trigger}; + + pstdout($trigger.":".$pid); + } + + return 1; + } + + if ($input =~ s/^remove\s+(\d+)//) + { + my $pid = $1; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { pid => $pid } }); + + kill 'TERM' $pid; + + return 1; + } + + if ($input =~ s/^add\s+(\w+)//) + { + my $trigger = $1; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { trigger => $trigger } }); + + fork_responder({ + anvil => $anvil, + responder => $responder, + server => $server, + }) or do { + return 1; + }; + + my $dbh; + + foreach my $uuid (sort { $a cmp $b } keys %{$anvil->data->{database}}) + { + $dbh = $anvil->data->{cache}{database_handle}{$uuid}; + + eval { $dbh->do("LISTEN ".$trigger); } and do { + last; + } + } + + while (my $notify = eval { $dbh->pg_notifies; }) { + my ($name, $pid, $payload) = @$notify; + + emit($name.":".$pid.":".$payload); + } + + return 1; + } +} + sub handle_connections { my $parameters = shift; @@ -421,6 +580,8 @@ sub handle_connections my $anvil = $parameters->{anvil}; my $server = $parameters->{server}; + my $database_listeners = {}; + local $SIG{INT} = sub { emit("sigint"); @@ -460,69 +621,31 @@ sub handle_connections last if ($request_line =~ /^(?:q|quit)\s*$/); - check_database_handles({ anvil => $anvil }) or do { - pstderr("no usable database handle"); - - next; - } - - my $pid = fork; - - if (not defined $pid) - { - pstderr("failed to fork on receive; cause: ".$!); + manage_database_listeners({ + anvil => $anvil, + db_listeners => $database_listeners, + input => $request_line, + responder => $responder, + server => $server, + }) and do { + pstdout($$."::manage_database_listeners ended with true"); next; - } - - if ($pid) - { - emit("responder:".$pid."-forked"); - - next; - } - - ############# - ### BEGIN ### responder block - ############# - - # close the server because the child doesn't need it - close($server); - - # disconnect from the parent's outputs to avoid interference - - close(STDOUT); - close(STDERR); - - # redirect outputs to the responder for transport - - open(STDOUT, ">&", $responder) or do { - print $responder "failed to open STDOUT; cause: ".$!."\n"; - - $responder->shutdown(SHUT_RDWR); - - $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); }; - open(STDERR, ">&", $responder) or do { - print $responder "failed to open STDERR; cause: ".$!."\n"; + pstdout($$."::manage_database_listeners ended with false"); - $responder->shutdown(SHUT_RDWR); + fork_responder({ + anvil => $anvil, + responder => $responder, + server => $server, + }) or do { + pstdout($$."::fork_responder ended with false"); - $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + next; }; - # clone the database handle for this child to avoid interfering with another process that needs to use the database - # - # the database handle variable only holds a reference to the parent's database handle before clone completes, therefore we shouldn't call disconnect unless clone succeeds - - clone_database_handles({ anvil => $anvil }) or do { - print $responder "failed to clone database handle(s)\n"; - - $responder->shutdown(SHUT_RDWR); - - $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); - } + pstdout($$."::fork_responder ended with true"); my @cmd_lines = split(/;;/, $request_line); From 3b2a8691139ed98c546aa8830528b5324c5ceb68 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 11 Mar 2025 20:43:57 -0400 Subject: [PATCH 07/47] fix(tools): correct HASH dbh exists check in access module --- tools/anvil-access-module | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 406d53c00..631b85a2a 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -271,6 +271,11 @@ sub check_database_handles foreach my $uuid (sort { $a cmp $b } keys %{$anvil->data->{database}}) { + if (not exists $anvil->data->{cache}{database_handle}{$uuid}) + { + next; + } + my $dbh = $anvil->data->{cache}{database_handle}{$uuid}; $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { @@ -278,7 +283,7 @@ sub check_database_handles dbh_local => $dbh, } }); - if ((not exists $dbh) or (not $dbh) or (not $dbh->ping)) + if ((not $dbh) or (not $dbh->ping)) { next; } From 2e54802e48f74432f9325492154f2013ad5b1495 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 11 Mar 2025 20:44:55 -0400 Subject: [PATCH 08/47] fix(tools): correct kill db listener syntax in access module --- tools/anvil-access-module | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 631b85a2a..fa0fe7db3 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -538,7 +538,7 @@ sub manage_database_listeners $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { pid => $pid } }); - kill 'TERM' $pid; + kill('TERM', $pid); return 1; } From d2136896f52374e15ee1989c9ef722fc37a05654 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 11 Mar 2025 20:49:56 -0400 Subject: [PATCH 09/47] fix(tools): correct syntax in fork_responder of access module --- tools/anvil-access-module | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index fa0fe7db3..74d1770d9 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -402,7 +402,7 @@ sub fork_responder pstderr("no usable database handle"); return 0; - } + }; my $pid = fork; @@ -460,7 +460,7 @@ sub fork_responder $responder->shutdown(SHUT_RDWR); $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); - } + }; return 1; } From bef1ef00699e2b7da3cf2d834b3ee1f07f63f6aa Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 11 Mar 2025 22:58:33 -0400 Subject: [PATCH 10/47] fix(tools): emit before SHUT_WR in handle_connection of access module --- tools/anvil-access-module | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 74d1770d9..2c23a91fa 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -685,11 +685,11 @@ sub handle_connections } } + emit("responder:".$$."-exit"); + # responder is done writing $responder->shutdown(SHUT_WR); - emit("responder:".$$."-exit"); - $anvil->nice_exit({ exit_code => 0 }); ############# From 502ff4e6a00ab04e699ab5cbdc44f589dc49e201 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 11 Mar 2025 23:09:15 -0400 Subject: [PATCH 11/47] fix(tools): clean up inputs with chomp in access module --- tools/anvil-access-module | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 2c23a91fa..16a8a7936 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -619,6 +619,8 @@ sub handle_connections { my $request_line = <$responder>; + chomp($request_line); + # responder is done reading $responder->shutdown(SHUT_RD); @@ -851,6 +853,8 @@ sub main while (my $script_line = <$script_file_handle>) { + chomp($script_line); + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { script_line => $script_line } }); if ($script_line =~ /^(?:q|quit)\s*$/) From 78bb5b105b248ea4985fe23ae2100b6a9c91fe38 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 11 Mar 2025 23:40:54 -0400 Subject: [PATCH 12/47] fix(tools): make exit_responder to pair with fork_responder in access module --- tools/anvil-access-module | 53 ++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 16a8a7936..9b8dce6fe 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -387,9 +387,29 @@ sub emit pstdout("event=".$_[0]); } -# -# this subroutine should only run in the main process -# +sub exit_responder +{ + my $parameters = shift; + # required: + my $anvil = $parameters->{anvil}; + my $responder = $parameters->{responder}; + + emit("responder:".$$."-exit"); + + # responder is done writing + $responder->shutdown(SHUT_WR); + + $anvil->nice_exit({ exit_code => 0 }); + + ############# + #### END #### responder block + ############# +} + +# notes: +# - this subroutine should only run in the main process +# - every line after the call to this function are a part of the responder process +# - calling this subroutine requires calling exit_responder later for clean up sub fork_responder { my $parameters = shift; @@ -497,9 +517,7 @@ sub get_scmd_args return $args; } -# # this subroutine should only run in the main process -# sub manage_database_listeners { my $parameters = shift; @@ -549,11 +567,7 @@ sub manage_database_listeners $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { trigger => $trigger } }); - fork_responder({ - anvil => $anvil, - responder => $responder, - server => $server, - }) or do { + fork_responder({ anvil => $anvil, responder => $responder, server => $server }) or do { return 1; }; @@ -574,7 +588,7 @@ sub manage_database_listeners emit($name.":".$pid.":".$payload); } - return 1; + exit_responder({ anvil => $anvil, responder => $responder }); } } @@ -642,11 +656,7 @@ sub handle_connections pstdout($$."::manage_database_listeners ended with false"); - fork_responder({ - anvil => $anvil, - responder => $responder, - server => $server, - }) or do { + fork_responder({ anvil => $anvil, responder => $responder, server => $server }) or do { pstdout($$."::fork_responder ended with false"); next; @@ -687,16 +697,7 @@ sub handle_connections } } - emit("responder:".$$."-exit"); - - # responder is done writing - $responder->shutdown(SHUT_WR); - - $anvil->nice_exit({ exit_code => 0 }); - - ############# - #### END #### responder block - ############# + exit_responder({ anvil => $anvil, responder => $responder }); } close($server); From 891c41fb868b2c409d6826a5eb955da82bd3a427 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Mon, 17 Mar 2025 19:30:10 -0400 Subject: [PATCH 13/47] feat: write add_listener, clone_connection methods in database module --- Anvil/Tools/Database.pm | 222 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index fbbe03aa6..c2a2103de 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -16,12 +16,14 @@ our $VERSION = "3.0.0"; my $THIS_FILE = "Database.pm"; ### Methods; +# add_listener # archive_database # backup_database # check_file_locations # check_hosts # check_lock_age # check_for_schema +# clone_connection # configure_pgsql # connect # disconnect @@ -175,6 +177,154 @@ sub parent # Public methods # ############################################################################################################# +=head2 add_listener + +This method adds 1 listener to the primary database. The listener runs when the primary database executes a pg_notify with the matching name. + +Parameters; + +=head3 blocking (optional, default 1) + +If set, the listener process will listen to database notifications indefinitely. + +=head3 name (required) + +The name of the events to listen for. This must match with the name provided to 1 pg_notify call in procedures called by database triggers. + +=head3 on_fork_child (optional) + +If set, runs the referenced subroutine in the listener process after it is created. + +=head3 on_notify (optional) + +If set, runs the referenced subroutine when a database notification is received. + +=head3 uuid (optional) + +If set, the listener will be added to the connection of the database identified with the provided UUID. + +=cut +sub add_listener +{ + my $self = shift; + my $parameters = shift; + my $anvil = $self->parent; + my $debug = $parameters->{debug} // 3; + + $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->add_listener()" } }); + + my $blocking = $parameters->{blocking} // 1; + my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; + my $fork_child_handler = $parameters->{on_fork_child}; + my $notify_handler = $parameters->{on_notify}; + my $notify_name = $parameters->{name}; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { + blocking => $blocking, + db_uuid => $db_uuid, + fork_child_handler => $fork_child_handler, + fork_fail_handler => $fork_fail_handler, + fork_parent_handler => $fork_parent_handler, + notify_handler => $notify_handler, + notify_name => $notify_name, + } }); + + if (not $notify_name) + { + $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0020", variables => { + method => "Database->add_listener()", + parameter => "name", + } }); + + return (1); + } + + if (not $db_uuid) + { + $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0020", variables => { + method => "Database->add_listener()", + parameter => "uuid", + } }); + + return (2); + } + + my $fork = fork; + + if (not defined $fork) + { + return (3); + } + + if ($fork) + { + # The parent must record the listener process PID for removal at a later point. + + return (0, $fork); + } + + ############# + ### BEGIN ### child block + ############# + + $fork_child_handler->({ anvil => $anvil }) if (ref($fork_child_handler) eq "CODE"); + + my ($clone_connection_code, $dbh) = $self->clone_connection({ uuid => $db_uuid }); + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { + clone_connection_code => $clone_connection_code, + dbh => $dbh, + } }); + + if ((not $dbh) or (not eval { $dbh->ping })) + { + $anvil->nice_exit({ exit_code => 1 }); + } + + my $listen = eval { $dbh->do("LISTEN ".$trigger); }; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { listen => $listen } }); + + if (not $listen) + { + $anvil->nice_exit({ exit_code => 2 }); + } + + while ($blocking) + { + while (my $notify = eval { $dbh->pg_notifies; }) + { + my ($name, $pid, $payload) = @$notify; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { + name => $name, + pid => $pid, + payload => $payload, + }, prefix => "pg_notify" }); + + $notify_handler->({ anvil => $anvil, notify => $notify }) if (ref($notify_handler) eq "CODE"); + } + + my $ping = eval { $dbh->ping; }; + + if (not $ping) + { + # TODO: Maybe try recovering the listener process? + + $anvil->nice_exit({ exit_code => 3 }); + } + + sleep(0.1); + } + + # The listener process should never reach here. + $anvil->nice_exit({ exit_code => 4 }); + + ############# + #### END #### child block + ############# +} + =head2 archive_database @@ -870,6 +1020,78 @@ sub check_for_schema } +=head2 clone_connection + +This method attempts to clone and replace the primary database connection in the cache. + +When fork is called in a process where there is a database connection, the connection is not duplicated like other variables in the fork (or child process). + +This method is designed to be used in a fork to avoid multiple forks from sharing the parent process's connection. + +Although not recommended, it should still be safe to use in a non-fork process because the original connection should be released and garbage collected. + +Parameters; + +=head3 disconnect_base (optional, default 0) + +If set, the disconnect() will be called on the original connection when the clone succeeds. + +=head3 uuid (optional) + +If set, connection to the database identified by the provided UUID will be cloned instead of the primary database. + +=cut +sub clone_connection +{ + my $self = shift; + my $parameters = shift; + my $anvil = $self->parent; + my $debug = $parameters->{debug} // 3; + + $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->clone_connection()" } }); + + my $disconnect_base = $parameters->{disconnect_base} // 0; + my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; + + if (not $db_uuid) + { + return (1); + } + + # Get the copied parent's database handle, which was made when fork() + my $dbh = $anvil->data->{cache}{database_handle}{$db_uuid}; + + if ((not $dbh) or (not $dbh->ping())) + { + return (2); + } + + # Clone the parent's database handle for child use + my $clone = eval { $dbh->clone(); }; + + if (not $clone) + { + # Failed to clone the parent's database handle + return (3); + } + + if ($disconnect_base) + { + $dbh->disconnect(); + } + else + { + # Release the reference to the copied parent's dbh; this will not close the parent's original database handle when auto_inactive_destroy is set + undef $anvil->data->{cache}{database_handle}{$db_uuid}; + } + + # Add the cloned child's database handle + $anvil->data->{cache}{database_handle}{$db_uuid} = $clone; + + return (0, $clone); +} + + =head2 configure_pgsql This configures the local database server. Specifically, it checks to make sure the daemon is running and starts it if not. It also checks the C<< pg_hba.conf >> configuration to make sure it is set properly to listen on this machine's IP addresses and interfaces. From 740c69128bc7d5d1a25a53643308262cb6390ddb Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Mon, 17 Mar 2025 21:08:31 -0400 Subject: [PATCH 14/47] fix: add hook when dbh clone fails in Database->add_listener() --- Anvil/Tools/Database.pm | 58 ++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index c2a2103de..114beefa3 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -191,6 +191,10 @@ If set, the listener process will listen to database notifications indefinitely. The name of the events to listen for. This must match with the name provided to 1 pg_notify call in procedures called by database triggers. +=head3 on_clone_fail (optional) + +If set, runs the referenced subroutine when the call to clone_connection() fails with a non-zero code. + =head3 on_fork_child (optional) If set, runs the referenced subroutine in the listener process after it is created. @@ -213,20 +217,20 @@ sub add_listener $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->add_listener()" } }); - my $blocking = $parameters->{blocking} // 1; - my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; - my $fork_child_handler = $parameters->{on_fork_child}; - my $notify_handler = $parameters->{on_notify}; - my $notify_name = $parameters->{name}; + my $blocking = $parameters->{blocking} // 1; + my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; + my $clone_fail_handler = $parameters->{on_clone_fail}; + my $fork_child_handler = $parameters->{on_fork_child}; + my $notify_handler = $parameters->{on_notify}; + my $notify_name = $parameters->{name}; $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { - blocking => $blocking, - db_uuid => $db_uuid, - fork_child_handler => $fork_child_handler, - fork_fail_handler => $fork_fail_handler, - fork_parent_handler => $fork_parent_handler, - notify_handler => $notify_handler, - notify_name => $notify_name, + blocking => $blocking, + db_uuid => $db_uuid, + clone_fail_handler => $clone_fail_handler, + fork_child_handler => $fork_child_handler, + notify_handler => $notify_handler, + notify_name => $notify_name, } }); if (not $notify_name) @@ -236,7 +240,7 @@ sub add_listener parameter => "name", } }); - return (1); + return (1, undef, "missing name"); } if (not $db_uuid) @@ -246,14 +250,14 @@ sub add_listener parameter => "uuid", } }); - return (2); + return (2, undef, "missing uuid"); } my $fork = fork; if (not defined $fork) { - return (3); + return (3, undef, $!); } if ($fork) @@ -276,9 +280,11 @@ sub add_listener dbh => $dbh, } }); - if ((not $dbh) or (not eval { $dbh->ping })) + if (not $dbh) { - $anvil->nice_exit({ exit_code => 1 }); + $clone_fail_handler->({ anvil => $anvil }) if (ref($clone_fail_handler) eq "CODE"); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); } my $listen = eval { $dbh->do("LISTEN ".$trigger); }; @@ -292,7 +298,7 @@ sub add_listener while ($blocking) { - while (my $notify = eval { $dbh->pg_notifies; }) + while (my $notify = eval { $dbh->pg_notifies(); }) { my ($name, $pid, $payload) = @$notify; @@ -305,7 +311,7 @@ sub add_listener $notify_handler->({ anvil => $anvil, notify => $notify }) if (ref($notify_handler) eq "CODE"); } - my $ping = eval { $dbh->ping; }; + my $ping = eval { $dbh->ping(); }; if (not $ping) { @@ -1069,6 +1075,12 @@ sub clone_connection # Clone the parent's database handle for child use my $clone = eval { $dbh->clone(); }; + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { + base_dbh => $dbh, + clone_dbh => $clone, + database_uuid => $db_uuid, + } }); + if (not $clone) { # Failed to clone the parent's database handle @@ -1079,11 +1091,9 @@ sub clone_connection { $dbh->disconnect(); } - else - { - # Release the reference to the copied parent's dbh; this will not close the parent's original database handle when auto_inactive_destroy is set - undef $anvil->data->{cache}{database_handle}{$db_uuid}; - } + + # Release the reference to the copied parent's dbh; this will not close the parent's original database handle when auto_inactive_destroy is set + undef $anvil->data->{cache}{database_handle}{$db_uuid}; # Add the cloned child's database handle $anvil->data->{cache}{database_handle}{$db_uuid} = $clone; From 67e19d1aa22b1234297350fb5296828a8ad29c75 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Mon, 17 Mar 2025 22:53:31 -0400 Subject: [PATCH 15/47] fix(tools): apply add_listener, clone_connection database methods in access module --- tools/anvil-access-module | 167 ++++++++++++++++++-------------------- 1 file changed, 79 insertions(+), 88 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 9b8dce6fe..4230203b3 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -296,52 +296,6 @@ sub check_database_handles return $success; } -# only used by child processes to clone the parent's database handles -sub clone_database_handles -{ - my $parameters = shift; - my $anvil = $parameters->{anvil}; - - my $success = 0; - - foreach my $uuid (sort { $a cmp $b } keys %{$anvil->data->{database}}) - { - # get the copied parent's database handle, which was made when fork() - my $dbh = $anvil->data->{cache}{database_handle}{$uuid}; - - # release the reference to the copied parent's dbh; this will not close the parent's original database handle because auto_inactive_destroy is set - undef $anvil->data->{cache}{database_handle}{$uuid}; - - my $child_dbh; - - # clone the parent's database handle for child use - my $cloned = eval { $child_dbh = $dbh->clone(); }; - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { - database_uuid => $uuid, - dbh_local => $dbh, - dbh_cache => $anvil->data->{cache}{database_handle}{$uuid}, - cloned => $cloned, - dbh_child => $child_dbh, - } }); - - if (not $cloned) - { - # failed to clone the parent's database handle; skip it - next; - } - - # add the cloned child's dbh - $anvil->data->{cache}{database_handle}{$uuid} = $child_dbh; - - $success += 1; - } - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { success => $success }, prefix => "clone_database_handles" }); - - return $success; -} - sub db_access { my $parameters = shift; @@ -447,40 +401,18 @@ sub fork_responder # close the server because the child doesn't need it close($server); - # disconnect from the parent's outputs to avoid interference - - close(STDOUT); - close(STDERR); - - # redirect outputs to the responder for transport - - open(STDOUT, ">&", $responder) or do { - print $responder "failed to open STDOUT; cause: ".$!."\n"; - - $responder->shutdown(SHUT_RDWR); - - $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); - }; - - open(STDERR, ">&", $responder) or do { - print $responder "failed to open STDERR; cause: ".$!."\n"; - - $responder->shutdown(SHUT_RDWR); - - $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); - }; + handle_responder_output_setup({ anvil => $anvil, responder => $responder }); # clone the database handle for this child to avoid interfering with another process that needs to use the database # # the database handle variable only holds a reference to the parent's database handle before clone completes, therefore we shouldn't call disconnect unless clone succeeds - clone_database_handles({ anvil => $anvil }) or do { - print $responder "failed to clone database handle(s)\n"; + my ($clone_connection_code, $clone) = $anvil->Database->clone_connection({ debug => 2 }); - $responder->shutdown(SHUT_RDWR); - - $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); - }; + if (not $clone) + { + handle_database_clone_connection_failure({ anvil => $anvil, responder => $responder }); + } return 1; } @@ -567,31 +499,90 @@ sub manage_database_listeners $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { trigger => $trigger } }); - fork_responder({ anvil => $anvil, responder => $responder, server => $server }) or do { - return 1; + my $clone_fail_handler = \&handle_database_clone_connection_failure; + + my $fork_child_handler = sub { + # close the server because the child doesn't need it + close($server); + + handle_responder_output_setup({ anvil => $anvil, responder => $responder }); + } + + my $notify_handler = sub { + my $params = shift; + my $notify = $params->{notify}; + + my ($n_name, $n_pid, $n_payload) = @$notify; + + emit($n_name.":".$n_pid.":".$n_payload); }; - my $dbh; + my ($add_listener_code, $pid, $error) = $anvil->Database->add_listener({ + debug => 2, + name => $trigger, + on_clone_fail => $clone_fail_handler, + on_fork_child => $fork_child_handler, + on_notify => $notify_handler, + }); - foreach my $uuid (sort { $a cmp $b } keys %{$anvil->data->{database}}) + if ($add_listener_code != 0) { - $dbh = $anvil->data->{cache}{database_handle}{$uuid}; + pstdout("failed to fork on receive; cause: ".$error); - eval { $dbh->do("LISTEN ".$trigger); } and do { - last; - } + return 0; } - while (my $notify = eval { $dbh->pg_notifies; }) { - my ($name, $pid, $payload) = @$notify; + emit("db-listener:".$pid."-forked"); - emit($name.":".$pid.":".$payload); - } - - exit_responder({ anvil => $anvil, responder => $responder }); + return 1; } } +sub handle_database_clone_connection_failure +{ + my $parameters = shift; + # required: + my $anvil = $parameters->{anvil}; + my $responder = $parameters->{responder}; + + print $responder "failed to clone primary database handle\n"; + + $responder->shutdown(SHUT_RDWR); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); +} + +sub handle_responder_output_setup +{ + my $parameters = shift; + # required: + my $anvil = $parameters->{anvil}; + my $responder = $parameters->{responder}; + + # disconnect from the parent's outputs to avoid interference + + close(STDOUT); + close(STDERR); + + # redirect outputs to the responder for transport + + open(STDOUT, ">&", $responder) or do { + print $responder "failed to open STDOUT; cause: ".$!."\n"; + + $responder->shutdown(SHUT_RDWR); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + }; + + open(STDERR, ">&", $responder) or do { + print $responder "failed to open STDERR; cause: ".$!."\n"; + + $responder->shutdown(SHUT_RDWR); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + }; +} + sub handle_connections { my $parameters = shift; From 14f8bed505dd5f16f4374103aa7bf2136ef14607 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 18 Mar 2025 16:40:05 -0400 Subject: [PATCH 16/47] fix: don't ping every iteration of blocking in Database->add_listener --- Anvil/Tools/Database.pm | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index 114beefa3..c62c915a7 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -296,6 +296,9 @@ sub add_listener $anvil->nice_exit({ exit_code => 2 }); } + my $step = 0.1; + my $interval = 0; + while ($blocking) { while (my $notify = eval { $dbh->pg_notifies(); }) @@ -311,16 +314,24 @@ sub add_listener $notify_handler->({ anvil => $anvil, notify => $notify }) if (ref($notify_handler) eq "CODE"); } - my $ping = eval { $dbh->ping(); }; - - if (not $ping) + if ($interval >= 30) { - # TODO: Maybe try recovering the listener process? + my $ping = eval { $dbh->ping(); }; + + if (not $ping) + { + # TODO: Maybe try recovering the listener process? - $anvil->nice_exit({ exit_code => 3 }); + $anvil->nice_exit({ exit_code => 3 }); + } + + # Reset the interval counter + $interval = 0; } - sleep(0.1); + $interval += $step; + + sleep($step); } # The listener process should never reach here. From 8d3bd5d1cfcf1526b06942d7401024f2f1477ff1 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 18 Mar 2025 17:58:47 -0400 Subject: [PATCH 17/47] fix: add CHLD signal param to Database->add_listener --- Anvil/Tools/Database.pm | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index c62c915a7..92e81ee4f 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -203,6 +203,10 @@ If set, runs the referenced subroutine in the listener process after it is creat If set, runs the referenced subroutine when a database notification is received. +=head3 sigchld (optional) + +If set, assigns the provided value to C<>. Defaults to C<<"IGNORE">> to make the listener process start-and-forget because there's usually no need to wait for it. + =head3 uuid (optional) If set, the listener will be added to the connection of the database identified with the provided UUID. @@ -217,20 +221,22 @@ sub add_listener $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->add_listener()" } }); - my $blocking = $parameters->{blocking} // 1; - my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; + my $blocking = $parameters->{blocking} // 1; + my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; my $clone_fail_handler = $parameters->{on_clone_fail}; my $fork_child_handler = $parameters->{on_fork_child}; my $notify_handler = $parameters->{on_notify}; my $notify_name = $parameters->{name}; + my $sigchld = $parameters->{sigchld} // "IGNORE"; $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { - blocking => $blocking, - db_uuid => $db_uuid, - clone_fail_handler => $clone_fail_handler, - fork_child_handler => $fork_child_handler, - notify_handler => $notify_handler, - notify_name => $notify_name, + blocking => $blocking, + db_uuid => $db_uuid, + clone_fail_handler => $clone_fail_handler, + fork_child_handler => $fork_child_handler, + notify_handler => $notify_handler, + notify_name => $notify_name, + sigchld => $sigchld, } }); if (not $notify_name) @@ -253,6 +259,10 @@ sub add_listener return (2, undef, "missing uuid"); } + # Set signals behaviours + + local $SIG{CHLD} = $sigchld; + my $fork = fork; if (not defined $fork) From e1e136c75216018742f6d75e372bca429bcbc102 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Wed, 19 Mar 2025 14:54:09 -0400 Subject: [PATCH 18/47] fix: correct listen command in Database->add_listener --- Anvil/Tools/Database.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index 92e81ee4f..46884abf0 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -297,7 +297,7 @@ sub add_listener $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); } - my $listen = eval { $dbh->do("LISTEN ".$trigger); }; + my $listen = eval { $dbh->do("LISTEN ".$notify_name); }; $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { listen => $listen } }); From fc4480bcf18aed0e3412bce1cf2ea7f3109fa592 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Wed, 19 Mar 2025 15:07:13 -0400 Subject: [PATCH 19/47] fix: expose db ping interval in Database->add_listener --- Anvil/Tools/Database.pm | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index 46884abf0..4aea35999 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -221,13 +221,14 @@ sub add_listener $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->add_listener()" } }); - my $blocking = $parameters->{blocking} // 1; - my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; - my $clone_fail_handler = $parameters->{on_clone_fail}; - my $fork_child_handler = $parameters->{on_fork_child}; - my $notify_handler = $parameters->{on_notify}; - my $notify_name = $parameters->{name}; - my $sigchld = $parameters->{sigchld} // "IGNORE"; + my $blocking = $parameters->{blocking} // 1; + my $db_ping_interval = $parameters->{ping_interval} // 300; + my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; + my $clone_fail_handler = $parameters->{on_clone_fail}; + my $fork_child_handler = $parameters->{on_fork_child}; + my $notify_handler = $parameters->{on_notify}; + my $notify_name = $parameters->{name}; + my $sigchld = $parameters->{sigchld} // "IGNORE"; $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { blocking => $blocking, @@ -306,8 +307,8 @@ sub add_listener $anvil->nice_exit({ exit_code => 2 }); } - my $step = 0.1; - my $interval = 0; + my $step = 0.1; + my $count = 0; while ($blocking) { @@ -324,7 +325,7 @@ sub add_listener $notify_handler->({ anvil => $anvil, notify => $notify }) if (ref($notify_handler) eq "CODE"); } - if ($interval >= 30) + if ($count >= $db_ping_interval) { my $ping = eval { $dbh->ping(); }; @@ -335,11 +336,11 @@ sub add_listener $anvil->nice_exit({ exit_code => 3 }); } - # Reset the interval counter - $interval = 0; + # Reset the counter + $count = 0; } - $interval += $step; + $count += $step; sleep($step); } From 537281f6e54a403174e09b8da8edf6e29d3f51ad Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Wed, 19 Mar 2025 15:23:13 -0400 Subject: [PATCH 20/47] fix(tools): correct syntax in manage listeners of access module --- tools/anvil-access-module | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 4230203b3..98e215746 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -506,7 +506,7 @@ sub manage_database_listeners close($server); handle_responder_output_setup({ anvil => $anvil, responder => $responder }); - } + }; my $notify_handler = sub { my $params = shift; From d451c5cd0b105eacda4ebf7347410ff5111ab867 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Wed, 19 Mar 2025 16:05:08 -0400 Subject: [PATCH 21/47] fix(tools): rename trigger->notify_name, register new listeners in access module --- tools/anvil-access-module | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 98e215746..ae05819f0 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -472,11 +472,11 @@ sub manage_database_listeners if ($input =~ s/^list//) { - foreach my $trigger (sort { $a cmp $b } keys %{$db_listeners}) + foreach my $notify_name (sort { $a cmp $b } keys %{$db_listeners}) { - my $pid = $db_listeners->{$trigger}; + my $pid = $db_listeners->{$notify_name}; - pstdout($trigger.":".$pid); + pstdout($notify_name.":".$pid); } return 1; @@ -495,9 +495,9 @@ sub manage_database_listeners if ($input =~ s/^add\s+(\w+)//) { - my $trigger = $1; + my $notify_name = $1; - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { trigger => $trigger } }); + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { notify_name => $notify_name } }); my $clone_fail_handler = \&handle_database_clone_connection_failure; @@ -519,7 +519,7 @@ sub manage_database_listeners my ($add_listener_code, $pid, $error) = $anvil->Database->add_listener({ debug => 2, - name => $trigger, + name => $notify_name, on_clone_fail => $clone_fail_handler, on_fork_child => $fork_child_handler, on_notify => $notify_handler, @@ -532,6 +532,8 @@ sub manage_database_listeners return 0; } + $db_listeners->{$notify_name} = $pid; + emit("db-listener:".$pid."-forked"); return 1; From 9fa5858379b1faa1da5309e1ae4d43e78edafeeb Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Thu, 20 Mar 2025 17:28:57 -0400 Subject: [PATCH 22/47] fix(tools): migrate interface to Proc::Simple in access module --- tools/anvil-access-module | 382 ++++++++++++++++++++------------------ 1 file changed, 206 insertions(+), 176 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index ae05819f0..ad3266c55 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -154,6 +154,7 @@ use File::Spec::Functions; use File::Temp qw(tempdir); use IO::Socket::UNIX; use JSON; +use Proc::Simple; use Text::ParseWords; $| = 1; @@ -341,12 +342,15 @@ sub emit pstdout("event=".$_[0]); } -sub exit_responder +# +# this subroutine cleans up the fork made by fork_response_process() +# +sub exit_response_process { my $parameters = shift; # required: - my $anvil = $parameters->{anvil}; - my $responder = $parameters->{responder}; + my $anvil = $parameters->{anvil}; + my $responder = $parameters->{responder}; emit("responder:".$$."-exit"); @@ -356,15 +360,16 @@ sub exit_responder $anvil->nice_exit({ exit_code => 0 }); ############# - #### END #### responder block + #### END #### response process block ############# } -# notes: -# - this subroutine should only run in the main process -# - every line after the call to this function are a part of the responder process -# - calling this subroutine requires calling exit_responder later for clean up -sub fork_responder +# +# this subroutine should only run in the main process +# +# every line after the call to this function are a part of the responder process +# +sub fork_response_process { my $parameters = shift; # required: @@ -395,7 +400,7 @@ sub fork_responder } ############# - ### BEGIN ### responder block + ### BEGIN ### response process block ############# # close the server because the child doesn't need it @@ -644,18 +649,22 @@ sub handle_connections }) and do { pstdout($$."::manage_database_listeners ended with true"); + $responder->shutdown(SHUT_WR); + next; }; pstdout($$."::manage_database_listeners ended with false"); - fork_responder({ anvil => $anvil, responder => $responder, server => $server }) or do { - pstdout($$."::fork_responder ended with false"); + fork_response_process({ anvil => $anvil, responder => $responder, server => $server }) or do { + pstdout($$."::fork_response_process ended with false"); + + $responder->shutdown(SHUT_WR); next; }; - pstdout($$."::fork_responder ended with true"); + pstdout($$."::fork_response_process ended with true"); my @cmd_lines = split(/;;/, $request_line); @@ -690,7 +699,7 @@ sub handle_connections } } - exit_responder({ anvil => $anvil, responder => $responder }); + exit_response_process({ anvil => $anvil, responder => $responder }); } close($server); @@ -717,9 +726,9 @@ sub main $anvil->Get->switches; my $daemonize = $anvil->data->{switches}{'daemonize'} // 0; - my $script_file = $anvil->data->{switches}{'script'} // "-"; my $working_dir = $anvil->data->{switches}{'working-dir'} // cwd(); + emit("pid:".$$); emit("initialized"); $anvil->Database->connect({ auto_inactive_destroy => 1 }); @@ -761,187 +770,44 @@ sub main $anvil->nice_exit({ exit_code => 1 }); }; - # make child processes start-and-forget because we don't need to wait for them + # Make child processes start-and-forget because we don't need to wait for them local $SIG{CHLD} = "IGNORE"; if ($daemonize) { + # When running as daemon, the main process shouldn't do anything other than handling connections handle_connections({ anvil => $anvil, server => $server }); - # when running as daemon, the main process shouldn't do anything other than handling connections - return; - } - - # make 1 child to interact on stdio - my $interface_pid = fork; - - if (not defined $interface_pid) - { - pstderr("failed to fork IO interface; cause: ".$!); close($server); - $anvil->nice_exit({ exit_code => 1 }); + $anvil->nice_exit({ exit_code => 0 }); } - if ($interface_pid) - { - emit("interface:".$interface_pid."-forked"); - - handle_connections({ anvil => $anvil, server => $server }); - # after starting a child process to handle the stdin interactions, the main process should only handle connections - return; - } - - ############# - ### BEGIN ### interface block - ############# - - # close the server because the child doesn't need it - close($server); - - my $script_file_handle; - - local $SIG{INT} = sub { - emit("interface:".$$."-sigint"); - - close($script_file_handle); - - emit("interface:".$$."-exit"); - - $anvil->catch_sig({ signal => "INT" }); - }; - - local $SIG{TERM} = sub { - emit("interface:".$$."-sigterm"); - - close($script_file_handle); - - emit("interface:".$$."-exit"); - - $anvil->catch_sig({ signal => "TERM" }); - }; - - eval { - # TODO: make this script read piped input - - $script_file = "-" if ($script_file =~ /^#!SET!#$/); - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { script_file => $script_file } }); - - if ($script_file =~ /^-$/) - { - open($script_file_handle, "-"); - } - else - { - open($script_file_handle, "< :encoding(UTF-8)", $script_file); - } - } or do { - # open() sets $! upon error, different from the database module failure (which sets $@) - pstderr("failed to open ".$script_file." as script input; cause: ".$!); + my $interface_emitter = sub { emit("interface:".$$."-".$_[0]); }; - $anvil->nice_exit({ exit_code => 1 }); - }; + my $interface = Proc::Simple->new(); - emit("interface:".$$."-ready"); + my $forked = $interface->start(\&run_interface, { + anvil => $anvil, + emit => $interface_emitter, + server => $server, + socket_path => $socket_path, + }); - while (my $script_line = <$script_file_handle>) + if (not $forked) { - chomp($script_line); - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { script_line => $script_line } }); - - if ($script_line =~ /^(?:q|quit)\s*$/) - { - my $quitter = IO::Socket::UNIX->new( - Peer => $socket_path, - Type => SOCK_STREAM, - ) or do { - pstderr("failed to create QUIT connection to ".$socket_path."; cause: $@"); - - $anvil->nice_exit({ exit_code => 1 }); - }; - - print $quitter $script_line; - - $quitter->shutdown(SHUT_RDWR); - - last; - } - - my $pid = fork; # the child process starts here when spawned successfully: - - # - fork returns `undef` when it fails to spawn the child - # - fork returns the child's pid to the parent - # - fork returns `0` to the child - - if (not defined $pid) { - pstderr("failed to fork on send; cause: ".$!); - - next; - } - - if ($pid) - { - emit("requester:".$pid."-forked"); - - next; - } - - ############# - ### BEGIN ### requester block - ############# - - # the child doesn't need to process input - close($script_file_handle); - - my $requester = IO::Socket::UNIX->new( - Peer => $socket_path, - Type => SOCK_STREAM, - ) or do { - pstderr("failed to create connection to ".$socket_path."; cause: ".$@); - - $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); - }; - - print $requester $script_line; - - $requester->shutdown(SHUT_WR); - - while (my $line = <$requester>) - { - chomp($line); - - eval { - my $decoded = decode_json($line); - my $encoded = JSON->new->utf8->allow_blessed->pretty->encode($decoded); - - pstdout($encoded); - } or do { - pstdout($line); - } - } + pstderr("failed to fork IO interface; cause: ". $!); - $requester->shutdown(SHUT_RD); - - emit("requester:".$$."-exit"); - - $anvil->nice_exit({ db_disconnect => 0, exit_code => 0 }); + close($server); - ############# - #### END #### requester block - ############# + $anvil->nice_exit({ exit_code => 1 }); } - close($script_file_handle); - - emit("interface:".$$."-exit"); + $interface_emitter->("forked"); - $anvil->nice_exit({ db_disconnect => 0, exit_code => 0 }); + handle_connections({ anvil => $anvil, server => $server }); - ############# - #### END #### interface block - ############# + $anvil->nice_exit({ exit_code => 0 }); } sub prettify @@ -1065,3 +931,167 @@ sub pstderr { print STDERR "error: ".$_[0]."\n" if defined $_[0]; } + +sub run_interface +{ + my $parameters = shift; + # Required: + my $anvil = $parameters->{anvil}; + my $emit = $parameters->{emit}; + my $server = $parameters->{server}; + my $socket_path = $parameters->{socket_path}; + + return (0) if (not $anvil); + return (0) if (not $server); + + my $script_file = $anvil->data->{switches}{'script'} // "-"; + + # Close the server because the child doesn't need it. + close($server); + + my $script_file_handle; + + # Setup custom signal handlers for the child process. + + local $SIG{INT} = sub { + $emit->("sigint"); + + close($script_file_handle); + + $emit->("exit"); + + $anvil->catch_sig({ signal => "INT" }); + }; + + local $SIG{TERM} = sub { + $emit->("sigterm"); + + close($script_file_handle); + + $emit->("exit"); + + $anvil->catch_sig({ signal => "TERM" }); + }; + + eval { + # TODO: make this script read piped input. + + $script_file = "-" if ($script_file =~ /^#!SET!#$/); + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { script_file => $script_file } }); + + if ($script_file =~ /^-$/) + { + open($script_file_handle, "-"); + } + else + { + open($script_file_handle, "< :encoding(UTF-8)", $script_file); + } + } or do { + # open() sets $! upon error, different from the database module failure (which sets $@). + pstderr("failed to open ".$script_file." as script input; cause: ".$!); + + $anvil->nice_exit({ exit_code => 1 }); + }; + + $emit->("ready"); + + while (my $script_line = <$script_file_handle>) + { + chomp($script_line); + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { script_line => $script_line } }); + + if ($script_line =~ /^(?:q|quit)\s*$/) + { + my $quitter = IO::Socket::UNIX->new( + Peer => $socket_path, + Type => SOCK_STREAM, + ) or do { + pstderr("failed to create QUIT connection to ".$socket_path."; cause: $@"); + + $anvil->nice_exit({ exit_code => 1 }); + }; + + print $quitter $script_line; + + $quitter->shutdown(SHUT_RDWR); + + last; + } + + local $requester_emitter = sub { emit("requester:".$$."-".$_[0]); }; + + my $requester = Proc::Simple->new(); + + $requester->start(\&run_requester, { + anvil => $anvil, + emit => $requester_emitter, + script_fh => $script_file_handle, + script_line => $script_line, + socket_path => $socket_path, + }); + + $requester_emitter->("forked"); + } + + close($script_file_handle); + + $emit->("exit"); + + $anvil->nice_exit({ exit_code => 0 }); +} + +sub run_requester +{ + my $parameters = shift; + # Required: + my $anvil = $parameters->{anvil}; + my $emit = $parameters->{emit}; + my $script_fh = $parameters->{script_fh}; + my $script_line = $parameters->{script_line}; + my $socket_path = $parameters->{socket_path}; + + return (0) if (not $anvil); + return (0) if (not $script_fh); + + # The child doesn't need to process input. + close($script_fh); + + my $requester = IO::Socket::UNIX->new( + Peer => $socket_path, + Type => SOCK_STREAM, + ) or do { + pstderr("failed to create connection to ".$socket_path."; cause: ".$@); + + # We're not using the database connection(s) here; it's not cloned. + # The cache is pointing to the main process's connection(s); don't close it. + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + }; + + print $requester $script_line; + + $requester->shutdown(SHUT_WR); + + while (my $line = <$requester>) + { + chomp($line); + + eval { + my $decoded = decode_json($line); + my $encoded = JSON->new->utf8->allow_blessed->pretty->encode($decoded); + + pstdout($encoded); + } or do { + pstdout($line); + } + } + + $requester->shutdown(SHUT_RD); + + $emit->("exit"); + + # Same; don't close because the cache is pointing to the original connection(s). + $anvil->nice_exit({ db_disconnect => 0, exit_code => 0 }); +} \ No newline at end of file From e0bb7b1fa035c0b3644ebff272efd7f9d22642e7 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Fri, 21 Mar 2025 18:32:16 -0400 Subject: [PATCH 23/47] fix: migrate to Proc::Simple in database listener management --- Anvil/Tools/Database.pm | 345 ++++++++++++++++++++++++++-------------- 1 file changed, 226 insertions(+), 119 deletions(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index 4aea35999..f2076eca4 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -5,9 +5,10 @@ package Anvil::Tools::Database; use strict; use warnings; +use Data::Dumper; use DBI; use Scalar::Util qw(weaken isweak); -use Data::Dumper; +use Proc::Simple; use Text::Diff; use Time::HiRes qw(gettimeofday tv_interval); use XML::LibXML; @@ -103,7 +104,9 @@ my $THIS_FILE = "Database.pm"; # read # read_variable # refresh_timestamp +# remove_listener # resync_databases +# run_listener # shutdown # track_files # update_host_status @@ -148,7 +151,9 @@ sub new { my $class = shift; my $self = {}; - + + $self->{listeners} = {}; + bless $self, $class; return ($self); @@ -177,6 +182,7 @@ sub parent # Public methods # ############################################################################################################# + =head2 add_listener This method adds 1 listener to the primary database. The listener runs when the primary database executes a pg_notify with the matching name. @@ -191,11 +197,11 @@ If set, the listener process will listen to database notifications indefinitely. The name of the events to listen for. This must match with the name provided to 1 pg_notify call in procedures called by database triggers. -=head3 on_clone_fail (optional) +=head3 on_failed_to_clone (optional) If set, runs the referenced subroutine when the call to clone_connection() fails with a non-zero code. -=head3 on_fork_child (optional) +=head3 on_begin_child (optional) If set, runs the referenced subroutine in the listener process after it is created. @@ -203,10 +209,6 @@ If set, runs the referenced subroutine in the listener process after it is creat If set, runs the referenced subroutine when a database notification is received. -=head3 sigchld (optional) - -If set, assigns the provided value to C<>. Defaults to C<<"IGNORE">> to make the listener process start-and-forget because there's usually no need to wait for it. - =head3 uuid (optional) If set, the listener will be added to the connection of the database identified with the provided UUID. @@ -221,136 +223,36 @@ sub add_listener $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->add_listener()" } }); - my $blocking = $parameters->{blocking} // 1; - my $db_ping_interval = $parameters->{ping_interval} // 300; - my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; - my $clone_fail_handler = $parameters->{on_clone_fail}; - my $fork_child_handler = $parameters->{on_fork_child}; - my $notify_handler = $parameters->{on_notify}; - my $notify_name = $parameters->{name}; - my $sigchld = $parameters->{sigchld} // "IGNORE"; - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { - blocking => $blocking, - db_uuid => $db_uuid, - clone_fail_handler => $clone_fail_handler, - fork_child_handler => $fork_child_handler, - notify_handler => $notify_handler, - notify_name => $notify_name, - sigchld => $sigchld, - } }); + my $notify_name = $parameters->{name}; if (not $notify_name) { - $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0020", variables => { + $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0116", variables => { method => "Database->add_listener()", parameter => "name", + value => $notify_name, } }); return (1, undef, "missing name"); } - if (not $db_uuid) - { - $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0020", variables => { - method => "Database->add_listener()", - parameter => "uuid", - } }); - - return (2, undef, "missing uuid"); - } + my $listener = Proc::Simple->new(); - # Set signals behaviours + my $forked = $listener->start(\&run_listener, $self, $parameters); - local $SIG{CHLD} = $sigchld; - - my $fork = fork; - - if (not defined $fork) + if (not $forked) { - return (3, undef, $!); + return (2, undef, $!); } - if ($fork) + if (not exists $self->{listeners}{$notify_name}) { - # The parent must record the listener process PID for removal at a later point. - - return (0, $fork); + $self->{listeners}{$notify_name} = {}; } - ############# - ### BEGIN ### child block - ############# - - $fork_child_handler->({ anvil => $anvil }) if (ref($fork_child_handler) eq "CODE"); + $self->{listeners}{$notify_name}{$listener->pid} = $listener; - my ($clone_connection_code, $dbh) = $self->clone_connection({ uuid => $db_uuid }); - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { - clone_connection_code => $clone_connection_code, - dbh => $dbh, - } }); - - if (not $dbh) - { - $clone_fail_handler->({ anvil => $anvil }) if (ref($clone_fail_handler) eq "CODE"); - - $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); - } - - my $listen = eval { $dbh->do("LISTEN ".$notify_name); }; - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { listen => $listen } }); - - if (not $listen) - { - $anvil->nice_exit({ exit_code => 2 }); - } - - my $step = 0.1; - my $count = 0; - - while ($blocking) - { - while (my $notify = eval { $dbh->pg_notifies(); }) - { - my ($name, $pid, $payload) = @$notify; - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { - name => $name, - pid => $pid, - payload => $payload, - }, prefix => "pg_notify" }); - - $notify_handler->({ anvil => $anvil, notify => $notify }) if (ref($notify_handler) eq "CODE"); - } - - if ($count >= $db_ping_interval) - { - my $ping = eval { $dbh->ping(); }; - - if (not $ping) - { - # TODO: Maybe try recovering the listener process? - - $anvil->nice_exit({ exit_code => 3 }); - } - - # Reset the counter - $count = 0; - } - - $count += $step; - - sleep($step); - } - - # The listener process should never reach here. - $anvil->nice_exit({ exit_code => 4 }); - - ############# - #### END #### child block - ############# + return (0, $listener); } @@ -19164,6 +19066,75 @@ sub refresh_timestamp } +=head2 remove_listener + +This method removes listener(s) registered with add_listener. + +Parameters; + +=head3 name (required) + +This is used to match the listeners registered under the given pg_notify name. + +=head3 pid (optional) + +If set, only the listener with the given PID will be killed, instead of all listeners registered under the given pg_notify name. + +=cut +sub remove_listener +{ + my $self = shift; + my $parameters = shift; + my $anvil = $self->parent; + my $debug = $parameters->{debug} // 3; + + $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->remove_listener()" } }); + + my $listener_pid = $parameters->{pid}; + my $notify_name = $parameters->{name}; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { + listener_pid => $listener_pid, + notify_name => $notify_name, + } }); + + if (not $notify_name) + { + $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0116", variables => { + method => "Database->remove_listener()", + parameter => "name", + value => $notify_name, + } }); + + return (1); + } + + my @listeners; + + if ((defined $listener_pid) and ($listener_pid =~ /^\d+$/)) + { + # Get only the listener with the given PID. + @listeners = ($self->{listeners}{$notify_name}{$listener_pid}); + } + else + { + # Include all listeners under the notify name to the scope. + @listeners = values(%{$self->{listeners}{$notify_name}}); + } + + foreach my $listener (@listeners) + { + # Forget the to-be killed listener. + delete $self->{listeners}{$notify_name}{$listener->pid}; + + # Make certain the listener is dead and stays dead. + $listener->kill() or $listener->kill("SIGKILL"); + } + + return (0); +} + + =head2 resync_databases This will resync the database data on this and peer database(s) if needed. It takes no arguments and will immediately return unless C<< sys::database::resync_needed >> was set. @@ -19772,6 +19743,142 @@ sub resync_databases } +=head2 run_listener + +This method is an indefinite loop that listens for notify calls from the primary database. + +The parameters are exactly the same as the ones used in add_listener. + +=cut +sub run_listener +{ + my $self = shift; + my $parameters = shift; + my $anvil = $self->parent; + my $debug = $parameters->{debug} // 3; + + $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->run_listener()" } }); + + my $blocking = $parameters->{blocking} // 1; + my $db_ping_interval = $parameters->{ping_interval} // 300; + my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; + my $notify_name = $parameters->{name}; + my $on_begin_child = $parameters->{on_begin_child}; + my $on_failed_to_clone = $parameters->{on_failed_to_clone}; + my $on_notify = $parameters->{on_notify}; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { + blocking => $blocking, + db_uuid => $db_uuid, + notify_name => $notify_name, + on_begin_child => $on_begin_child, + on_failed_to_clone => $on_failed_to_clone, + on_notify => $on_notify, + } }); + + # Don't disconnect on exit until we've cloned the (probably primary) database connection. + + if (not $notify_name) + { + $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0116", variables => { + method => "Database->run_listener()", + parameter => "name", + value => $notify_name, + } }); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); + } + + if ($db_ping_interval !~ /^\d+$/) + { + $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0116", variables => { + method => "Database->run_listener()", + parameter => "ping_interval", + value => $db_ping_interval, + } }); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 2 }); + } + + if ((not $db_uuid) or (not exists $anvil->data->{database}{$db_uuid})) + { + $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0116", variables => { + method => "Database->run_listener()", + parameter => "uuid", + value => $db_uuid, + } }); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 3 }); + } + + $on_begin_child->({ anvil => $anvil }) if (ref($on_begin_child) eq "CODE"); + + my ($clone_connection_code, $dbh) = $self->clone_connection({ uuid => $db_uuid }); + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { + clone_connection_code => $clone_connection_code, + dbh => $dbh, + } }); + + if (not $dbh) + { + $on_failed_to_clone->({ anvil => $anvil }) if (ref($on_failed_to_clone) eq "CODE"); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 4 }); + } + + # We've cloned the database connection, remember to disconnect on exit after this point. + + my $listen = eval { $dbh->do("LISTEN ".$notify_name); }; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { listen => $listen } }); + + if (not $listen) + { + $anvil->nice_exit({ exit_code => 5 }); + } + + my $step = 0.1; + my $count = 0; + + while ($blocking) + { + while (my $notify = eval { $dbh->pg_notifies(); }) + { + my ($name, $pid, $payload) = @$notify; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { + name => $name, + pid => $pid, + payload => $payload, + }, prefix => "pg_notify" }); + + $on_notify->({ anvil => $anvil, notify => $notify }) if (ref($on_notify) eq "CODE"); + } + + if ($count >= $db_ping_interval) + { + my $ping = eval { $dbh->ping(); }; + + if (not $ping) + { + $anvil->nice_exit({ exit_code => 6 }); + } + + # Reset the counter + $count = 0; + } + + $count += $step; + + sleep($step); + } + + # The listener process should never reach here. + $anvil->nice_exit({ exit_code => 7 }); +} + + =head2 shutdown This gracefully shuts down the local database, waiting for active connections to exit before doing so. This call only works on a Striker dashboard. It creates a dump file of the database as part of the shutdown. It always returns C<< 0 >>. From 9c626e7c8706900432baed12ed77fc6579dd3658 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Fri, 21 Mar 2025 20:15:03 -0400 Subject: [PATCH 24/47] fix(tools): migrate all forks to Proc::Simple in access module --- tools/anvil-access-module | 564 +++++++++++++++++--------------------- 1 file changed, 248 insertions(+), 316 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index ad3266c55..2e3ccab02 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -263,40 +263,6 @@ sub access_chain return (@results); } -sub check_database_handles -{ - my $parameters = shift; - my $anvil = $parameters->{anvil}; - - my $success = 0; - - foreach my $uuid (sort { $a cmp $b } keys %{$anvil->data->{database}}) - { - if (not exists $anvil->data->{cache}{database_handle}{$uuid}) - { - next; - } - - my $dbh = $anvil->data->{cache}{database_handle}{$uuid}; - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { - database_uuid => $uuid, - dbh_local => $dbh, - } }); - - if ((not $dbh) or (not $dbh->ping)) - { - next; - } - - $success += 1; - } - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { success => $success }, prefix => "check_database_handles" }); - - return $success; -} - sub db_access { my $parameters = shift; @@ -342,86 +308,6 @@ sub emit pstdout("event=".$_[0]); } -# -# this subroutine cleans up the fork made by fork_response_process() -# -sub exit_response_process -{ - my $parameters = shift; - # required: - my $anvil = $parameters->{anvil}; - my $responder = $parameters->{responder}; - - emit("responder:".$$."-exit"); - - # responder is done writing - $responder->shutdown(SHUT_WR); - - $anvil->nice_exit({ exit_code => 0 }); - - ############# - #### END #### response process block - ############# -} - -# -# this subroutine should only run in the main process -# -# every line after the call to this function are a part of the responder process -# -sub fork_response_process -{ - my $parameters = shift; - # required: - my $anvil = $parameters->{anvil}; - my $responder = $parameters->{responder}; - my $server = $parameters->{server}; - - check_database_handles({ anvil => $anvil }) or do { - pstderr("no usable database handle"); - - return 0; - }; - - my $pid = fork; - - if (not defined $pid) - { - pstderr("failed to fork on receive; cause: ".$!); - - return 0; - } - - if ($pid) - { - emit("responder:".$pid."-forked"); - - return 0; - } - - ############# - ### BEGIN ### response process block - ############# - - # close the server because the child doesn't need it - close($server); - - handle_responder_output_setup({ anvil => $anvil, responder => $responder }); - - # clone the database handle for this child to avoid interfering with another process that needs to use the database - # - # the database handle variable only holds a reference to the parent's database handle before clone completes, therefore we shouldn't call disconnect unless clone succeeds - - my ($clone_connection_code, $clone) = $anvil->Database->clone_connection({ debug => 2 }); - - if (not $clone) - { - handle_database_clone_connection_failure({ anvil => $anvil, responder => $responder }); - } - - return 1; -} - sub get_scmd_args { my $parameters = shift; @@ -454,101 +340,10 @@ sub get_scmd_args return $args; } -# this subroutine should only run in the main process -sub manage_database_listeners -{ - my $parameters = shift; - # required: - my $anvil = $parameters->{anvil}; - my $db_listeners = $parameters->{db_listeners}; - my $input = $parameters->{input}; - my $responder = $parameters->{responder}; - my $server = $parameters->{server}; - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { input => $input }, prefix => "manage_database_listeners" }); - - # ignore because it's not a database listener action - if (not ($input =~ s/^\s*dbl:://)) - { - return 0; - } - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { input => $input } }); - - if ($input =~ s/^list//) - { - foreach my $notify_name (sort { $a cmp $b } keys %{$db_listeners}) - { - my $pid = $db_listeners->{$notify_name}; - - pstdout($notify_name.":".$pid); - } - - return 1; - } - - if ($input =~ s/^remove\s+(\d+)//) - { - my $pid = $1; - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { pid => $pid } }); - - kill('TERM', $pid); - - return 1; - } - - if ($input =~ s/^add\s+(\w+)//) - { - my $notify_name = $1; - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { notify_name => $notify_name } }); - - my $clone_fail_handler = \&handle_database_clone_connection_failure; - - my $fork_child_handler = sub { - # close the server because the child doesn't need it - close($server); - - handle_responder_output_setup({ anvil => $anvil, responder => $responder }); - }; - - my $notify_handler = sub { - my $params = shift; - my $notify = $params->{notify}; - - my ($n_name, $n_pid, $n_payload) = @$notify; - - emit($n_name.":".$n_pid.":".$n_payload); - }; - - my ($add_listener_code, $pid, $error) = $anvil->Database->add_listener({ - debug => 2, - name => $notify_name, - on_clone_fail => $clone_fail_handler, - on_fork_child => $fork_child_handler, - on_notify => $notify_handler, - }); - - if ($add_listener_code != 0) - { - pstdout("failed to fork on receive; cause: ".$error); - - return 0; - } - - $db_listeners->{$notify_name} = $pid; - - emit("db-listener:".$pid."-forked"); - - return 1; - } -} - -sub handle_database_clone_connection_failure +sub handle_clone_database_connection_failure { my $parameters = shift; - # required: + # Required: my $anvil = $parameters->{anvil}; my $responder = $parameters->{responder}; @@ -562,16 +357,16 @@ sub handle_database_clone_connection_failure sub handle_responder_output_setup { my $parameters = shift; - # required: + # Required: my $anvil = $parameters->{anvil}; my $responder = $parameters->{responder}; - # disconnect from the parent's outputs to avoid interference + # Disconnect from the parent's outputs to avoid interference close(STDOUT); close(STDERR); - # redirect outputs to the responder for transport + # Redirect outputs to the responder for transport open(STDOUT, ">&", $responder) or do { print $responder "failed to open STDOUT; cause: ".$!."\n"; @@ -593,32 +388,10 @@ sub handle_responder_output_setup sub handle_connections { my $parameters = shift; - # required: + # Required: my $anvil = $parameters->{anvil}; my $server = $parameters->{server}; - my $database_listeners = {}; - - local $SIG{INT} = sub { - emit("sigint"); - - close($server); - - emit("exit"); - - $anvil->catch_sig({ signal => "INT" }); - }; - - local $SIG{TERM} = sub { - emit("sigterm"); - - close($server); - - emit("exit"); - - $anvil->catch_sig({ signal => "TERM" }); - }; - emit("listening"); while (my $responder = $server->accept() or do { @@ -633,19 +406,26 @@ sub handle_connections chomp($request_line); - # responder is done reading + # Responder is done reading $responder->shutdown(SHUT_RD); $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { request_line => $request_line } }); - last if ($request_line =~ /^(?:q|quit)\s*$/); + if ($request_line =~ /^(?:q|quit)\s*$/) + { + $responder->shutdown(SHUT_WR); + + last; + } + + my $emit_listener_event = sub { emit("dbl:".($_[1] // $$)."-".$_[0]); }; manage_database_listeners({ - anvil => $anvil, - db_listeners => $database_listeners, - input => $request_line, - responder => $responder, - server => $server, + anvil => $anvil, + emit => $emit_listener_event, + input => $request_line, + responder => $responder, + server => $server, }) and do { pstdout($$."::manage_database_listeners ended with true"); @@ -656,57 +436,28 @@ sub handle_connections pstdout($$."::manage_database_listeners ended with false"); - fork_response_process({ anvil => $anvil, responder => $responder, server => $server }) or do { - pstdout($$."::fork_response_process ended with false"); + my $emit_responder_event = sub { emit("responder:".($_[1] // $$)."-".$_[0]); }; + + my $response_process = Proc::Simple->new(); + + $response_process->start(\&run_responder, { + anvil => $anvil, + emit => $emit_responder_event, + line => $request_line, + responder => $responder, + server => $server, + }) or do { + pstderr("failed to fork on receive; cause: ".$!); $responder->shutdown(SHUT_WR); next; }; - pstdout($$."::fork_response_process ended with true"); - - my @cmd_lines = split(/;;/, $request_line); - - foreach my $cmd_line (@cmd_lines) - { - $cmd_line =~ s/^\s+//; - $cmd_line =~ s/\s+$//; - - my $cmd_line_id; - - if ($cmd_line =~ s/^([[:xdigit:]]{8}-(?:[[:xdigit:]]{4}-){3}[[:xdigit:]]{12})\s+//) - { - $cmd_line_id = $1; - } - - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { - cmd_line => $cmd_line, - cmd_line_id => $cmd_line_id, - } }); - - if ($cmd_line =~ /^$scmd_db_read\s+/) - { - process_scmd_db({ anvil => $anvil, cmd => $scmd_db_read, input => $cmd_line, lid => $cmd_line_id }); - } - elsif ($cmd_line =~ /^$scmd_db_write\s+/) - { - process_scmd_db({ anvil => $anvil, cmd => $scmd_db_write, input => $cmd_line, lid => $cmd_line_id }); - } - elsif ($cmd_line =~ /^$scmd_execute\s+/) - { - process_scmd_execute({ anvil => $anvil, input => $cmd_line, lid => $cmd_line_id }); - } - } - - exit_response_process({ anvil => $anvil, responder => $responder }); + $emit_responder_event->("forked", $response_process->pid); } - close($server); - - emit("exit"); - - $anvil->nice_exit({ exit_code => 0 }); + return 1; } sub is_array @@ -770,46 +521,156 @@ sub main $anvil->nice_exit({ exit_code => 1 }); }; - # Make child processes start-and-forget because we don't need to wait for them - local $SIG{CHLD} = "IGNORE"; - - if ($daemonize) + if (not $daemonize) { - # When running as daemon, the main process shouldn't do anything other than handling connections - handle_connections({ anvil => $anvil, server => $server }); + my $emit_interface_event = sub { emit("interface:".$$."-".$_[0]); }; - close($server); + my $interface_process = Proc::Simple->new(); + + $interface_process->start(\&run_interface, { + anvil => $anvil, + emit => $emit_interface_event, + server => $server, + socket_path => $socket_path, + }) or do { + pstderr("failed to fork IO interface; cause: ". $!); - $anvil->nice_exit({ exit_code => 0 }); + close($server); + + $anvil->nice_exit({ exit_code => 1 }); + }; + + $emit_interface_event->("forked"); } - my $interface_emitter = sub { emit("interface:".$$."-".$_[0]); }; + local $SIG{INT} = sub { + emit("sigint"); - my $interface = Proc::Simple->new(); + close($server); - my $forked = $interface->start(\&run_interface, { - anvil => $anvil, - emit => $interface_emitter, - server => $server, - socket_path => $socket_path, - }); + emit("exit"); - if (not $forked) - { - pstderr("failed to fork IO interface; cause: ". $!); + $anvil->catch_sig({ signal => "INT" }); + }; + + local $SIG{TERM} = sub { + emit("sigterm"); close($server); - $anvil->nice_exit({ exit_code => 1 }); - } + emit("exit"); - $interface_emitter->("forked"); + $anvil->catch_sig({ signal => "TERM" }); + }; + # The main process shouldn't do anything other than handling connections. handle_connections({ anvil => $anvil, server => $server }); + close($server); + + emit("exit"); + $anvil->nice_exit({ exit_code => 0 }); } +sub manage_database_listeners +{ + my $parameters = shift; + # Required: + my $anvil = $parameters->{anvil}; + my $emit = $parameters->{emit}; + my $input = $parameters->{input}; + my $responder = $parameters->{responder}; + my $server = $parameters->{server}; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { input => $input }, prefix => "manage_database_listeners" }); + + chomp($input); + + # Ignore because it's not a database listener action + if (not ($input =~ s/^dbl:://)) + { + return 0; + } + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { input => $input } }); + + if ($input =~ s/^list//) + { + my $listeners = $anvil->Database->listeners; + + foreach my $name (sort { $a cmp $b } keys %{$listeners}) + { + foreach my $pid (sort { $a cmp $b } keys %{$listeners->{$name}}) + { + pstdout($name.":".$pid); + } + } + + return 1; + } + + if ($input =~ s/^remove\s+(\w+)//) + { + my $name = $1; + + chomp($input); + + my $pid; + + if ($input =~ s/^(\d+)//) + { + $pid = $2; + } + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { name => $name, pid => $pid } }); + + $anvil->Database->remove_listener({ name => $name, pid => $pid }); + + return 1; + } + + if ($input =~ s/^add\s+(\w+)//) + { + my $name = $1; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { name => $name } }); + + + + my ($add_listener_code, $pid, $error) = $anvil->Database->add_listener({ + debug => 2, + name => $name, + on_failed_to_clone => \&handle_clone_database_connection_failure, + on_begin_child => sub { + # Close the server because the child doesn't need it + close($server); + + handle_responder_output_setup({ anvil => $anvil, responder => $responder }); + }, + on_notify => sub { + my $parameters = shift; + my $notify = $parameters->{notify}; + + my ($name, $pid, $payload) = @$notify; + + pstdout($name.":".$pid.":".$payload); + }, + }); + + if ($add_listener_code != 0) + { + pstdout("failed to fork on receive; cause: ".$error); + + return 0; + } + + $emit->("forked"); + + return 1; + } +} + sub prettify { my $var_value = shift; @@ -825,11 +686,11 @@ sub prettify sub process_scmd_db { my $parameters = shift; - # required: + # Required: my $anvil = $parameters->{anvil}; my $cmd = $parameters->{cmd}; my $input = $parameters->{input}; - # optional: + # Optional: my $lid = $parameters->{lid} // ""; my $mode; @@ -874,10 +735,10 @@ sub process_scmd_db sub process_scmd_execute { my $parameters = shift; - # required: + # Required: my $anvil = $parameters->{anvil}; my $input = $parameters->{input}; - # optional: + # Optional: my $lid = $parameters->{lid} // ""; $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { @@ -1021,19 +882,23 @@ sub run_interface last; } - local $requester_emitter = sub { emit("requester:".$$."-".$_[0]); }; + local $emit_requester_event = sub { emit("requester:".($_[1] // $$)."-".$_[0]); }; - my $requester = Proc::Simple->new(); + my $request_process = Proc::Simple->new(); - $requester->start(\&run_requester, { + $request_process->start(\&run_requester, { anvil => $anvil, - emit => $requester_emitter, + emit => $emit_requester_event, script_fh => $script_file_handle, script_line => $script_line, socket_path => $socket_path, - }); + }) or do { + pstderr("failed to fork requester; cause: ". $!); + + next; + }; - $requester_emitter->("forked"); + $emit_requester_event->("forked", $request_process->pid); } close($script_file_handle); @@ -1094,4 +959,71 @@ sub run_requester # Same; don't close because the cache is pointing to the original connection(s). $anvil->nice_exit({ db_disconnect => 0, exit_code => 0 }); -} \ No newline at end of file +} + +sub run_responder +{ + my $parameters = shift; + # Required: + my $anvil = $parameters->{anvil}; + my $emit = $parameters->{emit}; + my $line = $parameters->{line}; + my $responder = $parameters->{responder}; + my $server = $parameters->{server}; + + # Close the server because the child doesn't need it + close($server); + + handle_responder_output_setup({ anvil => $anvil, responder => $responder }); + + # Clone the database handle for this child to avoid interfering with another process that needs to use the database + # + # The database handle variable only holds a reference to the parent's database handle before clone completes, therefore we shouldn't call disconnect unless clone succeeds + + my ($clone_connection_code, $clone) = $anvil->Database->clone_connection({ debug => 2 }); + + if (not $clone) + { + handle_clone_database_connection_failure({ anvil => $anvil, responder => $responder }); + } + + my @cmd_lines = split(/;;/, $line); + + foreach my $cmd_line (@cmd_lines) + { + $cmd_line =~ s/^\s+//; + $cmd_line =~ s/\s+$//; + + my $cmd_line_id; + + if ($cmd_line =~ s/^([[:xdigit:]]{8}-(?:[[:xdigit:]]{4}-){3}[[:xdigit:]]{12})\s+//) + { + $cmd_line_id = $1; + } + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { + cmd_line => $cmd_line, + cmd_line_id => $cmd_line_id, + } }); + + if ($cmd_line =~ /^$scmd_db_read\s+/) + { + process_scmd_db({ anvil => $anvil, cmd => $scmd_db_read, input => $cmd_line, lid => $cmd_line_id }); + } + elsif ($cmd_line =~ /^$scmd_db_write\s+/) + { + process_scmd_db({ anvil => $anvil, cmd => $scmd_db_write, input => $cmd_line, lid => $cmd_line_id }); + } + elsif ($cmd_line =~ /^$scmd_execute\s+/) + { + process_scmd_execute({ anvil => $anvil, input => $cmd_line, lid => $cmd_line_id }); + } + } + + $emit->("exit"); + + # Responder is done writing + $responder->shutdown(SHUT_WR); + + $anvil->nice_exit({ exit_code => 0 }); +} From 2bff599cab2b9b3c4be6485ee152a0e315b1b156 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Fri, 21 Mar 2025 20:18:25 -0400 Subject: [PATCH 25/47] fix(tools): correct emit requester event fn var scope in access module --- tools/anvil-access-module | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 2e3ccab02..35bb63ad5 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -882,7 +882,7 @@ sub run_interface last; } - local $emit_requester_event = sub { emit("requester:".($_[1] // $$)."-".$_[0]); }; + my $emit_requester_event = sub { emit("requester:".($_[1] // $$)."-".$_[0]); }; my $request_process = Proc::Simple->new(); From 6ed254a5bb55d4e16f5a56238564cc28675bdd4c Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Fri, 21 Mar 2025 20:26:25 -0400 Subject: [PATCH 26/47] fix(tools): correct forked events in access module --- tools/anvil-access-module | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 35bb63ad5..467c755bc 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -523,7 +523,7 @@ sub main if (not $daemonize) { - my $emit_interface_event = sub { emit("interface:".$$."-".$_[0]); }; + my $emit_interface_event = sub { emit("interface:".($_[1] // $$)."-".$_[0]); }; my $interface_process = Proc::Simple->new(); @@ -540,7 +540,7 @@ sub main $anvil->nice_exit({ exit_code => 1 }); }; - $emit_interface_event->("forked"); + $emit_interface_event->("forked", $interface_process->pid); } local $SIG{INT} = sub { @@ -636,8 +636,6 @@ sub manage_database_listeners $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { name => $name } }); - - my ($add_listener_code, $pid, $error) = $anvil->Database->add_listener({ debug => 2, name => $name, @@ -665,7 +663,7 @@ sub manage_database_listeners return 0; } - $emit->("forked"); + $emit->("forked", $pid); return 1; } From b0eb3570206ed53eb6d0dc0876c47792ae781495 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Fri, 21 Mar 2025 20:51:39 -0400 Subject: [PATCH 27/47] fix(tools): correct list database listeners in access module --- tools/anvil-access-module | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 467c755bc..3decfce01 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -597,7 +597,7 @@ sub manage_database_listeners if ($input =~ s/^list//) { - my $listeners = $anvil->Database->listeners; + my $listeners = $anvil->Database->{listeners}; foreach my $name (sort { $a cmp $b } keys %{$listeners}) { From a0da9c966b5de200024166cde327b545655d73fc Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Sat, 22 Mar 2025 01:36:41 -0400 Subject: [PATCH 28/47] fix(tools): prevent SIGCHLD from crashing on IO:Socket's accept() --- tools/anvil-access-module | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 3decfce01..86a3a23c7 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -177,10 +177,10 @@ main(); sub access_chain { my $parameters = shift; - # required: + # Required: my $anvil = $parameters->{anvil}; my $chain_str = $parameters->{chain}; - # optional: + # Optional: my $chain_args = $parameters->{chain_args} // []; my @chain = split(/->|[.]/, $chain_str); @@ -266,10 +266,10 @@ sub access_chain sub db_access { my $parameters = shift; - # required: + # Required: my $anvil = $parameters->{anvil}; my $sql = $parameters->{sql}; - # optional: + # Optional: my $mode = $parameters->{db_access_mode} // ""; my $uuid = $parameters->{db_uuid}; @@ -311,11 +311,11 @@ sub emit sub get_scmd_args { my $parameters = shift; - # required: + # Required: my $anvil = $parameters->{anvil}; my $input = $parameters->{input}; my $get_values = $parameters->{get_values}; - # optional: + # Optional: my $cmd = $parameters->{cmd}; my $arg_names = $parameters->{names} // []; @@ -394,7 +394,17 @@ sub handle_connections emit("listening"); + my $emit_listener_event = sub { emit("dbl:".($_[1] // $$)."-".$_[0]); }; + my $emit_responder_event = sub { emit("responder:".($_[1] // $$)."-".$_[0]); }; + while (my $responder = $server->accept() or do { + # + # Ignore interrupt caused by auto SIGCHLD in the reaper of Proc::Simple. + # + # accept() will fail with $! set to "Interrupted system call" without ignoring the interrupt caused by SIGCHLD. + # + next if $!{EINTR}; + pstderr("failed to accept connections; cause: ".$!); close($server); @@ -404,11 +414,11 @@ sub handle_connections { my $request_line = <$responder>; - chomp($request_line); - # Responder is done reading $responder->shutdown(SHUT_RD); + chomp($request_line); + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { request_line => $request_line } }); if ($request_line =~ /^(?:q|quit)\s*$/) @@ -418,8 +428,6 @@ sub handle_connections last; } - my $emit_listener_event = sub { emit("dbl:".($_[1] // $$)."-".$_[0]); }; - manage_database_listeners({ anvil => $anvil, emit => $emit_listener_event, @@ -436,8 +444,6 @@ sub handle_connections pstdout($$."::manage_database_listeners ended with false"); - my $emit_responder_event = sub { emit("responder:".($_[1] // $$)."-".$_[0]); }; - my $response_process = Proc::Simple->new(); $response_process->start(\&run_responder, { @@ -800,9 +806,6 @@ sub run_interface my $server = $parameters->{server}; my $socket_path = $parameters->{socket_path}; - return (0) if (not $anvil); - return (0) if (not $server); - my $script_file = $anvil->data->{switches}{'script'} // "-"; # Close the server because the child doesn't need it. @@ -916,9 +919,6 @@ sub run_requester my $script_line = $parameters->{script_line}; my $socket_path = $parameters->{socket_path}; - return (0) if (not $anvil); - return (0) if (not $script_fh); - # The child doesn't need to process input. close($script_fh); From 973c26a4f6ad988102823555f9b023b67f2d2ef6 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Sat, 22 Mar 2025 01:49:46 -0400 Subject: [PATCH 29/47] fix(tools): correct DB listener forked event in access module --- tools/anvil-access-module | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 86a3a23c7..8cc61df00 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -642,7 +642,7 @@ sub manage_database_listeners $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { name => $name } }); - my ($add_listener_code, $pid, $error) = $anvil->Database->add_listener({ + my ($add_listener_code, $listener, $error) = $anvil->Database->add_listener({ debug => 2, name => $name, on_failed_to_clone => \&handle_clone_database_connection_failure, @@ -669,7 +669,7 @@ sub manage_database_listeners return 0; } - $emit->("forked", $pid); + $emit->("forked", $listener->pid); return 1; } From fc8b0ccb538317119923754ba9d39c2ac1d236f6 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Mon, 24 Mar 2025 17:58:29 -0400 Subject: [PATCH 30/47] fix(tools): edit comment, reuse var for script file in access module --- tools/anvil-access-module | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 8cc61df00..5db8468e5 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -842,9 +842,9 @@ sub run_interface $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { script_file => $script_file } }); - if ($script_file =~ /^-$/) + if ($script_file eq "-") { - open($script_file_handle, "-"); + open($script_file_handle, $script_file); } else { @@ -955,7 +955,7 @@ sub run_requester $emit->("exit"); - # Same; don't close because the cache is pointing to the original connection(s). + # Same; don't disconnect because the cache is pointing to the original connection(s). $anvil->nice_exit({ db_disconnect => 0, exit_code => 0 }); } From 052934fa5335e04f50ec5e6534f20a81b5645052 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 25 Mar 2025 00:33:08 -0400 Subject: [PATCH 31/47] fix: lower ping frequency, add failed ping hook in Database->run_listener --- Anvil/Tools/Database.pm | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index f2076eca4..0e9a1d804 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -19760,11 +19760,12 @@ sub run_listener $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->run_listener()" } }); my $blocking = $parameters->{blocking} // 1; - my $db_ping_interval = $parameters->{ping_interval} // 300; + my $db_ping_interval = $parameters->{ping_interval} // 1500; my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; my $notify_name = $parameters->{name}; my $on_begin_child = $parameters->{on_begin_child}; my $on_failed_to_clone = $parameters->{on_failed_to_clone}; + my $on_failed_to_ping = $parameters->{on_failed_to_ping}; my $on_notify = $parameters->{on_notify}; $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { @@ -19838,7 +19839,7 @@ sub run_listener $anvil->nice_exit({ exit_code => 5 }); } - my $step = 0.1; + my $step = 0.2; my $count = 0; while ($blocking) @@ -19860,8 +19861,12 @@ sub run_listener { my $ping = eval { $dbh->ping(); }; + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { ping => $ping } }); + if (not $ping) { + $on_failed_to_ping->({ anvil => $anvil }) if (ref($on_failed_to_ping) eq "CODE"); + $anvil->nice_exit({ exit_code => 6 }); } From 93f69b408eda94d3b53c89c27ed01373dcebeb6e Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 25 Mar 2025 00:34:41 -0400 Subject: [PATCH 32/47] fix(tools): allow listener to write to responder by keeping it alive in access module --- tools/anvil-access-module | 64 ++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 5db8468e5..783231d01 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -347,11 +347,9 @@ sub handle_clone_database_connection_failure my $anvil = $parameters->{anvil}; my $responder = $parameters->{responder}; - print $responder "failed to clone primary database handle\n"; + pstderr("failed to clone primary database handle"); $responder->shutdown(SHUT_RDWR); - - $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); } sub handle_responder_output_setup @@ -435,15 +433,11 @@ sub handle_connections responder => $responder, server => $server, }) and do { - pstdout($$."::manage_database_listeners ended with true"); - - $responder->shutdown(SHUT_WR); + # Don't close the responder because it's being used by the listener to send data. next; }; - pstdout($$."::manage_database_listeners ended with false"); - my $response_process = Proc::Simple->new(); $response_process->start(\&run_responder, { @@ -640,26 +634,46 @@ sub manage_database_listeners { my $name = $1; - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { name => $name } }); + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { name => $name } }); - my ($add_listener_code, $listener, $error) = $anvil->Database->add_listener({ - debug => 2, - name => $name, - on_failed_to_clone => \&handle_clone_database_connection_failure, - on_begin_child => sub { - # Close the server because the child doesn't need it - close($server); + my $handle_begin_child = sub { + # Close the server because the child doesn't need it + close($server); + + handle_responder_output_setup({ anvil => $anvil, responder => $responder }) + }; - handle_responder_output_setup({ anvil => $anvil, responder => $responder }); - }, - on_notify => sub { - my $parameters = shift; - my $notify = $parameters->{notify}; + my $handle_failed_to_clone = sub { + handle_clone_database_connection_failure({ anvil => $anvil, responder => $responder }); + }; - my ($name, $pid, $payload) = @$notify; + my $handle_failed_to_ping = sub { + pstderr("failed to ping database"); - pstdout($name.":".$pid.":".$payload); - }, + $responder->shutdown(SHUT_RDWR); + }; + + my $handle_notify = sub { + my $parameters = shift; + my $notify = $parameters->{notify}; + + my ($name, $pid, $payload) = @$notify; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { + name => $name, + pid => $pid, + payload => $payload, + }, prefix => "handle_notify" }); + + pstdout($name.":".$pid.":".$payload); + }; + + my ($add_listener_code, $listener, $error) = $anvil->Database->add_listener({ + debug => 2, + name => $name, + on_begin_child => $handle_begin_child, + on_failed_to_clone => $handle_failed_to_clone, + on_notify => $handle_notify, }); if ($add_listener_code != 0) @@ -983,6 +997,8 @@ sub run_responder if (not $clone) { handle_clone_database_connection_failure({ anvil => $anvil, responder => $responder }); + + $anvil->nice_exit({ db_disconnect => 0, exit_code => 1 }); } my @cmd_lines = split(/;;/, $line); From 38d676ab15c6751f9f9015367977eb0f55145f78 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 25 Mar 2025 16:45:41 -0400 Subject: [PATCH 33/47] fix: adjust ping interval, correct docs in Database->add_listener --- Anvil/Tools/Database.pm | 71 +++++++++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 27 deletions(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index 0e9a1d804..7fd97e3ba 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -10,7 +10,7 @@ use DBI; use Scalar::Util qw(weaken isweak); use Proc::Simple; use Text::Diff; -use Time::HiRes qw(gettimeofday tv_interval); +use Time::HiRes qw(gettimeofday sleep tv_interval); use XML::LibXML; our $VERSION = "3.0.0"; @@ -187,32 +187,14 @@ sub parent This method adds 1 listener to the primary database. The listener runs when the primary database executes a pg_notify with the matching name. -Parameters; - -=head3 blocking (optional, default 1) +It takes the same parameters as run_listener, with the following additions. -If set, the listener process will listen to database notifications indefinitely. +Parameters; =head3 name (required) The name of the events to listen for. This must match with the name provided to 1 pg_notify call in procedures called by database triggers. -=head3 on_failed_to_clone (optional) - -If set, runs the referenced subroutine when the call to clone_connection() fails with a non-zero code. - -=head3 on_begin_child (optional) - -If set, runs the referenced subroutine in the listener process after it is created. - -=head3 on_notify (optional) - -If set, runs the referenced subroutine when a database notification is received. - -=head3 uuid (optional) - -If set, the listener will be added to the connection of the database identified with the provided UUID. - =cut sub add_listener { @@ -225,6 +207,8 @@ sub add_listener my $notify_name = $parameters->{name}; + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { notify_name => $notify_name } }); + if (not $notify_name) { $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0116", variables => { @@ -19747,7 +19731,40 @@ sub resync_databases This method is an indefinite loop that listens for notify calls from the primary database. -The parameters are exactly the same as the ones used in add_listener. +Parameters; + +=head3 blocking (optional, default 1) + +If set, the listener process will listen to database notifications indefinitely. + +=head3 name (required) + +The name of the events to listen for. This must match with the name provided to 1 pg_notify call in procedures called by database triggers. + +=head3 on_child_forked (optional) + +If set, runs the referenced subroutine in the listener process after it is created. + +=head3 on_failed_to_clone (optional) + +If set, runs the referenced subroutine when the call to clone_connection() fails with a non-zero code. + +=head3 on_failed_to_ping (optional) + +If set, runs the referenced subroutine when a ping to database fails during the blocking check for notify. Likely used for clean up before terminating. + +=head3 on_notify (optional) + +If set, runs the referenced subroutine when a database notification is received. + +=head3 ping_interval (optional, default 60) + +Determines the interval in B<< seconds >> between pinging the database to ensure the connection is healthy. The listener will terminate if a ping fails. + +=head3 uuid (optional) + +If set, the listener will be added to the connection of the database identified with the provided UUID. + =cut sub run_listener @@ -19760,10 +19777,10 @@ sub run_listener $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->run_listener()" } }); my $blocking = $parameters->{blocking} // 1; - my $db_ping_interval = $parameters->{ping_interval} // 1500; + my $db_ping_interval = $parameters->{ping_interval} // 60; my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; my $notify_name = $parameters->{name}; - my $on_begin_child = $parameters->{on_begin_child}; + my $on_child_forked = $parameters->{on_child_forked}; my $on_failed_to_clone = $parameters->{on_failed_to_clone}; my $on_failed_to_ping = $parameters->{on_failed_to_ping}; my $on_notify = $parameters->{on_notify}; @@ -19772,7 +19789,7 @@ sub run_listener blocking => $blocking, db_uuid => $db_uuid, notify_name => $notify_name, - on_begin_child => $on_begin_child, + on_child_forked => $on_child_forked, on_failed_to_clone => $on_failed_to_clone, on_notify => $on_notify, } }); @@ -19812,7 +19829,7 @@ sub run_listener $anvil->nice_exit({ db_disconnect => 0, exit_code => 3 }); } - $on_begin_child->({ anvil => $anvil }) if (ref($on_begin_child) eq "CODE"); + $on_child_forked->({ anvil => $anvil }) if (ref($on_child_forked) eq "CODE"); my ($clone_connection_code, $dbh) = $self->clone_connection({ uuid => $db_uuid }); @@ -19839,7 +19856,7 @@ sub run_listener $anvil->nice_exit({ exit_code => 5 }); } - my $step = 0.2; + my $step = 0.1; my $count = 0; while ($blocking) From acba5c00c890b12ca07e8357341494eab36464eb Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 25 Mar 2025 16:55:18 -0400 Subject: [PATCH 34/47] fix(tools): align handler changes in manage db listeners of access module --- tools/anvil-access-module | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 783231d01..6906670bd 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -636,7 +636,7 @@ sub manage_database_listeners $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { name => $name } }); - my $handle_begin_child = sub { + my $handle_child_forked = sub { # Close the server because the child doesn't need it close($server); @@ -665,20 +665,35 @@ sub manage_database_listeners payload => $payload, }, prefix => "handle_notify" }); - pstdout($name.":".$pid.":".$payload); + $emit->($name.":".$payload); + }; + + local $SIG{INT} = sub { + $emit->("sigint"); + $emit->("exit"); + + $anvil->catch_sig({ signal => "INT" }); + }; + + local $SIG{TERM} = sub { + $emit->("sigterm"); + $emit->("exit"); + + $anvil->catch_sig({ signal => "TERM" }); }; my ($add_listener_code, $listener, $error) = $anvil->Database->add_listener({ debug => 2, name => $name, - on_begin_child => $handle_begin_child, + on_child_forked => $handle_child_forked, on_failed_to_clone => $handle_failed_to_clone, + on_failed_to_ping => $handle_failed_to_ping, on_notify => $handle_notify, }); if ($add_listener_code != 0) { - pstdout("failed to fork on receive; cause: ".$error); + pstderr("failed to fork on receive; cause: ".$error); return 0; } From 4f5cd3ea23e2d32bc4b8b31811ed01016631f1d9 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Thu, 27 Mar 2025 16:56:28 -0400 Subject: [PATCH 35/47] fix: add destructor to handle listeners clean up in Database module --- Anvil/Tools/Database.pm | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index 7fd97e3ba..3c58e21e9 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -177,6 +177,17 @@ sub parent return ($self->{HANDLE}{TOOLS}); } +sub DESTROY +{ + my $self = shift; + + # Clean up all lingering database listeners + foreach my $notify_name (keys %{$self->{listeners}}) + { + $self->remove_listener({ name => $notify_name }); + } +} + ############################################################################################################# # Public methods # From 541288b5d792177e3246e84745e212c224caf56b Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Fri, 28 Mar 2025 17:05:59 -0400 Subject: [PATCH 36/47] fix(tools): use jobs listener in daemon --- tools/anvil-daemon | 119 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 112 insertions(+), 7 deletions(-) diff --git a/tools/anvil-daemon b/tools/anvil-daemon index 25279be2b..186fdb957 100755 --- a/tools/anvil-daemon +++ b/tools/anvil-daemon @@ -514,7 +514,11 @@ sub handle_periodic_tasks # This checks to see if the strikers in the DB are also in anvil.conf check_strikers($anvil); - + + # Check if the job listener is alive, try spawning it if not. + # Let the next minute try again if the spawn fails. + spawn_job_listener($anvil) if (not check_job_listener($anvil)); + # Do Striker-specific minute tasks $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { host_type => $host_type }}); if ($host_type eq "striker") @@ -1422,7 +1426,10 @@ sub run_once # Make sure we don't spam the hell out of the console with network manager messages on boot check_sysctl($anvil); - + + # Start listening for jobs. + spawn_job_listener($anvil); + if ($anvil->data->{switches}{'startup-only'}) { $anvil->nice_exit({exit_code => 0}); @@ -1964,7 +1971,7 @@ AND handle_special_cases($anvil); # Now look for jobs that have a job status of 'anvil_startup' - run_jobs($anvil, 1); + run_jobs($anvil, { startup => 1 }); # Check to make sure that we don't have conflicts with the Striker WebUI check_for_conflicting_web_ports($anvil); @@ -2094,7 +2101,7 @@ sub keep_running } # Run any pending jobs by calling 'anvil-jobs' with the 'job_uuid' as a background process. - run_jobs($anvil, 0); + run_jobs($anvil); return(0); } @@ -2104,8 +2111,16 @@ sub keep_running # invoked to handle it. sub run_jobs { - my ($anvil, $startup) = @_; - $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { startup => $startup }}); + my $anvil = shift; + my $parameters = shift; + # Specifies whether we're in the main process, which needs to make way for the database listener to avoid race conditions. + my $in_main = $parameters->{in_main} // 1; + my $startup = $parameters->{startup} // 0; + + $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { + in_main => $in_main, + startup => $startup, + }}); # Don't start jobs for 30 seconds after startup. if (not $startup) @@ -2181,7 +2196,16 @@ sub run_jobs 's13:started_seconds_ago' => $started_seconds_ago, 's14:updated_seconds_ago' => $updated_seconds_ago, }}); - + + # Let the database listener pick up jobs first, then let the main process pick up any leftovers. + if (($in_main) and (not $anvil->data->{seen_jobs}{$job_uuid})) + { + # We haven't seen this job, mark it as seen but don't process it. + $anvil->data->{seen_jobs}{$job_uuid} = 1; + + next; + } + # If we're not configured, we will only run the 'anvil-configure-host' job my $configured = $anvil->System->check_if_configured; $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { configured => $configured }}); @@ -2235,6 +2259,9 @@ sub run_jobs job_picked_up_by => $job_picked_up_by, "jobs::${job_uuid}::started" => $anvil->data->{jobs}{$job_uuid}{started}, }}); + + # Clean up the "seen" flag for this completed job. + delete $anvil->data->{seen_jobs}{$job_uuid} if ($in_main); } else { @@ -2590,3 +2617,81 @@ sub check_files return(0); } + +sub check_job_listener +{ + my $anvil = shift; + + my $listeners = $anvil->Database->{listeners}{"after_insert_or_update_job"}; + + my $count = 0; + + # Loop through job listeners only, there should only be 1. + foreach my $pid (key %{$listeners}) + { + my $listener = $listeners->{$pid}; + + if ($listener->poll()) + { + # Adjust the return code, any greater than 0 is a pass. + $count += 1; + } + else + { + # The job listener was found dead during a periodic check, dispose the remains. + delete $listeners->{$pid}; + # Beat it just to make sure it's actually dead. + $listener->kill() or $listener->kill("SIGKILL"); + } + } + + return ($count); +} + +sub spawn_job_listener +{ + my $anvil = shift; + + my $handle_notify = sub { + my $params = shift; + my $notify = $params->{notify}; + + my ($name, $pid, $payload) = @$notify; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { + name => $name, + pid => $pid, + payload => $payload, + }, prefix => "handle_notify" }); + + my $job = eval { decode_json($payload); }; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { job => $job } }); + + if (not $job) + { + # Ignore job(s) with malformed payload. + return (0); + } + + if ($job->{job_progress} != 0) + { + # Ignore running or finished job(s). + return (0); + } + + run_jobs($anvil, { in_main => 0 }); + }; + + my ($add_listener_code, $listener, $error) = $anvil->Database->add_listener({ + name => "after_insert_or_update_job", + on_notify => $handle_notify, + }); + + if ($add_listener_code != 0) + { + return (0, undef, $error); + } + + return (1, $listener); +} From e3e3586e1d1003797bbfa4b8bbe0cf203843994a Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Fri, 28 Mar 2025 17:57:28 -0400 Subject: [PATCH 37/47] fix: don't rely on parent in database module because tools are destroyed prior --- Anvil/Tools/Database.pm | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index 3c58e21e9..f75571017 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -181,10 +181,19 @@ sub DESTROY { my $self = shift; + my $listeners = $self->{listeners}; + # Clean up all lingering database listeners - foreach my $notify_name (keys %{$self->{listeners}}) + foreach my $forks (values %{$listeners}) { - $self->remove_listener({ name => $notify_name }); + foreach my $fork (values %{$forks}) + { + next if not $fork->poll(); + + print "Database listener with PID: [".$fork->pid."] exiting on DESTROY.\n"; + + $fork->kill() or $fork->kill("SIGKILL"); + } } } From ac1d39f96ef7cfdc8b6fd8848f4ce2b6045da5c1 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Fri, 28 Mar 2025 19:25:33 -0400 Subject: [PATCH 38/47] fix(tools): correct typo in daemon --- tools/anvil-daemon | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/anvil-daemon b/tools/anvil-daemon index 186fdb957..28e914fa2 100755 --- a/tools/anvil-daemon +++ b/tools/anvil-daemon @@ -2627,7 +2627,7 @@ sub check_job_listener my $count = 0; # Loop through job listeners only, there should only be 1. - foreach my $pid (key %{$listeners}) + foreach my $pid (keys %{$listeners}) { my $listener = $listeners->{$pid}; From 71ac04bf2483d3dfe87558b8a61ee9898ac9441b Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Fri, 28 Mar 2025 20:18:55 -0400 Subject: [PATCH 39/47] fix(tools): update jobs trigger in version changes --- tools/anvil-version-changes | 57 ++++++++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/tools/anvil-version-changes b/tools/anvil-version-changes index 27663ee3b..8256172de 100755 --- a/tools/anvil-version-changes +++ b/tools/anvil-version-changes @@ -108,7 +108,10 @@ sub striker_checks # This adds notes to storage_group_members update_host_variables($anvil); - + + # Change the trigger of the jobs table to include the notify call. + update_jobs_trigger($anvil); + return(0); } @@ -862,6 +865,58 @@ CREATE TRIGGER trigger_host_variables return(0); } +sub update_jobs_trigger +{ + my ($anvil) = @_; + + foreach my $uuid (sort {$a cmp $b} keys %{$anvil->data->{cache}{database_handle}}) + { + my $sql = q|CREATE OR REPLACE FUNCTION history_jobs() RETURNS trigger +AS $$ +DECLARE + history_jobs RECORD; +BEGIN + SELECT INTO history_jobs * FROM jobs WHERE job_uuid = NEW.job_uuid; + INSERT INTO history.jobs + (job_uuid, + job_host_uuid, + job_command, + job_data, + job_picked_up_by, + job_picked_up_at, + job_updated, + job_name, + job_progress, + job_title, + job_description, + job_status, + modified_date) + VALUES + (history_jobs.job_uuid, + history_jobs.job_host_uuid, + history_jobs.job_command, + history_jobs.job_data, + history_jobs.job_picked_up_by, + history_jobs.job_picked_up_at, + history_jobs.job_updated, + history_jobs.job_name, + history_jobs.job_progress, + history_jobs.job_title, + history_jobs.job_description, + history_jobs.job_status, + history_jobs.modified_date); + PERFORM pg_notify('after_insert_or_update_job', row_to_json(NEW)::text); + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; +|; + $anvil->Database->write({debug => 2, uuid => $uuid, query => $sql, source => $THIS_FILE, line => __LINE__}); + } + + return(0); +} + sub update_storage_group_members { my ($anvil) = @_; From 7683a608a85f6806fdbe4dc290334bc1e48a1780 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Mon, 31 Mar 2025 14:09:56 -0400 Subject: [PATCH 40/47] fix(tools): remove excess spaces in access module --- tools/anvil-access-module | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/anvil-access-module b/tools/anvil-access-module index 6906670bd..5a728573e 100755 --- a/tools/anvil-access-module +++ b/tools/anvil-access-module @@ -164,9 +164,9 @@ my $running_directory = ($0 =~ /^(.*?)\/$THIS_FILE$/)[0]; $running_directory =~ s/^\./$ENV{PWD}/ if $running_directory =~ /^\./ && $ENV{PWD}; -my $scmd_db_read = "r"; -my $scmd_db_write = "w"; -my $scmd_execute = "x"; +my $scmd_db_read = "r"; +my $scmd_db_write = "w"; +my $scmd_execute = "x"; main(); From 0d9293fcf90a3180d9afbca74e2b20a264eda1ee Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 1 Apr 2025 17:22:40 -0400 Subject: [PATCH 41/47] fix: specify attr in clone of Database->clone_connection --- Anvil/Tools/Database.pm | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index f75571017..48aaf0bcd 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -1001,7 +1001,11 @@ sub clone_connection } # Clone the parent's database handle for child use - my $clone = eval { $dbh->clone(); }; + my $clone = eval { $dbh->clone({ + RaiseError => 1, + AutoCommit => 1, + pg_enable_utf8 => 1, + }); }; $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { base_dbh => $dbh, From 8d0f4969d89f23eb05b1e4955898d429bb76e6cc Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 1 Apr 2025 17:36:58 -0400 Subject: [PATCH 42/47] fix: improve logs in Database->clone_connection() --- Anvil/Tools/Database.pm | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index 48aaf0bcd..e1cf09d04 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -984,20 +984,35 @@ sub clone_connection $anvil->Log->entry({ source => $THIS_FILE, line => __LINE__, level => $debug, key => "log_0125", variables => { method => "Database->clone_connection()" } }); - my $disconnect_base = $parameters->{disconnect_base} // 0; my $db_uuid = $parameters->{uuid} // $anvil->data->{sys}{database}{primary_db}; + my $disconnect_base = $parameters->{disconnect_base} // 0; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { + db_uuid => $db_uuid, + disconnect_base => $disconnect_base, + } }); + + my $return_code = 0; if (not $db_uuid) { - return (1); + $return_code = 1; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { return_code => $return_code }, prefix => "Database->clone_connection()" }); + return ($return_code); } # Get the copied parent's database handle, which was made when fork() my $dbh = $anvil->data->{cache}{database_handle}{$db_uuid}; + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { dbh => $dbh } }); + if ((not $dbh) or (not $dbh->ping())) { - return (2); + $return_code = 2; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { return_code => $return_code }, prefix => "Database->clone_connection()" }); + return ($return_code); } # Clone the parent's database handle for child use @@ -1007,21 +1022,20 @@ sub clone_connection pg_enable_utf8 => 1, }); }; - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { - base_dbh => $dbh, - clone_dbh => $clone, - database_uuid => $db_uuid, - } }); + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { base => $dbh, clone => $clone }, prefix => "dbh" }); if (not $clone) { # Failed to clone the parent's database handle - return (3); + $return_code = 3; + + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { return_code => $return_code }, prefix => "Database->clone_connection()" }); + return ($return_code); } if ($disconnect_base) { - $dbh->disconnect(); + $dbh->disconnect({ debug => $debug }); } # Release the reference to the copied parent's dbh; this will not close the parent's original database handle when auto_inactive_destroy is set @@ -1030,7 +1044,11 @@ sub clone_connection # Add the cloned child's database handle $anvil->data->{cache}{database_handle}{$db_uuid} = $clone; - return (0, $clone); + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { + "cache::database_handle::${db_uuid}" => $anvil->data->{cache}{database_handle}{$db_uuid}, + } }); + + return ($return_code, $clone); } From e1f1bfbd5f02236473ab805b6d71cd8a00864e21 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 1 Apr 2025 17:48:12 -0400 Subject: [PATCH 43/47] fix(tools): remove seen_jobs flags in daemon --- tools/anvil-daemon | 38 ++++++++++++-------------------------- 1 file changed, 12 insertions(+), 26 deletions(-) diff --git a/tools/anvil-daemon b/tools/anvil-daemon index 28e914fa2..04d5a3872 100755 --- a/tools/anvil-daemon +++ b/tools/anvil-daemon @@ -1971,7 +1971,7 @@ AND handle_special_cases($anvil); # Now look for jobs that have a job status of 'anvil_startup' - run_jobs($anvil, { startup => 1 }); + run_jobs($anvil, 1); # Check to make sure that we don't have conflicts with the Striker WebUI check_for_conflicting_web_ports($anvil); @@ -2101,7 +2101,7 @@ sub keep_running } # Run any pending jobs by calling 'anvil-jobs' with the 'job_uuid' as a background process. - run_jobs($anvil); + run_jobs($anvil, 0); return(0); } @@ -2111,16 +2111,11 @@ sub keep_running # invoked to handle it. sub run_jobs { - my $anvil = shift; - my $parameters = shift; - # Specifies whether we're in the main process, which needs to make way for the database listener to avoid race conditions. - my $in_main = $parameters->{in_main} // 1; - my $startup = $parameters->{startup} // 0; - - $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { - in_main => $in_main, - startup => $startup, - }}); + my ($anvil, $startup) = @_; + + $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { startup => $startup }}); + + $startup = $startup // 0; # Don't start jobs for 30 seconds after startup. if (not $startup) @@ -2197,15 +2192,6 @@ sub run_jobs 's14:updated_seconds_ago' => $updated_seconds_ago, }}); - # Let the database listener pick up jobs first, then let the main process pick up any leftovers. - if (($in_main) and (not $anvil->data->{seen_jobs}{$job_uuid})) - { - # We haven't seen this job, mark it as seen but don't process it. - $anvil->data->{seen_jobs}{$job_uuid} = 1; - - next; - } - # If we're not configured, we will only run the 'anvil-configure-host' job my $configured = $anvil->System->check_if_configured; $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { configured => $configured }}); @@ -2259,9 +2245,6 @@ sub run_jobs job_picked_up_by => $job_picked_up_by, "jobs::${job_uuid}::started" => $anvil->data->{jobs}{$job_uuid}{started}, }}); - - # Clean up the "seen" flag for this completed job. - delete $anvil->data->{seen_jobs}{$job_uuid} if ($in_main); } else { @@ -2666,7 +2649,7 @@ sub spawn_job_listener my $job = eval { decode_json($payload); }; - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => { job => $job } }); + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 3, list => { job => $job } }); if (not $job) { @@ -2674,16 +2657,19 @@ sub spawn_job_listener return (0); } + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => 2, list => $job, prefix => "notify" }); + if ($job->{job_progress} != 0) { # Ignore running or finished job(s). return (0); } - run_jobs($anvil, { in_main => 0 }); + run_jobs($anvil, 0); }; my ($add_listener_code, $listener, $error) = $anvil->Database->add_listener({ + debug => 2, name => "after_insert_or_update_job", on_notify => $handle_notify, }); From b7143c68b27bf1d2b41c0dc85927ba3d83c1a2fa Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Tue, 1 Apr 2025 19:19:55 -0400 Subject: [PATCH 44/47] fix(tools): search processes to avoid duplicating jobs in daemon --- tools/anvil-daemon | 46 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/tools/anvil-daemon b/tools/anvil-daemon index 04d5a3872..8e38c8013 100755 --- a/tools/anvil-daemon +++ b/tools/anvil-daemon @@ -2251,13 +2251,13 @@ sub run_jobs $anvil->data->{sys}{jobs_running} = 1; $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "sys::jobs_running" => $anvil->data->{sys}{jobs_running} }}); } - + + # Get the list of processes on this machine. + $anvil->System->pids({ignore_me => 1}); + # See if the job was picked up by a now-dead instance. if ($job_picked_up_by) { - # Check if the PID is still active. - $anvil->System->pids({ignore_me => 1}); - ### TODO: Add a check to verify the job isn't hung. # Skip if this job is in progress. if (not exists $anvil->data->{pids}{$job_picked_up_by}) @@ -2326,6 +2326,44 @@ sub run_jobs } } } + else + { + # Maybe the database doesn't know about the job yet, try to find it in the processes. + # + # This should prevent duplicates caused by race condition between the daemon's main process and its database listener. + + my $found = 0; + + foreach my $pid (keys %{$anvil->data->{pids}}) + { + my $process = $anvil->data->{pids}{$pid}; + $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { pid => $pid, %$process }}); + + # Only look at processes with the '--job-uuid' flag; ignore the ones started on CLI. + if ($process->{command} =~ /--job-uuid(?:=|\s*)(\w{8}-(?:\w{4}-){3}\w{12})/) + { + my $process_flag_job_uuid = $1; + $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { + job_uuid => $job_uuid, + process_flag_job_uuid => $process_flag_job_uuid, + }}); + + if ($process_flag_job_uuid eq $job_uuid) + { + # The jobs is already started by another process but not recorded into the database yet. + # + # Mark to skip it. + $found = 1; + $anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { found => $found }}); + + last; + } + } + } + + # Don't run the job when it's alredy running. + next if ($found); + } # Convert the double-banged strings into a proper message. my $say_title = $job_title ? $anvil->Words->parse_banged_string({key_string => $job_title}) : ""; From a22d471e928f9d867601c58117bf8d79c341d605 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Wed, 2 Apr 2025 00:42:10 -0400 Subject: [PATCH 45/47] fix: enable auto inactive destroy on clone, log special var in Database->clone_connection() --- Anvil/Tools/Database.pm | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index e1cf09d04..c514a3f0d 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -1015,14 +1015,22 @@ sub clone_connection return ($return_code); } + # Clear the error container before the clone. + local $@; + # Clone the parent's database handle for child use my $clone = eval { $dbh->clone({ - RaiseError => 1, - AutoCommit => 1, - pg_enable_utf8 => 1, + RaiseError => 1, + AutoCommit => 1, + AutoInactiveDestroy => 1, + pg_enable_utf8 => 1, }); }; - $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { base => $dbh, clone => $clone }, prefix => "dbh" }); + $anvil->Log->variables({ source => $THIS_FILE, line => __LINE__, level => $debug, list => { + '$@' => $@, + base => $dbh, + clone => $clone, + }, prefix => "dbh" }); if (not $clone) { From 130e77302409e0beef6365fb3d9048eaa1c4fd02 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Wed, 2 Apr 2025 01:11:50 -0400 Subject: [PATCH 46/47] fix(tools): find duplicates only for not-started jobs in daemon --- tools/anvil-daemon | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/anvil-daemon b/tools/anvil-daemon index 8e38c8013..e8f4169c6 100755 --- a/tools/anvil-daemon +++ b/tools/anvil-daemon @@ -2326,7 +2326,7 @@ sub run_jobs } } } - else + elsif ($job_progress == 0) # The race condition handling should only look at not-started jobs. { # Maybe the database doesn't know about the job yet, try to find it in the processes. # From 76b45d258b533757de487d27df832775c09ecfa7 Mon Sep 17 00:00:00 2001 From: Tsu-ba-me Date: Wed, 2 Apr 2025 16:49:39 -0400 Subject: [PATCH 47/47] fix: replace destructor with Proc::Simple's destroy flag in Database->add_listener --- Anvil/Tools/Database.pm | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/Anvil/Tools/Database.pm b/Anvil/Tools/Database.pm index c514a3f0d..cec0a7579 100644 --- a/Anvil/Tools/Database.pm +++ b/Anvil/Tools/Database.pm @@ -177,26 +177,6 @@ sub parent return ($self->{HANDLE}{TOOLS}); } -sub DESTROY -{ - my $self = shift; - - my $listeners = $self->{listeners}; - - # Clean up all lingering database listeners - foreach my $forks (values %{$listeners}) - { - foreach my $fork (values %{$forks}) - { - next if not $fork->poll(); - - print "Database listener with PID: [".$fork->pid."] exiting on DESTROY.\n"; - - $fork->kill() or $fork->kill("SIGKILL"); - } - } -} - ############################################################################################################# # Public methods # @@ -242,6 +222,11 @@ sub add_listener my $listener = Proc::Simple->new(); + # The reference of all listeners are stored in $self->{listeners}, which will be removed on DESTROY of $self. + # + # Set the flag to kill the listener when all references to it are removed. + $listener->kill_on_destroy(1); + my $forked = $listener->start(\&run_listener, $self, $parameters); if (not $forked)