diff --git a/pgdog/src/backend/disconnect_reason.rs b/pgdog/src/backend/disconnect_reason.rs new file mode 100644 index 00000000..f49edaa3 --- /dev/null +++ b/pgdog/src/backend/disconnect_reason.rs @@ -0,0 +1,39 @@ +use std::fmt::Display; + +#[derive(Debug, Clone, Copy, Default)] +pub enum DisconnectReason { + Idle, + Old, + Error, + Offline, + ForceClose, + Paused, + ReplicationMode, + OutOfSync, + Unhealthy, + Healthcheck, + PubSub, + #[default] + Other, +} + +impl Display for DisconnectReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let reason = match self { + Self::Idle => "idle", + Self::Old => "max age", + Self::Error => "error", + Self::Other => "other", + Self::ForceClose => "force close", + Self::Paused => "pool paused", + Self::Offline => "pool offline", + Self::OutOfSync => "out of sync", + Self::ReplicationMode => "in replication mode", + Self::Unhealthy => "unhealthy", + Self::Healthcheck => "standalone healthcheck", + Self::PubSub => "pub/sub", + }; + + write!(f, "{}", reason) + } +} diff --git a/pgdog/src/backend/mod.rs b/pgdog/src/backend/mod.rs index 8498d760..7cfccd95 100644 --- a/pgdog/src/backend/mod.rs +++ b/pgdog/src/backend/mod.rs @@ -1,6 +1,7 @@ //! pgDog backend managers connections to PostgreSQL. pub mod databases; +pub mod disconnect_reason; pub mod error; pub mod maintenance_mode; pub mod pool; @@ -14,6 +15,7 @@ pub mod server; pub mod server_options; pub mod stats; +pub use disconnect_reason::DisconnectReason; pub use error::Error; pub use pool::{Cluster, ClusterShardConfig, LoadBalancer, Pool, Shard, ShardingSchema}; pub use prepared_statements::PreparedStatements; diff --git a/pgdog/src/backend/pool/inner.rs b/pgdog/src/backend/pool/inner.rs index 53037e71..405cc790 100644 --- a/pgdog/src/backend/pool/inner.rs +++ b/pgdog/src/backend/pool/inner.rs @@ -4,6 +4,7 @@ use std::cmp::max; use std::collections::VecDeque; use std::time::Duration; +use crate::backend::DisconnectReason; use crate::backend::{stats::Counts as BackendCounts, Server}; use crate::net::messages::BackendKeyData; @@ -155,12 +156,15 @@ impl Inner { let max_age = self.config.max_age; let mut removed = 0; - self.idle_connections.retain(|c| { + self.idle_connections.retain_mut(|c| { let age = c.age(now); let keep = age < max_age; if !keep { removed += 1; } + if !keep { + c.disconnect_reason(DisconnectReason::Old); + } keep }); @@ -174,16 +178,22 @@ impl Inner { let (mut remove, mut removed) = (self.can_remove(), 0); let idle_timeout = self.config.idle_timeout; - self.idle_connections.retain(|c| { + self.idle_connections.retain_mut(|c| { let idle_for = c.idle_for(now); - if remove > 0 && idle_for >= idle_timeout { + let keep = if remove > 0 && idle_for >= idle_timeout { remove -= 1; removed += 1; false } else { true + }; + + if !keep { + c.disconnect_reason(DisconnectReason::Idle); } + + keep }); removed @@ -242,6 +252,9 @@ impl Inner { /// Dump all idle connections. #[inline] pub(super) fn dump_idle(&mut self) { + for conn in &mut self.idle_connections { + conn.disconnect_reason(DisconnectReason::Offline); + } self.idle_connections.clear(); } @@ -297,6 +310,7 @@ impl Inner { if server.error() { self.errors += 1; result.server_error = true; + server.disconnect_reason(DisconnectReason::Error); return result; } @@ -309,18 +323,21 @@ impl Inner { // Close connections exceeding max age. if server.age(now) >= self.config.max_age { + server.disconnect_reason(DisconnectReason::Old); return result; } // Force close the connection. if server.force_close() { self.force_close += 1; + server.disconnect_reason(DisconnectReason::ForceClose); return result; } // Close connections in replication mode, // they are generally not re-usable. if server.replication_mode() { + server.disconnect_reason(DisconnectReason::ReplicationMode); return result; } @@ -335,6 +352,7 @@ impl Inner { self.put(server, now); } else { self.out_of_sync += 1; + server.disconnect_reason(DisconnectReason::OutOfSync); } result diff --git a/pgdog/src/backend/pool/monitor.rs b/pgdog/src/backend/pool/monitor.rs index 091cf432..5faa0cdf 100644 --- a/pgdog/src/backend/pool/monitor.rs +++ b/pgdog/src/backend/pool/monitor.rs @@ -35,7 +35,7 @@ use std::time::Duration; use super::{Error, Guard, Healtcheck, Oids, Pool, Request}; -use crate::backend::Server; +use crate::backend::{DisconnectReason, Server}; use tokio::time::{interval, sleep, timeout, Instant}; use tokio::{select, task::spawn}; @@ -287,6 +287,8 @@ impl Monitor { .await .map_err(|_| Error::HealthcheckError)?; + server.disconnect_reason(DisconnectReason::Healthcheck); + Healtcheck::mandatory(&mut server, pool, healthcheck_timeout) .healthcheck() .await?; diff --git a/pgdog/src/backend/pool/pool_impl.rs b/pgdog/src/backend/pool/pool_impl.rs index 7b353926..02898188 100644 --- a/pgdog/src/backend/pool/pool_impl.rs +++ b/pgdog/src/backend/pool/pool_impl.rs @@ -11,7 +11,7 @@ use tokio::time::{timeout, Instant}; use tracing::error; use crate::backend::pool::LsnStats; -use crate::backend::{Server, ServerOptions}; +use crate::backend::{DisconnectReason, Server, ServerOptions}; use crate::config::PoolerMode; use crate::net::messages::BackendKeyData; use crate::net::{Parameter, Parameters}; @@ -202,6 +202,7 @@ impl Pool { ); if let Err(err) = healthcheck.healthcheck().await { + conn.disconnect_reason(DisconnectReason::Unhealthy); drop(conn); self.inner.health.toggle(false); return Err(err); diff --git a/pgdog/src/backend/pub_sub/listener.rs b/pgdog/src/backend/pub_sub/listener.rs index 802a2db6..6289d136 100644 --- a/pgdog/src/backend/pub_sub/listener.rs +++ b/pgdog/src/backend/pub_sub/listener.rs @@ -15,7 +15,7 @@ use tokio::{ use tracing::{debug, error, info}; use crate::{ - backend::{self, pool::Error, Pool}, + backend::{self, pool::Error, DisconnectReason, Pool}, config::config, net::{ BackendKeyData, FromBytes, NotificationResponse, Parameter, Parameters, Protocol, @@ -161,6 +161,7 @@ impl PubSubListener { info!("pub/sub started [{}]", pool.addr()); let mut server = pool.standalone().await?; + server .link_client( &BackendKeyData::new(), @@ -179,7 +180,10 @@ impl PubSubListener { .keys() .map(|channel| Request::Subscribe(channel.to_string()).into()) .collect::>(); - server.send(&resub.into()).await?; + + if !resub.is_empty() { + server.send(&resub.into()).await?; + } loop { select! { @@ -214,6 +218,7 @@ impl PubSubListener { debug!("pub/sub request {:?}", req); server.send(&vec![req.into()].into()).await?; } else { + server.disconnect_reason(DisconnectReason::Offline); break; } } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 626c18b0..a7f67030 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -13,8 +13,8 @@ use tokio::{ use tracing::{debug, error, info, trace, warn}; use super::{ - pool::Address, prepared_statements::HandleResult, Error, PreparedStatements, ServerOptions, - Stats, + pool::Address, prepared_statements::HandleResult, DisconnectReason, Error, PreparedStatements, + ServerOptions, Stats, }; use crate::{ auth::{md5, scram::Client}, @@ -61,6 +61,7 @@ pub struct Server { replication_mode: bool, pooler_mode: PoolerMode, stream_buffer: MessageBuffer, + disconnect_reason: Option, } impl MemoryUsage for Server { @@ -265,6 +266,7 @@ impl Server { re_synced: false, pooler_mode: PoolerMode::Transaction, stream_buffer: MessageBuffer::new(cfg.config.memory.message_buffer), + disconnect_reason: None, }; server.stats.memory_used(server.memory_stats()); // Stream capacity. @@ -839,6 +841,13 @@ impl Server { self.re_synced } + #[inline] + pub fn disconnect_reason(&mut self, reason: DisconnectReason) { + if self.disconnect_reason.is_none() { + self.disconnect_reason = Some(reason); + } + } + /// Server connection unique identifier. #[inline] pub fn id(&self) -> &BackendKeyData { @@ -956,15 +965,12 @@ impl Drop for Server { fn drop(&mut self) { self.stats().disconnect(); if let Some(mut stream) = self.stream.take() { - // If you see a lot of these, tell your clients - // to not send queries unless they are willing to stick - // around for results. - let out_of_sync = if self.done() { - " ".into() - } else { - format!(" {} ", self.stats().state) - }; - info!("closing{}server connection [{}]", out_of_sync, self.addr,); + info!( + "closing server connection [{}, state: {}, reason: {}]", + self.addr, + self.stats.state, + self.disconnect_reason.take().unwrap_or_default(), + ); spawn(async move { stream.write_all(&Terminate.to_bytes()?).await?; @@ -1010,6 +1016,7 @@ pub mod test { replication_mode: false, pooler_mode: PoolerMode::Transaction, stream_buffer: MessageBuffer::new(4096), + disconnect_reason: None, } } }