4 changes: 2 additions & 2 deletions src/admin.rs
@@ -891,7 +891,7 @@ where
match parts.len() {
0 => {
for (_, pool) in get_all_pools() {
- pool.resume();
+ pool.resume().await;
}

let mut res = BytesMut::new();
@@ -911,7 +911,7 @@ where

match get_pool(database, user) {
Some(pool) => {
- pool.resume();
+ pool.resume().await;

let mut res = BytesMut::new();

2 changes: 2 additions & 0 deletions src/client.rs
@@ -1057,6 +1057,8 @@ where
};

// Check if the pool is paused and wait until it's resumed.
+ // This check is needed for clients already in the query loop when PAUSE is issued,
+ // and also after pool refresh (RELOAD) to ensure pause state is respected.
pool.wait_paused().await;

// Refresh pool information, something might have changed.
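For readers following the control flow: `pool.wait_paused().await` is what actually parks a client while the pool is paused. Below is a minimal sketch of how such a method can be built from the `paused` flag and `paused_waiter` Notify that appear in the src/pool.rs hunk further down. The struct and method body are illustrative assumptions, not pgcat's actual code, and `Notified::enable` assumes a reasonably recent tokio.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify;

/// Illustrative stand-in for the pool's pause state; the field names follow
/// the src/pool.rs hunk below, everything else is an assumption.
struct PauseState {
    paused: AtomicBool,
    paused_waiter: Notify,
}

impl PauseState {
    /// Park the caller while the pool is paused (the role of `wait_paused()` above).
    async fn wait_paused(&self) {
        loop {
            // Arm the Notify *before* re-checking the flag so a resume() that
            // clears `paused` and calls notify_waiters() in between is not missed.
            let notified = self.paused_waiter.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();

            if !self.paused.load(Ordering::Relaxed) {
                return;
            }

            notified.await;
        }
    }
}
```

Re-arming the `Notified` future before each flag check is what makes a concurrent `resume()` impossible to miss.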
14 changes: 13 additions & 1 deletion src/config.rs
@@ -17,7 +17,7 @@ use tokio::io::AsyncReadExt;

use crate::dns_cache::CachedResolver;
use crate::errors::Error;
- use crate::pool::{ClientServerMap, ConnectionPool};
+ use crate::pool::{get_all_pools, ClientServerMap, ConnectionPool};
use crate::sharding::ShardingFunction;
use crate::stats::AddressStats;
use crate::tls::{load_certs, load_keys};
@@ -1604,6 +1604,18 @@ pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, E

if old_config != new_config {
info!("Config changed, reloading");

+ // Resume any paused old pools BEFORE creating new pools.
+ // This wakes up clients waiting on old pool's paused_waiter,
+ // so they can refresh to the new pool. Without this, clients
+ // waiting on old pool during RELOAD would deadlock forever.
+ for (_, old_pool) in get_all_pools() {
+     if old_pool.paused() {
+         info!("Resuming old pool before reload to wake up waiting clients");
+         old_pool.resume().await;
+     }
+ }

ConnectionPool::from_config(client_server_map).await?;
Ok(true)
} else {
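The comment block above describes the deadlock this loop prevents: clients parked on the old pool's `paused_waiter` are stranded once `ConnectionPool::from_config` swaps in new pools, because nothing ever notifies the old waiter again. Here is a small standalone tokio program (illustrative only, not pgcat code) that reproduces the situation with a bare `Notify`:

```rust
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::{sleep, timeout, Duration};

#[tokio::main]
async fn main() {
    let old_pool_waiter = Arc::new(Notify::new());

    // "Client" task parked on the old pool's waiter, as it would be during PAUSE.
    let waiter = old_pool_waiter.clone();
    let mut client = tokio::spawn(async move {
        waiter.notified().await;
        println!("client woke up; it can now refresh to the new pool");
    });

    sleep(Duration::from_millis(50)).await; // let the client register its wait

    // Without the fix: the old pool is simply replaced, nobody calls
    // notify_waiters() on it, and the client stays parked indefinitely.
    let stuck = timeout(Duration::from_millis(200), &mut client).await;
    assert!(stuck.is_err(), "client is deadlocked on the old pool");

    // With the fix: reload_config resumes the old pool first, which fires
    // notify_waiters() and releases the client.
    old_pool_waiter.notify_waiters();
    client.await.unwrap();
}
```

Resuming the old pools first, as the diff does, plays the role of that final `notify_waiters()` call: it releases the parked clients so they can pick up the refreshed pool on their next loop iteration.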
16 changes: 15 additions & 1 deletion src/pool.rs
@@ -683,7 +683,21 @@ impl ConnectionPool
}

/// Resume the pool, allowing queries and resuming any pending queries.
- pub fn resume(&self) {
+ /// If the pool is not validated (e.g., created during PAUSE via RELOAD),
+ /// validate it first before resuming to ensure it's ready for use.
+ pub async fn resume(&self) {
+ // Validate pool before resuming if it hasn't been validated yet.
+ // This handles the case where RELOAD was called during PAUSE,
+ // creating a new unvalidated pool. We validate it now to ensure
+ // clients won't block during authentication.
+ if !self.validated() {
+     info!("Pool not validated, validating before resume");
+     if let Err(e) = self.validate().await {
+         error!("Failed to validate pool during resume: {:?}", e);
+         // Continue with resume anyway - errors will surface when clients try to use it
+     }
+ }

self.paused.store(false, Ordering::Relaxed);
self.paused_waiter.notify_waiters();
}
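Putting the pieces together, here is a hedged, self-contained sketch of the state `resume()` manipulates: the `paused` flag, the `paused_waiter` Notify, and the validation check added by this PR. Field and method names follow the hunk above; the bodies (in particular the stand-in for `validate()`) are assumptions for illustration, not pgcat's implementation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify;

// Illustrative stand-in for the pool state touched by PAUSE/RESUME.
struct PoolState {
    paused: AtomicBool,
    validated: AtomicBool,
    paused_waiter: Notify,
}

impl PoolState {
    // What the admin PAUSE command presumably toggles.
    fn pause(&self) {
        self.paused.store(true, Ordering::Relaxed);
    }

    // Accessor used by the reload loop in src/config.rs above.
    fn paused(&self) -> bool {
        self.paused.load(Ordering::Relaxed)
    }

    fn validated(&self) -> bool {
        self.validated.load(Ordering::Relaxed)
    }

    // Mirrors the resume() in the hunk above, minus the real validate() call:
    // clear the flag, then wake every task parked in wait_paused().
    async fn resume(&self) {
        if !self.validated() {
            // In pgcat this is where self.validate().await runs; here we simply
            // mark the pool validated so the sketch stays self-contained.
            self.validated.store(true, Ordering::Relaxed);
        }
        self.paused.store(false, Ordering::Relaxed);
        self.paused_waiter.notify_waiters();
    }
}
```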
179 changes: 179 additions & 0 deletions tests/ruby/pause_new_connections_spec.rb
@@ -0,0 +1,179 @@
# frozen_string_literal: true
require_relative 'spec_helper'

describe "PAUSE with new client connections" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") }

after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end

describe "New connections during PAUSE" do
it "should complete authentication and wait, not hang" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)

# Verify pool is not paused initially
results = admin_conn.async_exec("SHOW DATABASES").to_a
paused_pools = results.select { |r| r["database"] == "sharded_db" && r["paused"] == "1" }
expect(paused_pools).to be_empty, "Pool should not be paused initially"

# PAUSE the pool
admin_conn.async_exec("PAUSE")

# Verify pool is now paused
results = admin_conn.async_exec("SHOW DATABASES").to_a
paused_pools = results.select { |r| r["database"] == "sharded_db" && r["paused"] == "1" }
expect(paused_pools).not_to be_empty, "Pool should be paused"

# THIS IS THE KEY TEST:
# Try to connect a NEW client DURING pause
# Without the fix: This would hang during authentication or block indefinitely
# With the fix: This should complete authentication and return a connection object

connection_completed = false
connection_error = nil
query_result = nil
query_start_time = nil
query_end_time = nil

# Attempt connection in a separate thread with timeout
connect_thread = Thread.new do
begin
# This should complete quickly even though pool is paused
# The fix ensures authentication completes before checking pause state
new_conn = PG::connect(pgcat_conn_str)
connection_completed = true

# Now try to execute a query
# This SHOULD block until RESUME is issued
query_start_time = Time.now
query_result = new_conn.async_exec("SELECT 1 as test_value").to_a
query_end_time = Time.now

new_conn.close
rescue => e
connection_error = e
end
end

# Give the connection attempt time to complete authentication
# Without the fix: This would timeout or hang
# With the fix: Connection should complete within 2 seconds
sleep(2)

# ASSERTION 1: Connection should have completed (authentication done)
expect(connection_completed).to be(true),
"Connection should complete authentication even during PAUSE. " \
"Error: #{connection_error}"

# ASSERTION 2: Query should NOT have completed yet (waiting for RESUME)
expect(query_result).to be_nil,
"Query should not execute while pool is paused"

# Wait a bit more to ensure query is truly blocked
sleep(2)
expect(query_result).to be_nil,
"Query should still be blocked 4 seconds after PAUSE"

# Now RESUME the pool
admin_conn.async_exec("RESUME")

# Wait for the query to complete (should happen immediately after RESUME)
connect_thread.join(5) # 5 second timeout

# ASSERTION 3: Thread should have completed
expect(connect_thread.alive?).to be(false),
"Connection thread should complete after RESUME"

# ASSERTION 4: Query should have completed successfully
expect(query_result).not_to be_nil,
"Query should complete after RESUME"
expect(query_result.first["test_value"]).to eq("1"),
"Query should return correct result"

# ASSERTION 5: Query should have been blocked for at least 2 seconds
# (the time between when we checked it was nil and when we issued RESUME)
query_duration = query_end_time - query_start_time
expect(query_duration).to be >= 2.0,
"Query should have been blocked for at least 2 seconds during PAUSE, " \
"but completed in #{query_duration} seconds"

admin_conn.close
end

it "should handle multiple new connections during PAUSE" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)

# PAUSE the pool
admin_conn.async_exec("PAUSE")

# Start multiple connections during PAUSE
threads = []
connection_results = []

5.times do |i|
threads << Thread.new do
begin
conn = PG::connect(pgcat_conn_str)
result = conn.async_exec("SELECT #{i} as conn_id").to_a
conn.close
connection_results[i] = result.first["conn_id"].to_i
rescue => e
connection_results[i] = "ERROR: #{e.message}"
end
end
end

# Wait a moment for connections to authenticate
sleep(2)

# All connections should be waiting (not completed queries)
expect(connection_results.compact.size).to eq(0),
"No queries should complete during PAUSE"

# RESUME
admin_conn.async_exec("RESUME")

# Wait for all threads to complete
threads.each { |t| t.join(10) }

# All connections should have completed successfully
expect(connection_results.compact.size).to eq(5),
"All 5 connections should complete after RESUME"

# Verify each connection got the right result
5.times do |i|
expect(connection_results[i]).to eq(i),
"Connection #{i} should return correct result"
end

admin_conn.close
end

it "should not affect admin connections during PAUSE" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)

# PAUSE the pool
admin_conn.async_exec("PAUSE")

# Admin connection should still work immediately
# (not blocked by PAUSE)
result = admin_conn.async_exec("SHOW DATABASES").to_a
expect(result).not_to be_empty

# Create a NEW admin connection during PAUSE
# This should work immediately
new_admin_conn = PG::connect(processes.pgcat.admin_connection_string)
result = new_admin_conn.async_exec("SHOW POOLS").to_a
expect(result).not_to be_empty

new_admin_conn.close

# RESUME
admin_conn.async_exec("RESUME")
admin_conn.close
end
end
end