From 5d38abe0640c009d3a4796eb59d6446bd6b0aec6 Mon Sep 17 00:00:00 2001 From: mantis78 Date: Wed, 19 Nov 2025 11:30:50 +0800 Subject: [PATCH] Fix pool reference deadlock during PAUSE/RELOAD/RESUME MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit fixes a critical deadlock issue where client connections would hang indefinitely during database switchover operations involving PAUSE, RELOAD, and RESUME commands in transaction pooling mode. ## Problem When performing a hot database switchover (PAUSE → RELOAD → RESUME), client applications would become permanently stuck when trying to establish new connections, even after RESUME completed. This made PgCat unsuitable for zero-downtime database migration scenarios. The issue had two root causes: 1. **Pool Reference Deadlock**: When RELOAD creates a new pool object, clients waiting on the old pool's `paused_waiter` were never woken up when RESUME was called on the new pool. This caused permanent deadlock because: - Client holds reference to OLD pool (pre-RELOAD) - Client blocks on `OLD_pool.wait_paused().await` - RELOAD creates NEW pool with different `paused_waiter` - RESUME calls `NEW_pool.resume()` - Client still waiting on OLD pool → deadlock! 2. **Unvalidated Pools After RELOAD**: New pools created during RELOAD were not validated before use, potentially causing authentication to block if validation was triggered during client connection. 
## Solution This fix implements a two-part solution: ### Part 1: Make resume() async and validate pools (pool.rs) - Changed `resume()` from sync to async function - Added pool validation before resuming if pool is unvalidated - Ensures pools are ready for use before accepting client connections - Updated all call sites in admin.rs to await the async function ### Part 2: Resume old pools before RELOAD (config.rs) - Before creating new pools during RELOAD, explicitly resume all paused old pools to wake up waiting clients - This allows clients to: 1. Wake up from old pool's `wait_paused()` 2. Reach the `pool = self.get_pool()` refresh line 3. Obtain reference to NEW pool 4. Continue normal operation with new pool ## Testing **Unit Tests**: All 38 unit tests + 4 doc tests pass - ✓ `cargo test` - all tests passing - ✓ `cargo fmt` - code properly formatted - ✓ `cargo clippy` - no warnings **Integration Tests**: Real-world database switchover scenario - ✓ PAUSE during active write workload (transaction pooling) - ✓ RELOAD with backend database change (postgres1 → postgres2) - ✓ RESUME with immediate write recovery - ✓ No connection hangs - ✓ No data loss or sequence gaps - ✓ 47 successful writes within 5 seconds post-RESUME Test results from production-like switchover scenario: ``` Recent writes (last 5s): postgres1: 0 postgres2: 47 ✓ postgres2 is receiving writes, postgres1 is not - SWITCHOVER SUCCESSFUL! 
✓ Writer recovered successfully (iteration 200 → 300) ✓ No gaps in iteration sequence ``` ## Files Changed - `src/pool.rs`: Made `resume()` async, added validation logic - `src/admin.rs`: Updated `resume()` call sites to await - `src/config.rs`: Added old pool resume before RELOAD, imported `get_all_pools` - `tests/ruby/pause_new_connections_spec.rb`: Added comprehensive test ## Impact This fix enables: - Zero-downtime database migrations using PgCat - Safe hot switchover between primary/replica or different databases - Reliable PAUSE/RELOAD/RESUME workflow in production environments - Compatibility with transaction pooling mode during switchovers ## Related Issues This addresses the issue described in FIX.md regarding broken PAUSE/RESUME support where new client connections would hang indefinitely during PAUSE operations. 🤖 Co-Authored-By: Claude --- src/admin.rs | 4 +- src/client.rs | 2 + src/config.rs | 14 +- src/pool.rs | 16 +- tests/ruby/pause_new_connections_spec.rb | 179 +++++++++++++++++++++++ 5 files changed, 211 insertions(+), 4 deletions(-) create mode 100644 tests/ruby/pause_new_connections_spec.rb diff --git a/src/admin.rs b/src/admin.rs index f08ef2e1..da1815a1 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -891,7 +891,7 @@ where match parts.len() { 0 => { for (_, pool) in get_all_pools() { - pool.resume(); + pool.resume().await; } let mut res = BytesMut::new(); @@ -911,7 +911,7 @@ where match get_pool(database, user) { Some(pool) => { - pool.resume(); + pool.resume().await; let mut res = BytesMut::new(); diff --git a/src/client.rs b/src/client.rs index c72e9d2a..336ae512 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1057,6 +1057,8 @@ where }; // Check if the pool is paused and wait until it's resumed. + // This check is needed for clients already in the query loop when PAUSE is issued, + // and also after pool refresh (RELOAD) to ensure pause state is respected. 
pool.wait_paused().await; // Refresh pool information, something might have changed. diff --git a/src/config.rs b/src/config.rs index e56f92b9..f4ca0e7d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,7 +17,7 @@ use tokio::io::AsyncReadExt; use crate::dns_cache::CachedResolver; use crate::errors::Error; -use crate::pool::{ClientServerMap, ConnectionPool}; +use crate::pool::{get_all_pools, ClientServerMap, ConnectionPool}; use crate::sharding::ShardingFunction; use crate::stats::AddressStats; use crate::tls::{load_certs, load_keys}; @@ -1604,6 +1604,18 @@ pub async fn reload_config(client_server_map: ClientServerMap) -> Result e + connection_error = e + end + end + + # Give the connection attempt time to complete authentication + # Without the fix: This would timeout or hang + # With the fix: Connection should complete within 2 seconds + sleep(2) + + # ASSERTION 1: Connection should have completed (authentication done) + expect(connection_completed).to be(true), + "Connection should complete authentication even during PAUSE. 
" \ + "Error: #{connection_error}" + + # ASSERTION 2: Query should NOT have completed yet (waiting for RESUME) + expect(query_result).to be_nil, + "Query should not execute while pool is paused" + + # Wait a bit more to ensure query is truly blocked + sleep(2) + expect(query_result).to be_nil, + "Query should still be blocked 4 seconds after PAUSE" + + # Now RESUME the pool + admin_conn.async_exec("RESUME") + + # Wait for the query to complete (should happen immediately after RESUME) + connect_thread.join(5) # 5 second timeout + + # ASSERTION 3: Thread should have completed + expect(connect_thread.alive?).to be(false), + "Connection thread should complete after RESUME" + + # ASSERTION 4: Query should have completed successfully + expect(query_result).not_to be_nil, + "Query should complete after RESUME" + expect(query_result.first["test_value"]).to eq("1"), + "Query should return correct result" + + # ASSERTION 5: Query should have been blocked for at least 2 seconds + # (the time between when we checked it was nil and when we issued RESUME) + query_duration = query_end_time - query_start_time + expect(query_duration).to be >= 2.0, + "Query should have been blocked for at least 2 seconds during PAUSE, " \ + "but completed in #{query_duration} seconds" + + admin_conn.close + end + + it "should handle multiple new connections during PAUSE" do + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + + # PAUSE the pool + admin_conn.async_exec("PAUSE") + + # Start multiple connections during PAUSE + threads = [] + connection_results = [] + + 5.times do |i| + threads << Thread.new do + begin + conn = PG::connect(pgcat_conn_str) + result = conn.async_exec("SELECT #{i} as conn_id").to_a + conn.close + connection_results[i] = result.first["conn_id"].to_i + rescue => e + connection_results[i] = "ERROR: #{e.message}" + end + end + end + + # Wait a moment for connections to authenticate + sleep(2) + + # All connections should be waiting (not completed queries) + 
expect(connection_results.compact.size).to eq(0), + "No queries should complete during PAUSE" + + # RESUME + admin_conn.async_exec("RESUME") + + # Wait for all threads to complete + threads.each { |t| t.join(10) } + + # All connections should have completed successfully + expect(connection_results.compact.size).to eq(5), + "All 5 connections should complete after RESUME" + + # Verify each connection got the right result + 5.times do |i| + expect(connection_results[i]).to eq(i), + "Connection #{i} should return correct result" + end + + admin_conn.close + end + + it "should not affect admin connections during PAUSE" do + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + + # PAUSE the pool + admin_conn.async_exec("PAUSE") + + # Admin connection should still work immediately + # (not blocked by PAUSE) + result = admin_conn.async_exec("SHOW DATABASES").to_a + expect(result).not_to be_empty + + # Create a NEW admin connection during PAUSE + # This should work immediately + new_admin_conn = PG::connect(processes.pgcat.admin_connection_string) + result = new_admin_conn.async_exec("SHOW POOLS").to_a + expect(result).not_to be_empty + + new_admin_conn.close + + # RESUME + admin_conn.async_exec("RESUME") + admin_conn.close + end + end +end