diff --git a/src/pubsub/src/subscriber/session.rs b/src/pubsub/src/subscriber/session.rs index 15a1a88355..50e55bdceb 100644 --- a/src/pubsub/src/subscriber/session.rs +++ b/src/pubsub/src/subscriber/session.rs @@ -1041,4 +1041,31 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn routing_header() -> anyhow::Result<()> { + let mut mock = MockSubscriber::new(); + + mock.expect_streaming_pull().return_once(move |request| { + let metadata = request.metadata(); + assert_eq!( + metadata + .get("x-goog-request-params") + .expect("routing header missing"), + "subscription=projects/p/subscriptions/s" + ); + Err(TonicStatus::failed_precondition("ignored")) + }); + + let (endpoint, _server) = start("0.0.0.0:0", mock).await?; + let client = test_client(endpoint).await?; + + let _ = client + .streaming_pull("projects/p/subscriptions/s") + .start() + .next() + .await; + + Ok(()) + } } diff --git a/src/pubsub/src/subscriber/stream.rs b/src/pubsub/src/subscriber/stream.rs index efdb709861..94618cfc72 100644 --- a/src/pubsub/src/subscriber/stream.rs +++ b/src/pubsub/src/subscriber/stream.rs @@ -101,6 +101,8 @@ where // The only writes we perform are keepalives, which are sent so infrequently // that we don't fear any back pressure on this channel. let (request_tx, request_rx) = mpsc::channel(1); + let request_params = format!("subscription={}", initial_req.subscription); + request_tx.send(initial_req).await.map_err(Error::io)?; // Start the keepalive task **before** we open the stream. @@ -116,7 +118,7 @@ where keepalive::spawn(request_tx, shutdown.clone()); let stream = inner - .streaming_pull(request_rx, RequestOptions::default()) + .streaming_pull(&request_params, request_rx, RequestOptions::default()) .await? .into_inner(); @@ -216,8 +218,9 @@ mod tests { let mut mock = MockStub::new(); mock.expect_streaming_pull() + .withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription") .times(1) - .return_once(move |_r, _o| Ok(TonicResponse::from(response_rx))); + .return_once(move |_s, _r, _o| Ok(TonicResponse::from(response_rx))); response_tx.send(Ok(test_response(1..10))).await?; response_tx.send(Ok(test_response(11..20))).await?; @@ -242,8 +245,9 @@ mod tests { let mut mock = MockStub::new(); mock.expect_streaming_pull() + .withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription") .times(1) - .return_once(move |mut request_rx, _o| { + .return_once(move |_s, mut request_rx, _o| { tokio::spawn(async move { // Note that this task stays alive as long as we hold // `recover_writes_rx`. @@ -281,8 +285,9 @@ mod tests { async fn error() -> anyhow::Result<()> { let mut mock = MockStub::new(); mock.expect_streaming_pull() + .withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription") .times(1) - .return_once(|_, _| Err(Error::io("fail"))); + .return_once(|_, _, _| Err(Error::io("fail"))); let err = open_stream(Arc::new(mock), initial_request()) .await @@ -302,9 +307,12 @@ mod tests { // N > 10 (the default attempt limit for GAPICs). mock_stub .expect_streaming_pull() + .withf(|s, _, _| { + s == "subscription=projects/my-project/subscriptions/my-subscription" + }) .times(1) .in_sequence(&mut seq) - .return_once(|_, _| Err(transient_error())); + .return_once(|_, _, _| Err(transient_error())); mock_backoff .expect_on_failure() .times(1) @@ -319,9 +327,10 @@ mod tests { mock_stub .expect_streaming_pull() + .withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription") .times(1) .in_sequence(&mut seq) - .return_once(move |_r, _o| Ok(TonicResponse::from(response_rx))); + .return_once(move |_s, _r, _o| Ok(TonicResponse::from(response_rx))); let mut stream = Stream::new_with_backoff( Arc::new(mock_stub), @@ -345,9 +354,12 @@ mod tests { // N > 10 (the default attempt limit for GAPICs). mock_stub .expect_streaming_pull() + .withf(|s, _, _| { + s == "subscription=projects/my-project/subscriptions/my-subscription" + }) .times(1) .in_sequence(&mut seq) - .return_once(|_, _| Err(transient_error())); + .return_once(|_, _, _| Err(transient_error())); mock_backoff .expect_on_failure() .times(1) @@ -358,9 +370,10 @@ mod tests { // Simulate a permanent error. mock_stub .expect_streaming_pull() + .withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription") .times(1) .in_sequence(&mut seq) - .return_once(|_, _| Err(permanent_error())); + .return_once(|_, _, _| Err(permanent_error())); // The retry loop calculates the backoff delay before determining // whether a retry should occur. Hence, we expect this extra call to // `on_failure()`. diff --git a/src/pubsub/src/subscriber/stub.rs b/src/pubsub/src/subscriber/stub.rs index 7358892ecf..777404767c 100644 --- a/src/pubsub/src/subscriber/stub.rs +++ b/src/pubsub/src/subscriber/stub.rs @@ -29,6 +29,7 @@ pub(super) trait Stub: std::fmt::Debug + Send + Sync { type Stream: Sized + std::fmt::Debug; async fn streaming_pull( &self, + request_params: &str, request_rx: Receiver, _options: gax::options::RequestOptions, ) -> Result>; @@ -68,6 +69,7 @@ pub(super) mod tests { type Stream = MockStream; async fn streaming_pull( &self, + request_params: &str, request_rx: Receiver, _options: gax::options::RequestOptions, ) -> Result>; diff --git a/src/pubsub/src/subscriber/transport.rs b/src/pubsub/src/subscriber/transport.rs index 624a3be208..ec4bff2554 100644 --- a/src/pubsub/src/subscriber/transport.rs +++ b/src/pubsub/src/subscriber/transport.rs @@ -47,6 +47,7 @@ impl Stub for Transport { type Stream = Streaming; async fn streaming_pull( &self, + request_params: &str, request_rx: Receiver, options: gax::options::RequestOptions, ) -> Result> { @@ -69,7 +70,7 @@ impl Stub for Transport { request, options, &info::X_GOOG_API_CLIENT_HEADER, - "", + request_params, ) .await } @@ -131,12 +132,21 @@ pub(super) mod tests { request_tx.send(StreamingPullRequest::default()).await?; let mut mock = MockSubscriber::new(); - mock.expect_streaming_pull() - .return_once(|_| Ok(TonicResponse::from(response_rx))); + mock.expect_streaming_pull().return_once(|request| { + let metadata = request.metadata(); + assert_eq!( + metadata + .get("x-goog-request-params") + .expect("routing header missing"), + "subscription=projects/p/subscriptions/s" + ); + Ok(TonicResponse::from(response_rx)) + }); let (endpoint, _server) = start("0.0.0.0:0", mock).await?; let transport = test_transport(endpoint).await?; let mut stream = Stub::streaming_pull( &transport, + "subscription=projects/p/subscriptions/s", request_rx, gax::options::RequestOptions::default(), )