diff --git a/src/pubsub/src/publisher/actor.rs b/src/pubsub/src/publisher/actor.rs index f581e24a6b..b58b2613f4 100644 --- a/src/pubsub/src/publisher/actor.rs +++ b/src/pubsub/src/publisher/actor.rs @@ -479,14 +479,21 @@ impl SequentialBatchActor { #[cfg(test)] mod tests { use super::{ConcurrentBatchActor, SequentialBatchActor}; + use crate::error::PublishError; use crate::publisher::actor::{BundledMessage, ToBatchActor}; use crate::publisher::options::BatchingOptions; use crate::{ generated::gapic_dataplane::client::Publisher as GapicPublisher, model::{PublishResponse, PubsubMessage}, }; + use mockall::Sequence; + use rand::{Rng, distr::Alphanumeric}; + use std::collections::HashMap; + use std::time::Duration; static TOPIC: &str = "my-topic"; + const EXPECTED_BATCHES: usize = 5; + const TIME_PER_BATCH: Duration = Duration::from_secs(10); mockall::mock! { #[derive(Debug)] @@ -496,6 +503,20 @@ mod tests { } } + // Similar to GapicPublisher but returns impl Future instead. + // This is useful for mocking a response with delays/timeouts. + // See https://github.com/asomers/mockall/issues/189 for more + // detail on why this is needed. + // While this can used inplace of GapicPublisher, it makes the + // normal usage without async closure much more cumbersome. + mockall::mock! { + #[derive(Debug)] + GapicPublisherWithFuture {} + impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisherWithFuture { + fn publish(&self, req: crate::model::PublishRequest, _options: gax::options::RequestOptions) -> impl Future>> + Send; + } + } + fn publish_ok( req: crate::model::PublishRequest, _options: gax::options::RequestOptions, @@ -509,6 +530,78 @@ mod tests { )) } + fn publish_err( + _req: crate::model::PublishRequest, + _options: gax::options::RequestOptions, + ) -> gax::Result> { + Err(gax::error::Error::service( + gax::error::rpc::Status::default() + .set_code(gax::error::rpc::Code::Unknown) + .set_message("unknown error has occurred"), + )) + } + + fn generate_random_data() -> String { + rand::rng() + .sample_iter(&Alphanumeric) + .take(16) + .map(char::from) + .collect() + } + + // Send ToBatchActor::Publish with random data n times then await and assert the result. + macro_rules! assert_publish_is_ok { + ($actor_tx:ident, $n:expr) => { + let mut publish_rxs = HashMap::new(); + for _ in 0..$n { + let (publish_tx, publish_rx) = tokio::sync::oneshot::channel(); + let msg = generate_random_data(); + let bundle = BundledMessage { + msg: PubsubMessage::new().set_data(msg.clone()), + tx: publish_tx, + }; + $actor_tx.send(ToBatchActor::Publish(bundle))?; + publish_rxs.insert(msg, publish_rx); + } + for (k, v) in publish_rxs { + let res = v.await; + assert!(matches!(res, Ok(Ok(ref msg)) if *msg == k), "got {res:?}, expected {k:?}"); + } + }; + } + + // Send ToBatchActor::Publish with random data n times then await and assert that the actor is paused. + macro_rules! assert_actor_is_paused { + ($actor_tx:ident, $n:expr) => { + let mut publish_rxs = Vec::new(); + for _ in 0..$n { + let (publish_tx, publish_rx) = tokio::sync::oneshot::channel(); + // let msg = generate_random_data(); + let bundle = BundledMessage { + msg: PubsubMessage::new().set_data(generate_random_data()), + tx: publish_tx, + }; + $actor_tx.send(ToBatchActor::Publish(bundle))?; + publish_rxs.push(publish_rx); + } + for v in publish_rxs { + let res = v.await; + assert!( + matches!(res, Ok(Err(PublishError::OrderingKeyPaused(())))), + "{res:?}" + ); + } + }; + } + + macro_rules! assert_flush { + ($actor_tx:ident) => { + let (flush_tx, flush_rx) = tokio::sync::oneshot::channel(); + $actor_tx.send(ToBatchActor::Flush(flush_tx))?; + flush_rx.await?; + }; + } + #[tokio::test] async fn basic() -> anyhow::Result<()> { let client = GapicPublisher::from_stub(MockGapicPublisher::new()); @@ -527,70 +620,192 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(start_paused = true)] async fn concurrent_actor_publish() -> anyhow::Result<()> { - let mut mock = MockGapicPublisher::new(); + let mut mock = MockGapicPublisherWithFuture::new(); mock.expect_publish() .withf(|req, _o| req.topic == TOPIC) - .returning(publish_ok) - .times(1); + .times(EXPECTED_BATCHES) + .returning({ + |r, o| { + Box::pin(async move { + tokio::time::sleep(TIME_PER_BATCH).await; + publish_ok(r, o) + }) + } + }); let (actor_tx, actor_rx) = tokio::sync::mpsc::unbounded_channel(); tokio::spawn( ConcurrentBatchActor::new( TOPIC.to_string(), GapicPublisher::from_stub(mock), - BatchingOptions::default().set_message_count_threshold(1_u32), + BatchingOptions::default().set_message_count_threshold(2_u32), actor_rx, ) .run(), ); - let (publish_tx, publish_rx) = tokio::sync::oneshot::channel(); - let bundle = BundledMessage { - msg: PubsubMessage::new().set_data("hello"), - tx: publish_tx, - }; - actor_tx.send(ToBatchActor::Publish(bundle))?; - let res = publish_rx.await; - assert!(matches!(res, Ok(Ok(ref msg)) if msg == "hello"), "{res:?}"); + let start = tokio::time::Instant::now(); + assert_publish_is_ok!(actor_tx, 10); + assert_eq!( + start.elapsed(), + TIME_PER_BATCH, + "all batches should have been concurrently sent and completed by {:?}", + TIME_PER_BATCH + ); Ok(()) } - #[tokio::test] + #[tokio::test(start_paused = true)] + async fn sequential_actor_publish() -> anyhow::Result<()> { + let mut mock = MockGapicPublisherWithFuture::new(); + mock.expect_publish() + .withf(|req, _o| req.topic == TOPIC) + .times(EXPECTED_BATCHES) + .returning({ + |r, o| { + Box::pin(async move { + tokio::time::sleep(TIME_PER_BATCH).await; + publish_ok(r, o) + }) + } + }); + let (actor_tx, actor_rx) = tokio::sync::mpsc::unbounded_channel(); + tokio::spawn( + SequentialBatchActor::new( + TOPIC.to_string(), + GapicPublisher::from_stub(mock), + BatchingOptions::default().set_message_count_threshold(2_u32), + actor_rx, + ) + .run(), + ); + + let start = tokio::time::Instant::now(); + assert_publish_is_ok!(actor_tx, 10); + assert_eq!( + start.elapsed(), + EXPECTED_BATCHES as u32 * TIME_PER_BATCH, + "all batches should have been seqentially sent and takes {:?}", + EXPECTED_BATCHES as u32 * TIME_PER_BATCH + ); + Ok(()) + } + + #[tokio::test(start_paused = true)] async fn concurrent_actor_flush() -> anyhow::Result<()> { - let mut mock = MockGapicPublisher::new(); + let mut mock = MockGapicPublisherWithFuture::new(); mock.expect_publish() .withf(|req, _o| req.topic == TOPIC) - .returning(publish_ok) - .times(1); + .times(EXPECTED_BATCHES) + .returning({ + |r, o| { + Box::pin(async move { + tokio::time::sleep(TIME_PER_BATCH).await; + publish_ok(r, o) + }) + } + }); let (actor_tx, actor_rx) = tokio::sync::mpsc::unbounded_channel(); tokio::spawn( ConcurrentBatchActor::new( TOPIC.to_string(), GapicPublisher::from_stub(mock), - BatchingOptions::default(), + BatchingOptions::default().set_message_count_threshold(2_u32), actor_rx, ) .run(), ); // Flush on empty. - let (flush_tx, flush_rx) = tokio::sync::oneshot::channel(); - actor_tx.send(ToBatchActor::Flush(flush_tx))?; - flush_rx.await?; + assert_flush!(actor_tx); + + // Publish 10 messages then Flush. + let start = tokio::time::Instant::now(); + let mut publish_rxs = HashMap::new(); + for _ in 0..10 { + let (publish_tx, publish_rx) = tokio::sync::oneshot::channel(); + let msg = generate_random_data(); + let bundle = BundledMessage { + msg: PubsubMessage::new().set_data(msg.clone()), + tx: publish_tx, + }; + actor_tx.send(ToBatchActor::Publish(bundle))?; + publish_rxs.insert(msg, publish_rx); + } + assert_flush!(actor_tx); + for (k, v) in publish_rxs { + let res = v.await; + assert!( + matches!(res, Ok(Ok(ref msg)) if *msg == k), + "got {res:?}, expected {k:?}" + ); + } + assert_eq!( + start.elapsed(), + TIME_PER_BATCH, + "all batches should have been concurrently sent and completed by {:?}", + TIME_PER_BATCH + ); - // Publish a message then Flush. - let (publish_tx, publish_rx) = tokio::sync::oneshot::channel(); - let bundle = BundledMessage { - msg: PubsubMessage::new().set_data("hello"), - tx: publish_tx, - }; - actor_tx.send(ToBatchActor::Publish(bundle))?; - let (flush_tx, flush_rx) = tokio::sync::oneshot::channel(); - actor_tx.send(ToBatchActor::Flush(flush_tx))?; - flush_rx.await?; - let res = publish_rx.await; - assert!(matches!(res, Ok(Ok(ref msg)) if msg == "hello"), "{res:?}"); + Ok(()) + } + + #[tokio::test(start_paused = true)] + async fn sequential_actor_flush() -> anyhow::Result<()> { + let mut mock = MockGapicPublisherWithFuture::new(); + mock.expect_publish() + .withf(|req, _o| req.topic == TOPIC) + .times(EXPECTED_BATCHES) + .returning({ + |r, o| { + Box::pin(async move { + tokio::time::sleep(TIME_PER_BATCH).await; + publish_ok(r, o) + }) + } + }); + let (actor_tx, actor_rx) = tokio::sync::mpsc::unbounded_channel(); + tokio::spawn( + SequentialBatchActor::new( + TOPIC.to_string(), + GapicPublisher::from_stub(mock), + BatchingOptions::default().set_message_count_threshold(2_u32), + actor_rx, + ) + .run(), + ); + + // Flush on empty. + assert_flush!(actor_tx); + + // Publish 10 messages then Flush. + let start = tokio::time::Instant::now(); + let mut publish_rxs = HashMap::new(); + for _ in 0..10 { + let (publish_tx, publish_rx) = tokio::sync::oneshot::channel(); + let msg = generate_random_data(); + let bundle = BundledMessage { + msg: PubsubMessage::new().set_data(msg.clone()), + tx: publish_tx, + }; + actor_tx.send(ToBatchActor::Publish(bundle))?; + publish_rxs.insert(msg, publish_rx); + } + assert_flush!(actor_tx); + for (k, v) in publish_rxs { + let res = v.await; + assert!( + matches!(res, Ok(Ok(ref msg)) if *msg == k), + "got {res:?}, expected {k:?}" + ); + } + assert_eq!( + start.elapsed(), + EXPECTED_BATCHES as u32 * TIME_PER_BATCH, + "all batches should have been seqentially sent and takes {:?}", + EXPECTED_BATCHES as u32 * TIME_PER_BATCH + ); Ok(()) } @@ -611,4 +826,58 @@ mod tests { actor_tx.send(ToBatchActor::ResumePublish())?; Ok(()) } + + #[tokio::test(start_paused = true)] + async fn sequential_actor_resume() -> anyhow::Result<()> { + let mut mock = MockGapicPublisher::new(); + let mut seq = Sequence::new(); + mock.expect_publish() + .withf(|req, _o| req.topic == TOPIC) + .times(5) + .in_sequence(&mut seq) + .returning(publish_ok); + mock.expect_publish() + .withf(|req, _o| req.topic == TOPIC) + .times(1) + .in_sequence(&mut seq) + .returning(publish_err); + mock.expect_publish() + .withf(|req, _o| req.topic == TOPIC) + .times(5) + .in_sequence(&mut seq) + .returning(publish_ok); + + let (actor_tx, actor_rx) = tokio::sync::mpsc::unbounded_channel(); + tokio::spawn( + SequentialBatchActor::new( + TOPIC.to_string(), + GapicPublisher::from_stub(mock), + BatchingOptions::default().set_message_count_threshold(1_u32), + actor_rx, + ) + .run(), + ); + + // Validate resume when not paused. + actor_tx.send(ToBatchActor::ResumePublish())?; + assert_publish_is_ok!(actor_tx, 5); + + // This message triggers the mock to return publish error and causes the actor to pause. + let (publish_tx, publish_rx) = tokio::sync::oneshot::channel(); + let bundle = BundledMessage { + msg: PubsubMessage::new().set_data(generate_random_data()), + tx: publish_tx, + }; + actor_tx.send(ToBatchActor::Publish(bundle))?; + let got_err = publish_rx.await; + // TODO(#3689): Validate the error structure when Publisher error structure is better defined. + assert!(matches!(got_err, Ok(Err(_))), "{got_err:?}"); + assert_actor_is_paused!(actor_tx, 5); + + // Resume then validate that the actor is no longer paused. + actor_tx.send(ToBatchActor::ResumePublish())?; + assert_publish_is_ok!(actor_tx, 5); + + Ok(()) + } }