Skip to content

Commit 0182f8e

Browse files
authored
feat: add reason for connection closing (#664)
Helps with debugging settings.
1 parent 8dd51c3 commit 0182f8e

File tree

7 files changed

+92
-18
lines changed

7 files changed

+92
-18
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use std::fmt::Display;
2+
3+
#[derive(Debug, Clone, Copy, Default)]
4+
pub enum DisconnectReason {
5+
Idle,
6+
Old,
7+
Error,
8+
Offline,
9+
ForceClose,
10+
Paused,
11+
ReplicationMode,
12+
OutOfSync,
13+
Unhealthy,
14+
Healthcheck,
15+
PubSub,
16+
#[default]
17+
Other,
18+
}
19+
20+
impl Display for DisconnectReason {
21+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22+
let reason = match self {
23+
Self::Idle => "idle",
24+
Self::Old => "max age",
25+
Self::Error => "error",
26+
Self::Other => "other",
27+
Self::ForceClose => "force close",
28+
Self::Paused => "pool paused",
29+
Self::Offline => "pool offline",
30+
Self::OutOfSync => "out of sync",
31+
Self::ReplicationMode => "in replication mode",
32+
Self::Unhealthy => "unhealthy",
33+
Self::Healthcheck => "standalone healthcheck",
34+
Self::PubSub => "pub/sub",
35+
};
36+
37+
write!(f, "{}", reason)
38+
}
39+
}

pgdog/src/backend/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! pgDog backend managers connections to PostgreSQL.
22
33
pub mod databases;
4+
pub mod disconnect_reason;
45
pub mod error;
56
pub mod maintenance_mode;
67
pub mod pool;
@@ -14,6 +15,7 @@ pub mod server;
1415
pub mod server_options;
1516
pub mod stats;
1617

18+
pub use disconnect_reason::DisconnectReason;
1719
pub use error::Error;
1820
pub use pool::{Cluster, ClusterShardConfig, LoadBalancer, Pool, Shard, ShardingSchema};
1921
pub use prepared_statements::PreparedStatements;

pgdog/src/backend/pool/inner.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::cmp::max;
44
use std::collections::VecDeque;
55
use std::time::Duration;
66

7+
use crate::backend::DisconnectReason;
78
use crate::backend::{stats::Counts as BackendCounts, Server};
89
use crate::net::messages::BackendKeyData;
910

@@ -155,12 +156,15 @@ impl Inner {
155156
let max_age = self.config.max_age;
156157
let mut removed = 0;
157158

158-
self.idle_connections.retain(|c| {
159+
self.idle_connections.retain_mut(|c| {
159160
let age = c.age(now);
160161
let keep = age < max_age;
161162
if !keep {
162163
removed += 1;
163164
}
165+
if !keep {
166+
c.disconnect_reason(DisconnectReason::Old);
167+
}
164168
keep
165169
});
166170

@@ -174,16 +178,22 @@ impl Inner {
174178
let (mut remove, mut removed) = (self.can_remove(), 0);
175179
let idle_timeout = self.config.idle_timeout;
176180

177-
self.idle_connections.retain(|c| {
181+
self.idle_connections.retain_mut(|c| {
178182
let idle_for = c.idle_for(now);
179183

180-
if remove > 0 && idle_for >= idle_timeout {
184+
let keep = if remove > 0 && idle_for >= idle_timeout {
181185
remove -= 1;
182186
removed += 1;
183187
false
184188
} else {
185189
true
190+
};
191+
192+
if !keep {
193+
c.disconnect_reason(DisconnectReason::Idle);
186194
}
195+
196+
keep
187197
});
188198

189199
removed
@@ -242,6 +252,9 @@ impl Inner {
242252
/// Dump all idle connections.
243253
#[inline]
244254
pub(super) fn dump_idle(&mut self) {
255+
for conn in &mut self.idle_connections {
256+
conn.disconnect_reason(DisconnectReason::Offline);
257+
}
245258
self.idle_connections.clear();
246259
}
247260

@@ -297,6 +310,7 @@ impl Inner {
297310
if server.error() {
298311
self.errors += 1;
299312
result.server_error = true;
313+
server.disconnect_reason(DisconnectReason::Error);
300314

301315
return result;
302316
}
@@ -309,18 +323,21 @@ impl Inner {
309323

310324
// Close connections exceeding max age.
311325
if server.age(now) >= self.config.max_age {
326+
server.disconnect_reason(DisconnectReason::Old);
312327
return result;
313328
}
314329

315330
// Force close the connection.
316331
if server.force_close() {
317332
self.force_close += 1;
333+
server.disconnect_reason(DisconnectReason::ForceClose);
318334
return result;
319335
}
320336

321337
// Close connections in replication mode,
322338
// they are generally not re-usable.
323339
if server.replication_mode() {
340+
server.disconnect_reason(DisconnectReason::ReplicationMode);
324341
return result;
325342
}
326343

@@ -335,6 +352,7 @@ impl Inner {
335352
self.put(server, now);
336353
} else {
337354
self.out_of_sync += 1;
355+
server.disconnect_reason(DisconnectReason::OutOfSync);
338356
}
339357

340358
result

pgdog/src/backend/pool/monitor.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
use std::time::Duration;
3636

3737
use super::{Error, Guard, Healtcheck, Oids, Pool, Request};
38-
use crate::backend::Server;
38+
use crate::backend::{DisconnectReason, Server};
3939

4040
use tokio::time::{interval, sleep, timeout, Instant};
4141
use tokio::{select, task::spawn};
@@ -287,6 +287,8 @@ impl Monitor {
287287
.await
288288
.map_err(|_| Error::HealthcheckError)?;
289289

290+
server.disconnect_reason(DisconnectReason::Healthcheck);
291+
290292
Healtcheck::mandatory(&mut server, pool, healthcheck_timeout)
291293
.healthcheck()
292294
.await?;

pgdog/src/backend/pool/pool_impl.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use tokio::time::{timeout, Instant};
1111
use tracing::error;
1212

1313
use crate::backend::pool::LsnStats;
14-
use crate::backend::{Server, ServerOptions};
14+
use crate::backend::{DisconnectReason, Server, ServerOptions};
1515
use crate::config::PoolerMode;
1616
use crate::net::messages::BackendKeyData;
1717
use crate::net::{Parameter, Parameters};
@@ -202,6 +202,7 @@ impl Pool {
202202
);
203203

204204
if let Err(err) = healthcheck.healthcheck().await {
205+
conn.disconnect_reason(DisconnectReason::Unhealthy);
205206
drop(conn);
206207
self.inner.health.toggle(false);
207208
return Err(err);

pgdog/src/backend/pub_sub/listener.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use tokio::{
1515
use tracing::{debug, error, info};
1616

1717
use crate::{
18-
backend::{self, pool::Error, Pool},
18+
backend::{self, pool::Error, DisconnectReason, Pool},
1919
config::config,
2020
net::{
2121
BackendKeyData, FromBytes, NotificationResponse, Parameter, Parameters, Protocol,
@@ -161,6 +161,7 @@ impl PubSubListener {
161161
info!("pub/sub started [{}]", pool.addr());
162162

163163
let mut server = pool.standalone().await?;
164+
164165
server
165166
.link_client(
166167
&BackendKeyData::new(),
@@ -179,7 +180,10 @@ impl PubSubListener {
179180
.keys()
180181
.map(|channel| Request::Subscribe(channel.to_string()).into())
181182
.collect::<Vec<ProtocolMessage>>();
182-
server.send(&resub.into()).await?;
183+
184+
if !resub.is_empty() {
185+
server.send(&resub.into()).await?;
186+
}
183187

184188
loop {
185189
select! {
@@ -214,6 +218,7 @@ impl PubSubListener {
214218
debug!("pub/sub request {:?}", req);
215219
server.send(&vec![req.into()].into()).await?;
216220
} else {
221+
server.disconnect_reason(DisconnectReason::Offline);
217222
break;
218223
}
219224
}

pgdog/src/backend/server.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ use tokio::{
1313
use tracing::{debug, error, info, trace, warn};
1414

1515
use super::{
16-
pool::Address, prepared_statements::HandleResult, Error, PreparedStatements, ServerOptions,
17-
Stats,
16+
pool::Address, prepared_statements::HandleResult, DisconnectReason, Error, PreparedStatements,
17+
ServerOptions, Stats,
1818
};
1919
use crate::{
2020
auth::{md5, scram::Client},
@@ -61,6 +61,7 @@ pub struct Server {
6161
replication_mode: bool,
6262
pooler_mode: PoolerMode,
6363
stream_buffer: MessageBuffer,
64+
disconnect_reason: Option<DisconnectReason>,
6465
}
6566

6667
impl MemoryUsage for Server {
@@ -265,6 +266,7 @@ impl Server {
265266
re_synced: false,
266267
pooler_mode: PoolerMode::Transaction,
267268
stream_buffer: MessageBuffer::new(cfg.config.memory.message_buffer),
269+
disconnect_reason: None,
268270
};
269271

270272
server.stats.memory_used(server.memory_stats()); // Stream capacity.
@@ -839,6 +841,13 @@ impl Server {
839841
self.re_synced
840842
}
841843

844+
#[inline]
845+
pub fn disconnect_reason(&mut self, reason: DisconnectReason) {
846+
if self.disconnect_reason.is_none() {
847+
self.disconnect_reason = Some(reason);
848+
}
849+
}
850+
842851
/// Server connection unique identifier.
843852
#[inline]
844853
pub fn id(&self) -> &BackendKeyData {
@@ -956,15 +965,12 @@ impl Drop for Server {
956965
fn drop(&mut self) {
957966
self.stats().disconnect();
958967
if let Some(mut stream) = self.stream.take() {
959-
// If you see a lot of these, tell your clients
960-
// to not send queries unless they are willing to stick
961-
// around for results.
962-
let out_of_sync = if self.done() {
963-
" ".into()
964-
} else {
965-
format!(" {} ", self.stats().state)
966-
};
967-
info!("closing{}server connection [{}]", out_of_sync, self.addr,);
968+
info!(
969+
"closing server connection [{}, state: {}, reason: {}]",
970+
self.addr,
971+
self.stats.state,
972+
self.disconnect_reason.take().unwrap_or_default(),
973+
);
968974

969975
spawn(async move {
970976
stream.write_all(&Terminate.to_bytes()?).await?;
@@ -1010,6 +1016,7 @@ pub mod test {
10101016
replication_mode: false,
10111017
pooler_mode: PoolerMode::Transaction,
10121018
stream_buffer: MessageBuffer::new(4096),
1019+
disconnect_reason: None,
10131020
}
10141021
}
10151022
}

0 commit comments

Comments
 (0)