From 046217b2417bea11190560168983700e2b743d5d Mon Sep 17 00:00:00 2001 From: Jane Lewis Date: Mon, 8 Sep 2025 09:57:21 -0700 Subject: [PATCH 1/2] Implement `RecordStream`, which handles both synchronous and asynchronous iteration over a `Recordset` --- aerospike-core/src/query/recordset.rs | 56 +++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/aerospike-core/src/query/recordset.rs b/aerospike-core/src/query/recordset.rs index f21602c..fdf070d 100644 --- a/aerospike-core/src/query/recordset.rs +++ b/aerospike-core/src/query/recordset.rs @@ -41,8 +41,12 @@ pub struct Recordset { active: AtomicBool, task_id: AtomicUsize, pub(crate) tracker: Arc>, + stream_count: AtomicUsize, } +/// A stream over incoming records for a [`Recordset`] that can be iterated over either synchronously or asynchronously. +pub struct RecordStream(Arc); + impl Drop for Recordset { fn drop(&mut self) { // close the recordset to finish all the commands sending data @@ -67,6 +71,7 @@ impl Recordset { active: AtomicBool::new(true), task_id: AtomicUsize::new(task_id), tracker: tracker, + stream_count: AtomicUsize::new(0), } } @@ -133,17 +138,34 @@ impl Recordset { } None } + + /// Converts a reference to a [`Recordset`] into a [`RecordStream`] that can be used + /// to iterate over records. + /// Only one stream can exist at a time. If one already exists, + /// this method will return `None`. + pub fn into_stream(self: Arc) -> Option { + if self.stream_count.load(Ordering::Relaxed) > 0 { + return None; + } + self.stream_count.fetch_add(1, Ordering::Relaxed); + Some(RecordStream(self)) + } + + /// Notify the recordset that a stream is closing. + fn close_stream(&self) { + self.stream_count.fetch_sub(1, Ordering::Relaxed); + } } -impl<'a> Iterator for &'a Recordset { +impl<'a> Iterator for RecordStream { type Item = Result; fn next(&mut self) -> Option> { loop { - if self.is_active() || !self.record_queue.is_empty() { - let result = self.record_queue.pop(); + if self.0.is_active() || !self.0.record_queue.is_empty() { + let result = self.0.record_queue.pop(); if result.is_some() { - self.record_queue_count.fetch_sub(1, Ordering::Relaxed); + self.0.record_queue_count.fetch_sub(1, Ordering::Relaxed); return result; } // aerospike_rt::task::yield_now().await; @@ -154,3 +176,29 @@ impl<'a> Iterator for &'a Recordset { } } } + +impl futures::Stream for RecordStream { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if self.0.is_active() || !self.0.record_queue.is_empty() { + if let Some(result) = self.0.record_queue.pop() { + self.0.record_queue_count.fetch_sub(1, Ordering::Relaxed); + return std::task::Poll::Ready(Some(result)); + } + cx.waker().wake_by_ref(); + std::task::Poll::Pending + } else { + std::task::Poll::Ready(None) + } + } +} + +impl Drop for RecordStream { + fn drop(&mut self) { + self.0.close_stream(); + } +} From a5dc1111b3ac9d8aadfbec334a196757f001dbfc Mon Sep 17 00:00:00 2001 From: Jane Lewis Date: Mon, 8 Sep 2025 10:17:08 -0700 Subject: [PATCH 2/2] Implement `AsRef` for `RecordStream` to expose the underlying `Recordset` --- aerospike-core/src/query/recordset.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/aerospike-core/src/query/recordset.rs b/aerospike-core/src/query/recordset.rs index fdf070d..9f5603f 100644 --- a/aerospike-core/src/query/recordset.rs +++ b/aerospike-core/src/query/recordset.rs @@ -197,6 +197,12 @@ impl futures::Stream for RecordStream { } } +impl AsRef for RecordStream { + fn as_ref(&self) -> &Recordset { + &self.0 + } +} + impl Drop for RecordStream { fn drop(&mut self) { self.0.close_stream();