From b6bc1723cd439b85505663426d318f09d66d0530 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Wed, 27 Aug 2025 14:46:39 +0000 Subject: [PATCH] Convert poll_new_conn to an async function I want to move the manual async bits to the Waitlist implementation, but for that I need to change the locks, which should be done in its own patch. This at least already makes the manual async code quite a bit smaller. --- src/conn/pool/futures/get_conn.rs | 5 ++- src/conn/pool/mod.rs | 57 +++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index 3cb0ae68..aac0d990 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -6,7 +6,7 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. -use std::{fmt, future::poll_fn, task::Context}; +use std::fmt; use crate::{ conn::{ @@ -79,8 +79,7 @@ pub(crate) async fn get_conn(pool: Pool) -> Result { match state.inner { GetConnInner::New => { let pool = state.pool_mut(); - let poll_new = |cx: &mut Context<'_>| pool.poll_new_conn(cx, queue_id); - let next = poll_fn(poll_new).await?; + let next = pool.new_conn(queue_id).await?; match next { GetConnInner::Connecting(conn_fut) => { state.inner = GetConnInner::Connecting(conn_fut); diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index ebec5231..8c016aff 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -11,6 +11,7 @@ use tokio::sync::mpsc; use std::{ collections::VecDeque, + future::poll_fn, future::Future, str::FromStr, sync::{atomic, Arc, Mutex}, @@ -232,19 +233,41 @@ impl Pool { exchange.waiting.wake(); } - /// Poll the pool for an available connection. - fn poll_new_conn( - &mut self, - cx: &mut Context<'_>, - queue_id: QueueId, - ) -> Poll> { + fn queue(&mut self, cx: &mut Context<'_>, queue_id: QueueId) -> Poll<()> { + let mut exchange = self.inner.exchange.lock().unwrap(); + exchange.waiting.push(cx.waker().clone(), queue_id); + Poll::Ready(()) + } + + fn poll_higher_priority(&mut self, cx: &mut Context<'_>, queue_id: QueueId) -> Poll<()> { + let mut exchange = self.inner.exchange.lock().unwrap(); + let highest = if let Some(cur) = exchange.waiting.peek_id() { + queue_id > cur + } else { + true + }; + if highest { + Poll::Ready(()) + } else { + // to make sure the waker is updated + exchange.waiting.push(cx.waker().clone(), queue_id); + Poll::Pending + } + } + + async fn queue_and_wait(&mut self, queue_id: QueueId) { + poll_fn(|cx| self.queue(cx, queue_id)).await; + poll_fn(|cx| self.poll_higher_priority(cx, queue_id)).await; + } + + fn try_new_conn(&mut self, queue_id: QueueId) -> Result> { let mut exchange = self.inner.exchange.lock().unwrap(); // NOTE: this load must happen while we hold the lock, // otherwise the recycler may choose to exit, see that .exist == 0, and then exit, // and then we decide to create a new connection, which would then never be torn down. if self.inner.close.load(atomic::Ordering::Acquire) { - return Err(Error::Driver(DriverError::PoolDisconnected)).into(); + return Err(Error::Driver(DriverError::PoolDisconnected)); } exchange.spawn_futures_if_needed(&self.inner); @@ -258,8 +281,7 @@ impl Pool { // If we are not, just queue if !highest { - exchange.waiting.push(cx.waker().clone(), queue_id); - return Poll::Pending; + return Ok(None); } #[allow(unused_variables)] // `since` is only used when `hdrhistogram` is enabled @@ -275,7 +297,7 @@ impl Pool { #[cfg(feature = "hdrhistogram")] let metrics = self.metrics(); conn.inner.active_since = Instant::now(); - return Poll::Ready(Ok(GetConnInner::Checking( + return Ok(Some(GetConnInner::Checking( async move { conn.stream_mut()?.check().await?; #[cfg(feature = "hdrhistogram")] @@ -315,7 +337,7 @@ impl Pool { #[cfg(feature = "hdrhistogram")] let metrics = self.metrics(); - return Poll::Ready(Ok(GetConnInner::Connecting( + return Ok(Some(GetConnInner::Connecting( async move { let conn = Conn::new(opts).await; #[cfg(feature = "hdrhistogram")] @@ -333,10 +355,17 @@ impl Pool { .boxed(), ))); } + Ok(None) + } - // Polled, but no conn available? Back into the queue. - exchange.waiting.push(cx.waker().clone(), queue_id); - Poll::Pending + /// Get a new connection from the pool. + async fn new_conn(&mut self, queue_id: QueueId) -> Result { + loop { + if let Some(conn) = self.try_new_conn(queue_id)? { + return Ok(conn); + } + self.queue_and_wait(queue_id).await; + } } fn unqueue(&self, queue_id: QueueId) {