Skip to content

Commit dd48e76

Browse files
authored
Add database connection pool metrics (#4742)
1 parent a6d462a commit dd48e76

File tree

7 files changed

+200
-11
lines changed

7 files changed

+200
-11
lines changed

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,13 @@ use quickwit_proto::metastore::{
4848
use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, SourceId};
4949
use sea_query::{Asterisk, PostgresQueryBuilder, Query};
5050
use sea_query_binder::SqlxBinder;
51-
use sqlx::{Executor, Pool, Postgres, Transaction};
51+
use sqlx::{Acquire, Executor, Postgres, Transaction};
5252
use tracing::{debug, info, instrument, warn};
5353

5454
use super::error::convert_sqlx_err;
5555
use super::migrator::run_migrations;
5656
use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits};
57+
use super::pool::TrackedPool;
5758
use super::split_stream::SplitStream;
5859
use super::utils::{append_query_filters, establish_connection};
5960
use crate::checkpoint::{
@@ -71,7 +72,7 @@ use crate::{
7172
#[derive(Clone)]
7273
pub struct PostgresqlMetastore {
7374
uri: Uri,
74-
connection_pool: Pool<Postgres>,
75+
connection_pool: TrackedPool<Postgres>,
7576
}
7677

7778
impl fmt::Debug for PostgresqlMetastore {
@@ -709,7 +710,7 @@ impl MetastoreService for PostgresqlMetastore {
709710
let pg_split_stream = SplitStream::new(
710711
self.connection_pool.clone(),
711712
sql,
712-
|connection_pool: &Pool<Postgres>, sql: &String| {
713+
|connection_pool: &TrackedPool<Postgres>, sql: &String| {
713714
sqlx::query_as_with::<_, PgSplit, _>(sql, values).fetch(connection_pool)
714715
},
715716
);
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright (C) 2024 Quickwit, Inc.
2+
//
3+
// Quickwit is offered under the AGPL v3.0 and as commercial software.
4+
// For commercial licensing, contact us at hello@quickwit.io.
5+
//
6+
// AGPL:
7+
// This program is free software: you can redistribute it and/or modify
8+
// it under the terms of the GNU Affero General Public License as
9+
// published by the Free Software Foundation, either version 3 of the
10+
// License, or (at your option) any later version.
11+
//
12+
// This program is distributed in the hope that it will be useful,
13+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
// GNU Affero General Public License for more details.
16+
//
17+
// You should have received a copy of the GNU Affero General Public License
18+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
20+
use once_cell::sync::Lazy;
21+
use quickwit_common::metrics::{new_gauge, IntGauge};
22+
23+
#[derive(Clone)]
24+
pub(super) struct PostgresMetrics {
25+
pub acquire_connections: IntGauge,
26+
pub active_connections: IntGauge,
27+
pub idle_connections: IntGauge,
28+
}
29+
30+
impl Default for PostgresMetrics {
31+
fn default() -> Self {
32+
Self {
33+
acquire_connections: new_gauge(
34+
"acquire_connections",
35+
"Number of connections being acquired.",
36+
"metastore",
37+
&[],
38+
),
39+
active_connections: new_gauge(
40+
"active_connections",
41+
"Number of active (used + idle) connections.",
42+
"metastore",
43+
&[],
44+
),
45+
idle_connections: new_gauge(
46+
"idle_connections",
47+
"Number of idle connections.",
48+
"metastore",
49+
&[],
50+
),
51+
}
52+
}
53+
}
54+
55+
pub(super) static POSTGRES_METRICS: Lazy<PostgresMetrics> = Lazy::new(PostgresMetrics::default);

quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@
1919

2020
use quickwit_proto::metastore::{MetastoreError, MetastoreResult};
2121
use sqlx::migrate::Migrator;
22-
use sqlx::{Pool, Postgres};
22+
use sqlx::{Acquire, Postgres};
2323
use tracing::{error, instrument};
2424

25+
use super::pool::TrackedPool;
26+
2527
static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql");
2628

2729
/// Initializes the database and runs the SQL migrations stored in the
2830
/// `quickwit-metastore/migrations` directory.
2931
#[instrument(skip_all)]
30-
pub(super) async fn run_migrations(pool: &Pool<Postgres>) -> MetastoreResult<()> {
32+
pub(super) async fn run_migrations(pool: &TrackedPool<Postgres>) -> MetastoreResult<()> {
3133
let tx = pool.begin().await?;
3234
let migrate_result = MIGRATOR.run(pool).await;
3335

quickwit/quickwit-metastore/src/metastore/postgres/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
mod error;
2121
mod factory;
2222
mod metastore;
23+
mod metrics;
2324
mod migrator;
2425
mod model;
26+
mod pool;
2527
mod split_stream;
2628
mod tags;
2729
mod utils;
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright (C) 2024 Quickwit, Inc.
2+
//
3+
// Quickwit is offered under the AGPL v3.0 and as commercial software.
4+
// For commercial licensing, contact us at hello@quickwit.io.
5+
//
6+
// AGPL:
7+
// This program is free software: you can redistribute it and/or modify
8+
// it under the terms of the GNU Affero General Public License as
9+
// published by the Free Software Foundation, either version 3 of the
10+
// License, or (at your option) any later version.
11+
//
12+
// This program is distributed in the hope that it will be useful,
13+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
// GNU Affero General Public License for more details.
16+
//
17+
// You should have received a copy of the GNU Affero General Public License
18+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
20+
use futures::future::BoxFuture;
21+
use futures::stream::BoxStream;
22+
use quickwit_common::metrics::GaugeGuard;
23+
use sqlx::database::HasStatement;
24+
use sqlx::pool::maybe::MaybePoolConnection;
25+
use sqlx::pool::PoolConnection;
26+
use sqlx::{
27+
Acquire, Database, Describe, Either, Error, Execute, Executor, Pool, Postgres, Transaction,
28+
};
29+
30+
use super::metrics::POSTGRES_METRICS;
31+
32+
#[derive(Debug)]
33+
pub(super) struct TrackedPool<DB: Database> {
34+
inner_pool: Pool<DB>,
35+
}
36+
37+
impl TrackedPool<Postgres> {
38+
pub fn new(inner_pool: Pool<Postgres>) -> Self {
39+
Self { inner_pool }
40+
}
41+
}
42+
43+
impl<DB: Database> Clone for TrackedPool<DB> {
44+
fn clone(&self) -> Self {
45+
Self {
46+
inner_pool: self.inner_pool.clone(),
47+
}
48+
}
49+
}
50+
51+
impl<'a, DB: Database> Acquire<'a> for &TrackedPool<DB> {
52+
type Database = DB;
53+
54+
type Connection = PoolConnection<DB>;
55+
56+
fn acquire(self) -> BoxFuture<'static, Result<Self::Connection, Error>> {
57+
let acquire_conn_fut = self.inner_pool.acquire();
58+
59+
POSTGRES_METRICS
60+
.active_connections
61+
.set(self.inner_pool.size() as i64);
62+
POSTGRES_METRICS
63+
.idle_connections
64+
.set(self.inner_pool.num_idle() as i64);
65+
66+
Box::pin(async move {
67+
let mut gauge_guard = GaugeGuard::from_gauge(&POSTGRES_METRICS.acquire_connections);
68+
gauge_guard.add(1);
69+
70+
let conn = acquire_conn_fut.await?;
71+
Ok(conn)
72+
})
73+
}
74+
75+
fn begin(self) -> BoxFuture<'static, Result<Transaction<'a, DB>, Error>> {
76+
let acquire_conn_fut = self.acquire();
77+
78+
Box::pin(async move {
79+
Transaction::begin(MaybePoolConnection::PoolConnection(acquire_conn_fut.await?)).await
80+
})
81+
}
82+
}
83+
84+
impl<'p, DB: Database> Executor<'p> for &'_ TrackedPool<DB>
85+
where for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>
86+
{
87+
type Database = DB;
88+
89+
fn fetch_many<'e, 'q: 'e, E: 'q>(
90+
self,
91+
query: E,
92+
) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
93+
where
94+
E: Execute<'q, Self::Database>,
95+
{
96+
self.inner_pool.fetch_many(query)
97+
}
98+
99+
fn fetch_optional<'e, 'q: 'e, E: 'q>(
100+
self,
101+
query: E,
102+
) -> BoxFuture<'e, Result<Option<DB::Row>, Error>>
103+
where
104+
E: Execute<'q, Self::Database>,
105+
{
106+
self.inner_pool.fetch_optional(query)
107+
}
108+
109+
fn prepare_with<'e, 'q: 'e>(
110+
self,
111+
sql: &'q str,
112+
parameters: &'e [<Self::Database as Database>::TypeInfo],
113+
) -> BoxFuture<'e, Result<<Self::Database as HasStatement<'q>>::Statement, Error>> {
114+
self.inner_pool.prepare_with(sql, parameters)
115+
}
116+
117+
#[doc(hidden)]
118+
fn describe<'e, 'q: 'e>(
119+
self,
120+
sql: &'q str,
121+
) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>> {
122+
self.inner_pool.describe(sql)
123+
}
124+
}

quickwit/quickwit-metastore/src/metastore/postgres/split_stream.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ use std::task::{Context, Poll};
2222

2323
use futures::stream::BoxStream;
2424
use ouroboros::self_referencing;
25-
use sqlx::{Pool, Postgres};
25+
use sqlx::Postgres;
2626
use tokio_stream::Stream;
2727

28+
use super::pool::TrackedPool;
29+
2830
#[self_referencing(pub_extras)]
2931
pub struct SplitStream<T> {
30-
connection_pool: Pool<Postgres>,
32+
connection_pool: TrackedPool<Postgres>,
3133
sql: String,
3234
#[borrows(connection_pool, sql)]
3335
#[covariant]

quickwit/quickwit-metastore/src/metastore/postgres/utils.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ use quickwit_common::uri::Uri;
2626
use quickwit_proto::metastore::{MetastoreError, MetastoreResult};
2727
use sea_query::{any, Expr, Func, Order, SelectStatement};
2828
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
29-
use sqlx::{ConnectOptions, Pool, Postgres};
29+
use sqlx::{ConnectOptions, Postgres};
3030
use tracing::error;
3131
use tracing::log::LevelFilter;
3232

3333
use super::model::{Splits, ToTimestampFunc};
34+
use super::pool::TrackedPool;
3435
use super::tags::generate_sql_condition;
3536
use crate::metastore::FilterRange;
3637
use crate::{ListSplitsQuery, SplitMaturity, SplitMetadata};
@@ -43,7 +44,7 @@ pub(super) async fn establish_connection(
4344
acquire_timeout: Duration,
4445
idle_timeout_opt: Option<Duration>,
4546
max_lifetime_opt: Option<Duration>,
46-
) -> MetastoreResult<Pool<Postgres>> {
47+
) -> MetastoreResult<TrackedPool<Postgres>> {
4748
let pool_options = PgPoolOptions::new()
4849
.min_connections(min_connections as u32)
4950
.max_connections(max_connections as u32)
@@ -53,15 +54,17 @@ pub(super) async fn establish_connection(
5354
let connect_options: PgConnectOptions = PgConnectOptions::from_str(connection_uri.as_str())?
5455
.application_name("quickwit-metastore")
5556
.log_statements(LevelFilter::Info);
56-
pool_options
57+
let sqlx_pool = pool_options
5758
.connect_with(connect_options)
5859
.await
5960
.map_err(|error| {
6061
error!(connection_uri=%connection_uri, error=?error, "failed to establish connection to database");
6162
MetastoreError::Connection {
6263
message: error.to_string(),
6364
}
64-
})
65+
})?;
66+
let tracked_pool = TrackedPool::new(sqlx_pool);
67+
Ok(tracked_pool)
6568
}
6669

6770
/// Extends an existing SQL string with the generated filter range appended to the query.

0 commit comments

Comments
 (0)