diff --git a/src/admin.rs b/src/admin.rs index f08ef2e1..da1815a1 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -891,7 +891,7 @@ where match parts.len() { 0 => { for (_, pool) in get_all_pools() { - pool.resume(); + pool.resume().await; } let mut res = BytesMut::new(); @@ -911,7 +911,7 @@ where match get_pool(database, user) { Some(pool) => { - pool.resume(); + pool.resume().await; let mut res = BytesMut::new(); diff --git a/src/client.rs b/src/client.rs index c72e9d2a..336ae512 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1057,6 +1057,8 @@ where }; // Check if the pool is paused and wait until it's resumed. + // This check is needed for clients already in the query loop when PAUSE is issued, + // and also after pool refresh (RELOAD) to ensure pause state is respected. pool.wait_paused().await; // Refresh pool information, something might have changed. diff --git a/src/config.rs b/src/config.rs index e56f92b9..f4ca0e7d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,7 +17,7 @@ use tokio::io::AsyncReadExt; use crate::dns_cache::CachedResolver; use crate::errors::Error; -use crate::pool::{ClientServerMap, ConnectionPool}; +use crate::pool::{get_all_pools, ClientServerMap, ConnectionPool}; use crate::sharding::ShardingFunction; use crate::stats::AddressStats; use crate::tls::{load_certs, load_keys}; @@ -1604,6 +1604,18 @@ pub async fn reload_config(client_server_map: ClientServerMap) -> Result e + connection_error = e + end + end + + # Give the connection attempt time to complete authentication + # Without the fix: This would timeout or hang + # With the fix: Connection should complete within 2 seconds + sleep(2) + + # ASSERTION 1: Connection should have completed (authentication done) + expect(connection_completed).to be(true), + "Connection should complete authentication even during PAUSE. " \ + "Error: #{connection_error}" + + # ASSERTION 2: Query should NOT have completed yet (waiting for RESUME) + expect(query_result).to be_nil, + "Query should not execute while pool is paused" + + # Wait a bit more to ensure query is truly blocked + sleep(2) + expect(query_result).to be_nil, + "Query should still be blocked 4 seconds after PAUSE" + + # Now RESUME the pool + admin_conn.async_exec("RESUME") + + # Wait for the query to complete (should happen immediately after RESUME) + connect_thread.join(5) # 5 second timeout + + # ASSERTION 3: Thread should have completed + expect(connect_thread.alive?).to be(false), + "Connection thread should complete after RESUME" + + # ASSERTION 4: Query should have completed successfully + expect(query_result).not_to be_nil, + "Query should complete after RESUME" + expect(query_result.first["test_value"]).to eq("1"), + "Query should return correct result" + + # ASSERTION 5: Query should have been blocked for at least 2 seconds + # (the time between when we checked it was nil and when we issued RESUME) + query_duration = query_end_time - query_start_time + expect(query_duration).to be >= 2.0, + "Query should have been blocked for at least 2 seconds during PAUSE, " \ + "but completed in #{query_duration} seconds" + + admin_conn.close + end + + it "should handle multiple new connections during PAUSE" do + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + + # PAUSE the pool + admin_conn.async_exec("PAUSE") + + # Start multiple connections during PAUSE + threads = [] + connection_results = [] + + 5.times do |i| + threads << Thread.new do + begin + conn = PG::connect(pgcat_conn_str) + result = conn.async_exec("SELECT #{i} as conn_id").to_a + conn.close + connection_results[i] = result.first["conn_id"].to_i + rescue => e + connection_results[i] = "ERROR: #{e.message}" + end + end + end + + # Wait a moment for connections to authenticate + sleep(2) + + # All connections should be waiting (not completed queries) + expect(connection_results.compact.size).to eq(0), + "No queries should complete during PAUSE" + + # RESUME + admin_conn.async_exec("RESUME") + + # Wait for all threads to complete + threads.each { |t| t.join(10) } + + # All connections should have completed successfully + expect(connection_results.compact.size).to eq(5), + "All 5 connections should complete after RESUME" + + # Verify each connection got the right result + 5.times do |i| + expect(connection_results[i]).to eq(i), + "Connection #{i} should return correct result" + end + + admin_conn.close + end + + it "should not affect admin connections during PAUSE" do + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + + # PAUSE the pool + admin_conn.async_exec("PAUSE") + + # Admin connection should still work immediately + # (not blocked by PAUSE) + result = admin_conn.async_exec("SHOW DATABASES").to_a + expect(result).not_to be_empty + + # Create a NEW admin connection during PAUSE + # This should work immediately + new_admin_conn = PG::connect(processes.pgcat.admin_connection_string) + result = new_admin_conn.async_exec("SHOW POOLS").to_a + expect(result).not_to be_empty + + new_admin_conn.close + + # RESUME + admin_conn.async_exec("RESUME") + admin_conn.close + end + end +end