Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/pubsub/src/subscriber/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1041,4 +1041,31 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn routing_header() -> anyhow::Result<()> {
let mut mock = MockSubscriber::new();

mock.expect_streaming_pull().return_once(move |request| {
let metadata = request.metadata();
assert_eq!(
metadata
.get("x-goog-request-params")
.expect("routing header missing"),
"subscription=projects/p/subscriptions/s"
);
Err(TonicStatus::failed_precondition("ignored"))
});

let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
let client = test_client(endpoint).await?;

let _ = client
.streaming_pull("projects/p/subscriptions/s")
.start()
.next()
.await;

Ok(())
}
}
29 changes: 21 additions & 8 deletions src/pubsub/src/subscriber/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ where
// The only writes we perform are keepalives, which are sent so infrequently
// that we don't fear any back pressure on this channel.
let (request_tx, request_rx) = mpsc::channel(1);
let request_params = format!("subscription={}", initial_req.subscription);

request_tx.send(initial_req).await.map_err(Error::io)?;

// Start the keepalive task **before** we open the stream.
Expand All @@ -116,7 +118,7 @@ where
keepalive::spawn(request_tx, shutdown.clone());

let stream = inner
.streaming_pull(request_rx, RequestOptions::default())
.streaming_pull(&request_params, request_rx, RequestOptions::default())
.await?
.into_inner();

Expand Down Expand Up @@ -216,8 +218,9 @@ mod tests {

let mut mock = MockStub::new();
mock.expect_streaming_pull()
.withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription")
.times(1)
.return_once(move |_r, _o| Ok(TonicResponse::from(response_rx)));
.return_once(move |_s, _r, _o| Ok(TonicResponse::from(response_rx)));

response_tx.send(Ok(test_response(1..10))).await?;
response_tx.send(Ok(test_response(11..20))).await?;
Expand All @@ -242,8 +245,9 @@ mod tests {

let mut mock = MockStub::new();
mock.expect_streaming_pull()
.withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription")
.times(1)
.return_once(move |mut request_rx, _o| {
.return_once(move |_s, mut request_rx, _o| {
tokio::spawn(async move {
// Note that this task stays alive as long as we hold
// `recover_writes_rx`.
Expand Down Expand Up @@ -281,8 +285,9 @@ mod tests {
async fn error() -> anyhow::Result<()> {
let mut mock = MockStub::new();
mock.expect_streaming_pull()
.withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription")
.times(1)
.return_once(|_, _| Err(Error::io("fail")));
.return_once(|_, _, _| Err(Error::io("fail")));

let err = open_stream(Arc::new(mock), initial_request())
.await
Expand All @@ -302,9 +307,12 @@ mod tests {
// N > 10 (the default attempt limit for GAPICs).
mock_stub
.expect_streaming_pull()
.withf(|s, _, _| {
s == "subscription=projects/my-project/subscriptions/my-subscription"
})
.times(1)
.in_sequence(&mut seq)
.return_once(|_, _| Err(transient_error()));
.return_once(|_, _, _| Err(transient_error()));
mock_backoff
.expect_on_failure()
.times(1)
Expand All @@ -319,9 +327,10 @@ mod tests {

mock_stub
.expect_streaming_pull()
.withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription")
.times(1)
.in_sequence(&mut seq)
.return_once(move |_r, _o| Ok(TonicResponse::from(response_rx)));
.return_once(move |_s, _r, _o| Ok(TonicResponse::from(response_rx)));

let mut stream = Stream::new_with_backoff(
Arc::new(mock_stub),
Expand All @@ -345,9 +354,12 @@ mod tests {
// N > 10 (the default attempt limit for GAPICs).
mock_stub
.expect_streaming_pull()
.withf(|s, _, _| {
s == "subscription=projects/my-project/subscriptions/my-subscription"
})
.times(1)
.in_sequence(&mut seq)
.return_once(|_, _| Err(transient_error()));
.return_once(|_, _, _| Err(transient_error()));
mock_backoff
.expect_on_failure()
.times(1)
Expand All @@ -358,9 +370,10 @@ mod tests {
// Simulate a permanent error.
mock_stub
.expect_streaming_pull()
.withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription")
.times(1)
.in_sequence(&mut seq)
.return_once(|_, _| Err(permanent_error()));
.return_once(|_, _, _| Err(permanent_error()));
// The retry loop calculates the backoff delay before determining
// whether a retry should occur. Hence, we expect this extra call to
// `on_failure()`.
Expand Down
2 changes: 2 additions & 0 deletions src/pubsub/src/subscriber/stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub(super) trait Stub: std::fmt::Debug + Send + Sync {
type Stream: Sized + std::fmt::Debug;
async fn streaming_pull(
&self,
request_params: &str,
request_rx: Receiver<StreamingPullRequest>,
_options: gax::options::RequestOptions,
) -> Result<TonicResponse<Self::Stream>>;
Expand Down Expand Up @@ -68,6 +69,7 @@ pub(super) mod tests {
type Stream = MockStream;
async fn streaming_pull(
&self,
request_params: &str,
request_rx: Receiver<StreamingPullRequest>,
_options: gax::options::RequestOptions,
) -> Result<TonicResponse<MockStream>>;
Expand Down
16 changes: 13 additions & 3 deletions src/pubsub/src/subscriber/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl Stub for Transport {
type Stream = Streaming<StreamingPullResponse>;
async fn streaming_pull(
&self,
request_params: &str,
request_rx: Receiver<StreamingPullRequest>,
options: gax::options::RequestOptions,
) -> Result<TonicResponse<Self::Stream>> {
Expand All @@ -69,7 +70,7 @@ impl Stub for Transport {
request,
options,
&info::X_GOOG_API_CLIENT_HEADER,
"",
request_params,
)
.await
}
Expand Down Expand Up @@ -131,12 +132,21 @@ pub(super) mod tests {
request_tx.send(StreamingPullRequest::default()).await?;

let mut mock = MockSubscriber::new();
mock.expect_streaming_pull()
.return_once(|_| Ok(TonicResponse::from(response_rx)));
mock.expect_streaming_pull().return_once(|request| {
let metadata = request.metadata();
assert_eq!(
metadata
.get("x-goog-request-params")
.expect("routing header missing"),
"subscription=projects/p/subscriptions/s"
);
Ok(TonicResponse::from(response_rx))
});
let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
let transport = test_transport(endpoint).await?;
let mut stream = Stub::streaming_pull(
&transport,
"subscription=projects/p/subscriptions/s",
request_rx,
gax::options::RequestOptions::default(),
)
Expand Down
Loading