Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions pgdog/src/backend/disconnect_reason.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use std::fmt::Display;

#[derive(Debug, Clone, Copy, Default)]
pub enum DisconnectReason {
Idle,
Old,
Error,
Offline,
ForceClose,
Paused,
ReplicationMode,
OutOfSync,
Unhealthy,
Healthcheck,
PubSub,
#[default]
Other,
}

impl Display for DisconnectReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let reason = match self {
Self::Idle => "idle",
Self::Old => "max age",
Self::Error => "error",
Self::Other => "other",
Self::ForceClose => "force close",
Self::Paused => "pool paused",
Self::Offline => "pool offline",
Self::OutOfSync => "out of sync",
Self::ReplicationMode => "in replication mode",
Self::Unhealthy => "unhealthy",
Self::Healthcheck => "standalone healthcheck",
Self::PubSub => "pub/sub",
};

write!(f, "{}", reason)
}
}
2 changes: 2 additions & 0 deletions pgdog/src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! pgDog backend managers connections to PostgreSQL.
pub mod databases;
pub mod disconnect_reason;
pub mod error;
pub mod maintenance_mode;
pub mod pool;
Expand All @@ -14,6 +15,7 @@ pub mod server;
pub mod server_options;
pub mod stats;

pub use disconnect_reason::DisconnectReason;
pub use error::Error;
pub use pool::{Cluster, ClusterShardConfig, LoadBalancer, Pool, Shard, ShardingSchema};
pub use prepared_statements::PreparedStatements;
Expand Down
24 changes: 21 additions & 3 deletions pgdog/src/backend/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::cmp::max;
use std::collections::VecDeque;
use std::time::Duration;

use crate::backend::DisconnectReason;
use crate::backend::{stats::Counts as BackendCounts, Server};
use crate::net::messages::BackendKeyData;

Expand Down Expand Up @@ -155,12 +156,15 @@ impl Inner {
let max_age = self.config.max_age;
let mut removed = 0;

self.idle_connections.retain(|c| {
self.idle_connections.retain_mut(|c| {
let age = c.age(now);
let keep = age < max_age;
if !keep {
removed += 1;
}
if !keep {
c.disconnect_reason(DisconnectReason::Old);
}
keep
});

Expand All @@ -174,16 +178,22 @@ impl Inner {
let (mut remove, mut removed) = (self.can_remove(), 0);
let idle_timeout = self.config.idle_timeout;

self.idle_connections.retain(|c| {
self.idle_connections.retain_mut(|c| {
let idle_for = c.idle_for(now);

if remove > 0 && idle_for >= idle_timeout {
let keep = if remove > 0 && idle_for >= idle_timeout {
remove -= 1;
removed += 1;
false
} else {
true
};

if !keep {
c.disconnect_reason(DisconnectReason::Idle);
}

keep
});

removed
Expand Down Expand Up @@ -242,6 +252,9 @@ impl Inner {
/// Dump all idle connections.
#[inline]
pub(super) fn dump_idle(&mut self) {
for conn in &mut self.idle_connections {
conn.disconnect_reason(DisconnectReason::Offline);
}
self.idle_connections.clear();
}

Expand Down Expand Up @@ -297,6 +310,7 @@ impl Inner {
if server.error() {
self.errors += 1;
result.server_error = true;
server.disconnect_reason(DisconnectReason::Error);

return result;
}
Expand All @@ -309,18 +323,21 @@ impl Inner {

// Close connections exceeding max age.
if server.age(now) >= self.config.max_age {
server.disconnect_reason(DisconnectReason::Old);
return result;
}

// Force close the connection.
if server.force_close() {
self.force_close += 1;
server.disconnect_reason(DisconnectReason::ForceClose);
return result;
}

// Close connections in replication mode,
// they are generally not re-usable.
if server.replication_mode() {
server.disconnect_reason(DisconnectReason::ReplicationMode);
return result;
}

Expand All @@ -335,6 +352,7 @@ impl Inner {
self.put(server, now);
} else {
self.out_of_sync += 1;
server.disconnect_reason(DisconnectReason::OutOfSync);
}

result
Expand Down
4 changes: 3 additions & 1 deletion pgdog/src/backend/pool/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
use std::time::Duration;

use super::{Error, Guard, Healtcheck, Oids, Pool, Request};
use crate::backend::Server;
use crate::backend::{DisconnectReason, Server};

use tokio::time::{interval, sleep, timeout, Instant};
use tokio::{select, task::spawn};
Expand Down Expand Up @@ -287,6 +287,8 @@ impl Monitor {
.await
.map_err(|_| Error::HealthcheckError)?;

server.disconnect_reason(DisconnectReason::Healthcheck);

Healtcheck::mandatory(&mut server, pool, healthcheck_timeout)
.healthcheck()
.await?;
Expand Down
3 changes: 2 additions & 1 deletion pgdog/src/backend/pool/pool_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::time::{timeout, Instant};
use tracing::error;

use crate::backend::pool::LsnStats;
use crate::backend::{Server, ServerOptions};
use crate::backend::{DisconnectReason, Server, ServerOptions};
use crate::config::PoolerMode;
use crate::net::messages::BackendKeyData;
use crate::net::{Parameter, Parameters};
Expand Down Expand Up @@ -202,6 +202,7 @@ impl Pool {
);

if let Err(err) = healthcheck.healthcheck().await {
conn.disconnect_reason(DisconnectReason::Unhealthy);
drop(conn);
self.inner.health.toggle(false);
return Err(err);
Expand Down
9 changes: 7 additions & 2 deletions pgdog/src/backend/pub_sub/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::{
use tracing::{debug, error, info};

use crate::{
backend::{self, pool::Error, Pool},
backend::{self, pool::Error, DisconnectReason, Pool},
config::config,
net::{
BackendKeyData, FromBytes, NotificationResponse, Parameter, Parameters, Protocol,
Expand Down Expand Up @@ -161,6 +161,7 @@ impl PubSubListener {
info!("pub/sub started [{}]", pool.addr());

let mut server = pool.standalone().await?;

server
.link_client(
&BackendKeyData::new(),
Expand All @@ -179,7 +180,10 @@ impl PubSubListener {
.keys()
.map(|channel| Request::Subscribe(channel.to_string()).into())
.collect::<Vec<ProtocolMessage>>();
server.send(&resub.into()).await?;

if !resub.is_empty() {
server.send(&resub.into()).await?;
}

loop {
select! {
Expand Down Expand Up @@ -214,6 +218,7 @@ impl PubSubListener {
debug!("pub/sub request {:?}", req);
server.send(&vec![req.into()].into()).await?;
} else {
server.disconnect_reason(DisconnectReason::Offline);
break;
}
}
Expand Down
29 changes: 18 additions & 11 deletions pgdog/src/backend/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use tokio::{
use tracing::{debug, error, info, trace, warn};

use super::{
pool::Address, prepared_statements::HandleResult, Error, PreparedStatements, ServerOptions,
Stats,
pool::Address, prepared_statements::HandleResult, DisconnectReason, Error, PreparedStatements,
ServerOptions, Stats,
};
use crate::{
auth::{md5, scram::Client},
Expand Down Expand Up @@ -61,6 +61,7 @@ pub struct Server {
replication_mode: bool,
pooler_mode: PoolerMode,
stream_buffer: MessageBuffer,
disconnect_reason: Option<DisconnectReason>,
}

impl MemoryUsage for Server {
Expand Down Expand Up @@ -265,6 +266,7 @@ impl Server {
re_synced: false,
pooler_mode: PoolerMode::Transaction,
stream_buffer: MessageBuffer::new(cfg.config.memory.message_buffer),
disconnect_reason: None,
};

server.stats.memory_used(server.memory_stats()); // Stream capacity.
Expand Down Expand Up @@ -839,6 +841,13 @@ impl Server {
self.re_synced
}

#[inline]
pub fn disconnect_reason(&mut self, reason: DisconnectReason) {
if self.disconnect_reason.is_none() {
self.disconnect_reason = Some(reason);
}
}

/// Server connection unique identifier.
#[inline]
pub fn id(&self) -> &BackendKeyData {
Expand Down Expand Up @@ -956,15 +965,12 @@ impl Drop for Server {
fn drop(&mut self) {
self.stats().disconnect();
if let Some(mut stream) = self.stream.take() {
// If you see a lot of these, tell your clients
// to not send queries unless they are willing to stick
// around for results.
let out_of_sync = if self.done() {
" ".into()
} else {
format!(" {} ", self.stats().state)
};
info!("closing{}server connection [{}]", out_of_sync, self.addr,);
info!(
"closing server connection [{}, state: {}, reason: {}]",
self.addr,
self.stats.state,
self.disconnect_reason.take().unwrap_or_default(),
);

spawn(async move {
stream.write_all(&Terminate.to_bytes()?).await?;
Expand Down Expand Up @@ -1010,6 +1016,7 @@ pub mod test {
replication_mode: false,
pooler_mode: PoolerMode::Transaction,
stream_buffer: MessageBuffer::new(4096),
disconnect_reason: None,
}
}
}
Expand Down
Loading