From d552c6fff81c3589c1feb5bf788d25714019cba9 Mon Sep 17 00:00:00 2001 From: Joshua Potts <8704475+iamjpotts@users.noreply.github.com> Date: Fri, 8 Aug 2025 09:45:26 -0400 Subject: [PATCH] refactor(any): Implement fetch_optional in terms of fetch_many Signed-off-by: Joshua Potts <8704475+iamjpotts@users.noreply.github.com> --- sqlx-core/src/any/connection/backend.rs | 15 ++++++- sqlx-mysql/src/any.rs | 30 +------------ sqlx-postgres/src/any.rs | 28 +----------- sqlx-sqlite/src/any.rs | 59 ++++++++++++++----------- 4 files changed, 50 insertions(+), 82 deletions(-) diff --git a/sqlx-core/src/any/connection/backend.rs b/sqlx-core/src/any/connection/backend.rs index e59b345ed9..b71f3355d6 100644 --- a/sqlx-core/src/any/connection/backend.rs +++ b/sqlx-core/src/any/connection/backend.rs @@ -4,6 +4,7 @@ use crate::sql_str::SqlStr; use either::Either; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; +use futures_util::TryStreamExt; use std::fmt::Debug; pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static { @@ -106,7 +107,19 @@ pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static { query: SqlStr, persistent: bool, arguments: Option>, - ) -> BoxFuture<'q, crate::Result>>; + ) -> BoxFuture<'q, crate::Result>> { + let mut stream = self.fetch_many(query, persistent, arguments); + + Box::pin(async move { + while let Some(result) = stream.try_next().await? { + if let Either::Right(row) = result { + return Ok(Some(row)); + } + } + + Ok(None) + }) + } fn prepare_with<'c, 'q: 'c>( &'c mut self, diff --git a/sqlx-mysql/src/any.rs b/sqlx-mysql/src/any.rs index 70b7ad4511..a00fffc1de 100644 --- a/sqlx-mysql/src/any.rs +++ b/sqlx-mysql/src/any.rs @@ -6,7 +6,7 @@ use crate::{ use either::Either; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; -use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::{stream, FutureExt, StreamExt, TryFutureExt}; use sqlx_core::any::{ Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow, AnyStatement, AnyTypeInfo, AnyTypeInfoKind, @@ -17,7 +17,7 @@ use sqlx_core::describe::Describe; use sqlx_core::executor::Executor; use sqlx_core::sql_str::SqlStr; use sqlx_core::transaction::TransactionManager; -use std::{future, pin::pin}; +use std::future; sqlx_core::declare_driver_with_optional_migrate!(DRIVER = MySql); @@ -103,32 +103,6 @@ impl AnyConnectionBackend for MySqlConnection { ) } - fn fetch_optional<'q>( - &'q mut self, - query: SqlStr, - persistent: bool, - arguments: Option>, - ) -> BoxFuture<'q, sqlx_core::Result>> { - let persistent = persistent && arguments.is_some(); - let arguments = arguments - .map(AnyArguments::convert_into) - .transpose() - .map_err(sqlx_core::Error::Encode); - - Box::pin(async move { - let arguments = arguments?; - let mut stream = pin!(self.run(query, arguments, persistent).await?); - - while let Some(result) = stream.try_next().await? { - if let Either::Right(row) = result { - return Ok(Some(AnyRow::try_from(&row)?)); - } - } - - Ok(None) - }) - } - fn prepare_with<'c, 'q: 'c>( &'c mut self, sql: SqlStr, diff --git a/sqlx-postgres/src/any.rs b/sqlx-postgres/src/any.rs index d24145637c..8c14b8c5b3 100644 --- a/sqlx-postgres/src/any.rs +++ b/sqlx-postgres/src/any.rs @@ -4,9 +4,9 @@ use crate::{ }; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; -use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::{stream, FutureExt, StreamExt, TryFutureExt}; use sqlx_core::sql_str::SqlStr; -use std::{future, pin::pin}; +use std::future; use sqlx_core::any::{ Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow, @@ -105,30 +105,6 @@ impl AnyConnectionBackend for PgConnection { ) } - fn fetch_optional<'q>( - &'q mut self, - query: SqlStr, - persistent: bool, - arguments: Option>, - ) -> BoxFuture<'q, sqlx_core::Result>> { - let persistent = persistent && arguments.is_some(); - let arguments = arguments - .map(AnyArguments::convert_into) - .transpose() - .map_err(sqlx_core::Error::Encode); - - Box::pin(async move { - let arguments = arguments?; - let mut stream = pin!(self.run(query, arguments, persistent, None).await?); - - if let Some(Either::Right(row)) = stream.try_next().await? { - return Ok(Some(AnyRow::try_from(&row)?)); - } - - Ok(None) - }) - } - fn prepare_with<'c, 'q: 'c>( &'c mut self, sql: SqlStr, diff --git a/sqlx-sqlite/src/any.rs b/sqlx-sqlite/src/any.rs index 636f986bf5..98721b93b9 100644 --- a/sqlx-sqlite/src/any.rs +++ b/sqlx-sqlite/src/any.rs @@ -19,7 +19,6 @@ use sqlx_core::database::Database; use sqlx_core::describe::Describe; use sqlx_core::executor::Executor; use sqlx_core::transaction::TransactionManager; -use std::pin::pin; use std::sync::Arc; sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Sqlite); @@ -86,21 +85,7 @@ impl AnyConnectionBackend for SqliteConnection { persistent: bool, arguments: Option>, ) -> BoxStream<'q, sqlx_core::Result>> { - let persistent = persistent && arguments.is_some(); - let args = arguments.map(map_arguments); - - Box::pin( - self.worker - .execute(query, args, self.row_channel_size, persistent, None) - .map_ok(flume::Receiver::into_stream) - .try_flatten_stream() - .map( - move |res: sqlx_core::Result>| match res? { - Either::Left(result) => Ok(Either::Left(map_result(result))), - Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)), - }, - ), - ) + self.fetch_with_limit(query, persistent, arguments, None) } fn fetch_optional<'q>( @@ -109,19 +94,13 @@ impl AnyConnectionBackend for SqliteConnection { persistent: bool, arguments: Option>, ) -> BoxFuture<'q, sqlx_core::Result>> { - let persistent = persistent && arguments.is_some(); - let args = arguments.map(map_arguments); + let mut stream = self.fetch_with_limit(query, persistent, arguments, Some(1)); Box::pin(async move { - let mut stream = pin!( - self.worker - .execute(query, args, self.row_channel_size, persistent, Some(1)) - .map_ok(flume::Receiver::into_stream) - .await? - ); - - if let Some(Either::Right(row)) = stream.try_next().await? { - return Ok(Some(AnyRow::try_from(&row)?)); + while let Some(result) = stream.try_next().await? { + if let Either::Right(row) = result { + return Ok(Some(row)); + } } Ok(None) @@ -145,6 +124,32 @@ impl AnyConnectionBackend for SqliteConnection { } } +impl SqliteConnection { + fn fetch_with_limit( + &mut self, + query: SqlStr, + persistent: bool, + arguments: Option, + limit: Option, + ) -> BoxStream<'_, sqlx_core::Result>> { + let persistent = persistent && arguments.is_some(); + let args = arguments.map(map_arguments); + + Box::pin( + self.worker + .execute(query, args, self.row_channel_size, persistent, limit) + .map_ok(flume::Receiver::into_stream) + .try_flatten_stream() + .map( + move |res: sqlx_core::Result>| match res? { + Either::Left(result) => Ok(Either::Left(map_result(result))), + Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)), + }, + ), + ) + } +} + impl<'a> TryFrom<&'a SqliteTypeInfo> for AnyTypeInfo { type Error = sqlx_core::Error;