From f06c4296bd05b493618622f640ff7479f033b6ee Mon Sep 17 00:00:00 2001 From: haphungw Date: Fri, 6 Feb 2026 00:02:47 +0000 Subject: [PATCH 1/9] feat(pubsub): include subscription name in StreamingPull header --- src/pubsub/src/subscriber/stream.rs | 17 +++++++++-------- src/pubsub/src/subscriber/stub.rs | 2 ++ src/pubsub/src/subscriber/transport.rs | 5 ++++- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/pubsub/src/subscriber/stream.rs b/src/pubsub/src/subscriber/stream.rs index efdb709861..d5ef4eea95 100644 --- a/src/pubsub/src/subscriber/stream.rs +++ b/src/pubsub/src/subscriber/stream.rs @@ -101,6 +101,7 @@ where // The only writes we perform are keepalives, which are sent so infrequently // that we don't fear any back pressure on this channel. let (request_tx, request_rx) = mpsc::channel(1); + let subscription = initial_req.subscription.clone(); request_tx.send(initial_req).await.map_err(Error::io)?; // Start the keepalive task **before** we open the stream. @@ -116,7 +117,7 @@ where keepalive::spawn(request_tx, shutdown.clone()); let stream = inner - .streaming_pull(request_rx, RequestOptions::default()) + .streaming_pull(&subscription, request_rx, RequestOptions::default()) .await? .into_inner(); @@ -217,7 +218,7 @@ mod tests { let mut mock = MockStub::new(); mock.expect_streaming_pull() .times(1) - .return_once(move |_r, _o| Ok(TonicResponse::from(response_rx))); + .return_once(move |_s, _r, _o| Ok(TonicResponse::from(response_rx))); response_tx.send(Ok(test_response(1..10))).await?; response_tx.send(Ok(test_response(11..20))).await?; @@ -243,7 +244,7 @@ mod tests { let mut mock = MockStub::new(); mock.expect_streaming_pull() .times(1) - .return_once(move |mut request_rx, _o| { + .return_once(move |_s, mut request_rx, _o| { tokio::spawn(async move { // Note that this task stays alive as long as we hold // `recover_writes_rx`. @@ -282,7 +283,7 @@ mod tests { let mut mock = MockStub::new(); mock.expect_streaming_pull() .times(1) - .return_once(|_, _| Err(Error::io("fail"))); + .return_once(|_, _, _| Err(Error::io("fail"))); let err = open_stream(Arc::new(mock), initial_request()) .await @@ -304,7 +305,7 @@ mod tests { .expect_streaming_pull() .times(1) .in_sequence(&mut seq) - .return_once(|_, _| Err(transient_error())); + .return_once(|_, _, _| Err(transient_error())); mock_backoff .expect_on_failure() .times(1) @@ -321,7 +322,7 @@ mod tests { .expect_streaming_pull() .times(1) .in_sequence(&mut seq) - .return_once(move |_r, _o| Ok(TonicResponse::from(response_rx))); + .return_once(move |_s, _r, _o| Ok(TonicResponse::from(response_rx))); let mut stream = Stream::new_with_backoff( Arc::new(mock_stub), @@ -347,7 +348,7 @@ mod tests { .expect_streaming_pull() .times(1) .in_sequence(&mut seq) - .return_once(|_, _| Err(transient_error())); + .return_once(|_, _, _| Err(transient_error())); mock_backoff .expect_on_failure() .times(1) @@ -360,7 +361,7 @@ mod tests { .expect_streaming_pull() .times(1) .in_sequence(&mut seq) - .return_once(|_, _| Err(permanent_error())); + .return_once(|_, _, _| Err(permanent_error())); // The retry loop calculates the backoff delay before determining // whether a retry should occur. Hence, we expect this extra call to // `on_failure()`. diff --git a/src/pubsub/src/subscriber/stub.rs b/src/pubsub/src/subscriber/stub.rs index 7358892ecf..873345ca2b 100644 --- a/src/pubsub/src/subscriber/stub.rs +++ b/src/pubsub/src/subscriber/stub.rs @@ -29,6 +29,7 @@ pub(super) trait Stub: std::fmt::Debug + Send + Sync { type Stream: Sized + std::fmt::Debug; async fn streaming_pull( &self, + subscription: &str, request_rx: Receiver, _options: gax::options::RequestOptions, ) -> Result>; @@ -68,6 +69,7 @@ pub(super) mod tests { type Stream = MockStream; async fn streaming_pull( &self, + subscription: &str, request_rx: Receiver, _options: gax::options::RequestOptions, ) -> Result>; diff --git a/src/pubsub/src/subscriber/transport.rs b/src/pubsub/src/subscriber/transport.rs index 624a3be208..fffe6077be 100644 --- a/src/pubsub/src/subscriber/transport.rs +++ b/src/pubsub/src/subscriber/transport.rs @@ -47,6 +47,7 @@ impl Stub for Transport { type Stream = Streaming; async fn streaming_pull( &self, + subscription: &str, request_rx: Receiver, options: gax::options::RequestOptions, ) -> Result> { @@ -62,6 +63,7 @@ impl Stub for Transport { }; let path = http::uri::PathAndQuery::from_static("/google.pubsub.v1.Subscriber/StreamingPull"); + let x_goog_request_params = format!("subscription={subscription}"); self.inner .bidi_stream( extensions, @@ -69,7 +71,7 @@ impl Stub for Transport { request, options, &info::X_GOOG_API_CLIENT_HEADER, - "", + &x_goog_request_params, ) .await } @@ -137,6 +139,7 @@ pub(super) mod tests { let transport = test_transport(endpoint).await?; let mut stream = Stub::streaming_pull( &transport, + "projects/p/subscriptions/s", request_rx, gax::options::RequestOptions::default(), ) From e0fd90f429b3201f879e5c2e7ad8709922dfd28a Mon Sep 17 00:00:00 2001 From: haphungw Date: Fri, 6 Feb 2026 11:27:55 +0000 Subject: [PATCH 2/9] add unit test for gRPC metadata verification --- src/pubsub/src/subscriber/session.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/pubsub/src/subscriber/session.rs b/src/pubsub/src/subscriber/session.rs index 15a1a88355..a6443f15a6 100644 --- a/src/pubsub/src/subscriber/session.rs +++ b/src/pubsub/src/subscriber/session.rs @@ -1041,4 +1041,31 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn routing_header() -> anyhow::Result<()> { + let mut mock = MockSubscriber::new(); + + let subscription = "projects/p/subscriptions/s"; + + mock.expect_streaming_pull().return_once(move |request| { + let metadata = request.metadata(); + assert_eq!( + metadata + .get("x-goog-request-params") + .expect("routing header missing"), + &format!("subscription={subscription}") + ); + Err(TonicStatus::failed_precondition( + "header verification failed", + )) + }); + + let (endpoint, _server) = start("0.0.0.0:0", mock).await?; + let client = test_client(endpoint).await?; + + let _ = client.streaming_pull(subscription).start().next().await; + + Ok(()) + } } From 5d8e4510cc39dc42bbde8c552cebf32ab5d957ed Mon Sep 17 00:00:00 2001 From: haphungw Date: Fri, 6 Feb 2026 11:48:19 +0000 Subject: [PATCH 3/9] accept x_goog_request_params instead of subscription --- src/pubsub/src/subscriber/stream.rs | 5 +++-- src/pubsub/src/subscriber/stub.rs | 4 ++-- src/pubsub/src/subscriber/transport.rs | 5 ++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/pubsub/src/subscriber/stream.rs b/src/pubsub/src/subscriber/stream.rs index d5ef4eea95..1b515246d4 100644 --- a/src/pubsub/src/subscriber/stream.rs +++ b/src/pubsub/src/subscriber/stream.rs @@ -101,7 +101,8 @@ where // The only writes we perform are keepalives, which are sent so infrequently // that we don't fear any back pressure on this channel. let (request_tx, request_rx) = mpsc::channel(1); - let subscription = initial_req.subscription.clone(); + let request_params = format!("subscription={}", initial_req.subscription); + request_tx.send(initial_req).await.map_err(Error::io)?; // Start the keepalive task **before** we open the stream. @@ -117,7 +118,7 @@ where keepalive::spawn(request_tx, shutdown.clone()); let stream = inner - .streaming_pull(&subscription, request_rx, RequestOptions::default()) + .streaming_pull(&request_params, request_rx, RequestOptions::default()) .await? .into_inner(); diff --git a/src/pubsub/src/subscriber/stub.rs b/src/pubsub/src/subscriber/stub.rs index 873345ca2b..777404767c 100644 --- a/src/pubsub/src/subscriber/stub.rs +++ b/src/pubsub/src/subscriber/stub.rs @@ -29,7 +29,7 @@ pub(super) trait Stub: std::fmt::Debug + Send + Sync { type Stream: Sized + std::fmt::Debug; async fn streaming_pull( &self, - subscription: &str, + request_params: &str, request_rx: Receiver, _options: gax::options::RequestOptions, ) -> Result>; @@ -69,7 +69,7 @@ pub(super) mod tests { type Stream = MockStream; async fn streaming_pull( &self, - subscription: &str, + request_params: &str, request_rx: Receiver, _options: gax::options::RequestOptions, ) -> Result>; diff --git a/src/pubsub/src/subscriber/transport.rs b/src/pubsub/src/subscriber/transport.rs index fffe6077be..63df9eebdd 100644 --- a/src/pubsub/src/subscriber/transport.rs +++ b/src/pubsub/src/subscriber/transport.rs @@ -47,7 +47,7 @@ impl Stub for Transport { type Stream = Streaming; async fn streaming_pull( &self, - subscription: &str, + request_params: &str, request_rx: Receiver, options: gax::options::RequestOptions, ) -> Result> { @@ -63,7 +63,6 @@ impl Stub for Transport { }; let path = http::uri::PathAndQuery::from_static("/google.pubsub.v1.Subscriber/StreamingPull"); - let x_goog_request_params = format!("subscription={subscription}"); self.inner .bidi_stream( extensions, @@ -71,7 +70,7 @@ impl Stub for Transport { request, options, &info::X_GOOG_API_CLIENT_HEADER, - &x_goog_request_params, + request_params, ) .await } From d56698b1aad993ff0ecfce0f8303672d1fdfe242 Mon Sep 17 00:00:00 2001 From: haphungw Date: Fri, 6 Feb 2026 12:02:28 +0000 Subject: [PATCH 4/9] cargo update --- Cargo.lock | 102 +++++++++++++++++++++++------------------------------ 1 file changed, 45 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 87766c29be..b78098efd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,9 +34,9 @@ checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" [[package]] name = "anyhow" -version = "1.0.100" +version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" [[package]] name = "async-trait" @@ -225,12 +225,11 @@ dependencies = [ [[package]] name = "cargo-platform" -version = "0.3.2" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87a0c0e6148f11f01f32650a2ea02d532b2ad4e81d8bd41e6e565b5adc5e6082" +checksum = "8abf5d501fd757c2d2ee78d0cc40f606e92e3a63544420316565556ed28485e2" dependencies = [ "serde", - "serde_core", ] [[package]] @@ -249,9 +248,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.54" +version = "1.2.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583" +checksum = "47b26a0954ae34af09b50f0de26458fa95369a0d478d8236d3f93082b219bd29" dependencies = [ "find-msvc-tools", "jobserver", @@ -298,9 +297,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.56" +version = "4.5.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75ca66430e33a14957acc24c5077b503e7d374151b2b4b3a10c83b4ceb4be0e" +checksum = "6899ea499e3fb9305a65d5ebf6e3d2248c5fab291f300ad0a704fbe142eae31a" dependencies = [ "clap_builder", "clap_derive", @@ -308,9 +307,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.56" +version = "4.5.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793207c7fa6300a0608d1080b858e5fdbe713cdc1c8db9fb17777d8a13e63df0" +checksum = "7b12c8b680195a62a8364d16b8447b01b6c2c8f9aaf68bee653be34d4245e238" dependencies = [ "anstyle", "clap_lex", @@ -431,15 +430,6 @@ dependencies = [ "rustc_version", ] -[[package]] -name = "crc32fast" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" -dependencies = [ - "cfg-if", -] - [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -690,9 +680,9 @@ dependencies = [ [[package]] name = "find-msvc-tools" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" [[package]] name = "fixedbitset" @@ -702,11 +692,10 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "flate2" -version = "1.1.8" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b375d6465b98090a5f25b1c7703f3859783755aa9a80433b36e0379a3ec2f369" +checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" dependencies = [ - "crc32fast", "miniz_oxide", "zlib-rs", ] @@ -5453,14 +5442,13 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ "base64", "bytes", "futures-channel", - "futures-core", "futures-util", "http", "http-body", @@ -5477,9 +5465,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.64" +version = "0.1.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -6926,9 +6914,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.12.2" +version = "1.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" dependencies = [ "aho-corasick", "memchr", @@ -6938,9 +6926,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" dependencies = [ "aho-corasick", "memchr", @@ -6949,9 +6937,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" [[package]] name = "reqwest" @@ -7236,9 +7224,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54e910108742c57a770f492731f99be216a52fadd361b06c8fb59d74ccc267d2" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" dependencies = [ "dyn-clone", "ref-cast", @@ -7451,7 +7439,7 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.13.0", "schemars 0.9.0", - "schemars 1.2.0", + "schemars 1.2.1", "serde_core", "serde_json", "serde_with_macros", @@ -7547,9 +7535,9 @@ checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" [[package]] name = "slab" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" [[package]] name = "smallvec" @@ -8158,9 +8146,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.14.2" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40aaccc9f9eccf2cd82ebc111adc13030d23e887244bc9cfa5d1d636049de3" +checksum = "27aac809edf60b741e2d7db6367214d078856b8a5bff0087e94ff330fb97b6fc" dependencies = [ "prettyplease", "proc-macro2", @@ -8463,9 +8451,9 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasip2" -version = "1.0.2+wasi-0.2.9" +version = "1.0.1+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" +checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" dependencies = [ "wit-bindgen", ] @@ -8564,9 +8552,9 @@ dependencies = [ [[package]] name = "webpki-root-certs" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36a29fc0408b113f68cf32637857ab740edfafdf460c326cd2afaa2d84cc05dc" +checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" dependencies = [ "rustls-pki-types", ] @@ -8872,9 +8860,9 @@ dependencies = [ [[package]] name = "wit-bindgen" -version = "0.51.0" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" [[package]] name = "writeable" @@ -8913,18 +8901,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.34" +version = "0.8.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71ddd76bcebeed25db614f82bf31a9f4222d3fbba300e6fb6c00afa26cbd4d9d" +checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.34" +version = "0.8.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8187381b52e32220d50b255276aa16a084ec0a9017a0ca2152a1f55c539758d" +checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" dependencies = [ "proc-macro2", "quote", @@ -8993,12 +8981,12 @@ dependencies = [ [[package]] name = "zlib-rs" -version = "0.5.5" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40990edd51aae2c2b6907af74ffb635029d5788228222c4bb811e9351c0caad3" +checksum = "a7948af682ccbc3342b6e9420e8c51c1fe5d7bf7756002b4a3c6cabfe96a7e3c" [[package]] name = "zmij" -version = "1.0.17" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02aae0f83f69aafc94776e879363e9771d7ecbffe2c7fbb6c14c5e00dfe88439" +checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" From ebf4d4bb90181a5f6774bf495076dfeae0271ea5 Mon Sep 17 00:00:00 2001 From: haphungw Date: Fri, 6 Feb 2026 15:23:22 +0000 Subject: [PATCH 5/9] chore: revert Cargo.lock to main --- Cargo.lock | 102 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 57 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b78098efd3..87766c29be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,9 +34,9 @@ checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" [[package]] name = "anyhow" -version = "1.0.101" +version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "async-trait" @@ -225,11 +225,12 @@ dependencies = [ [[package]] name = "cargo-platform" -version = "0.3.0" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8abf5d501fd757c2d2ee78d0cc40f606e92e3a63544420316565556ed28485e2" +checksum = "87a0c0e6148f11f01f32650a2ea02d532b2ad4e81d8bd41e6e565b5adc5e6082" dependencies = [ "serde", + "serde_core", ] [[package]] @@ -248,9 +249,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.55" +version = "1.2.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47b26a0954ae34af09b50f0de26458fa95369a0d478d8236d3f93082b219bd29" +checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583" dependencies = [ "find-msvc-tools", "jobserver", @@ -297,9 +298,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.57" +version = "4.5.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6899ea499e3fb9305a65d5ebf6e3d2248c5fab291f300ad0a704fbe142eae31a" +checksum = "a75ca66430e33a14957acc24c5077b503e7d374151b2b4b3a10c83b4ceb4be0e" dependencies = [ "clap_builder", "clap_derive", @@ -307,9 +308,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.57" +version = "4.5.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b12c8b680195a62a8364d16b8447b01b6c2c8f9aaf68bee653be34d4245e238" +checksum = "793207c7fa6300a0608d1080b858e5fdbe713cdc1c8db9fb17777d8a13e63df0" dependencies = [ "anstyle", "clap_lex", @@ -430,6 +431,15 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "crc32fast" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -680,9 +690,9 @@ dependencies = [ [[package]] name = "find-msvc-tools" -version = "0.1.9" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db" [[package]] name = "fixedbitset" @@ -692,10 +702,11 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "flate2" -version = "1.1.9" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" +checksum = "b375d6465b98090a5f25b1c7703f3859783755aa9a80433b36e0379a3ec2f369" dependencies = [ + "crc32fast", "miniz_oxide", "zlib-rs", ] @@ -5442,13 +5453,14 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.20" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" +checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ "base64", "bytes", "futures-channel", + "futures-core", "futures-util", "http", "http-body", @@ -5465,9 +5477,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.65" +version = "0.1.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" +checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -6914,9 +6926,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.12.3" +version = "1.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" dependencies = [ "aho-corasick", "memchr", @@ -6926,9 +6938,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.14" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" dependencies = [ "aho-corasick", "memchr", @@ -6937,9 +6949,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.9" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" +checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" [[package]] name = "reqwest" @@ -7224,9 +7236,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.2.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" +checksum = "54e910108742c57a770f492731f99be216a52fadd361b06c8fb59d74ccc267d2" dependencies = [ "dyn-clone", "ref-cast", @@ -7439,7 +7451,7 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.13.0", "schemars 0.9.0", - "schemars 1.2.1", + "schemars 1.2.0", "serde_core", "serde_json", "serde_with_macros", @@ -7535,9 +7547,9 @@ checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" [[package]] name = "slab" -version = "0.4.12" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" [[package]] name = "smallvec" @@ -8146,9 +8158,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.14.3" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27aac809edf60b741e2d7db6367214d078856b8a5bff0087e94ff330fb97b6fc" +checksum = "4c40aaccc9f9eccf2cd82ebc111adc13030d23e887244bc9cfa5d1d636049de3" dependencies = [ "prettyplease", "proc-macro2", @@ -8451,9 +8463,9 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasip2" -version = "1.0.1+wasi-0.2.4" +version = "1.0.2+wasi-0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" dependencies = [ "wit-bindgen", ] @@ -8552,9 +8564,9 @@ dependencies = [ [[package]] name = "webpki-root-certs" -version = "1.0.6" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" +checksum = "36a29fc0408b113f68cf32637857ab740edfafdf460c326cd2afaa2d84cc05dc" dependencies = [ "rustls-pki-types", ] @@ -8860,9 +8872,9 @@ dependencies = [ [[package]] name = "wit-bindgen" -version = "0.46.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" [[package]] name = "writeable" @@ -8901,18 +8913,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.39" +version = "0.8.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a" +checksum = "71ddd76bcebeed25db614f82bf31a9f4222d3fbba300e6fb6c00afa26cbd4d9d" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.39" +version = "0.8.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" +checksum = "d8187381b52e32220d50b255276aa16a084ec0a9017a0ca2152a1f55c539758d" dependencies = [ "proc-macro2", "quote", @@ -8981,12 +8993,12 @@ dependencies = [ [[package]] name = "zlib-rs" -version = "0.6.0" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7948af682ccbc3342b6e9420e8c51c1fe5d7bf7756002b4a3c6cabfe96a7e3c" +checksum = "40990edd51aae2c2b6907af74ffb635029d5788228222c4bb811e9351c0caad3" [[package]] name = "zmij" -version = "1.0.19" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" +checksum = "02aae0f83f69aafc94776e879363e9771d7ecbffe2c7fbb6c14c5e00dfe88439" From 0ff6cab3c324444d34a0d9ae9c4aa7da17bfcaf2 Mon Sep 17 00:00:00 2001 From: haphungw Date: Fri, 6 Feb 2026 15:38:06 +0000 Subject: [PATCH 6/9] add x-goog-request-params verification and fix subscription arg --- src/pubsub/src/subscriber/transport.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/pubsub/src/subscriber/transport.rs b/src/pubsub/src/subscriber/transport.rs index 63df9eebdd..ec4bff2554 100644 --- a/src/pubsub/src/subscriber/transport.rs +++ b/src/pubsub/src/subscriber/transport.rs @@ -132,13 +132,21 @@ pub(super) mod tests { request_tx.send(StreamingPullRequest::default()).await?; let mut mock = MockSubscriber::new(); - mock.expect_streaming_pull() - .return_once(|_| Ok(TonicResponse::from(response_rx))); + mock.expect_streaming_pull().return_once(|request| { + let metadata = request.metadata(); + assert_eq!( + metadata + .get("x-goog-request-params") + .expect("routing header missing"), + "subscription=projects/p/subscriptions/s" + ); + Ok(TonicResponse::from(response_rx)) + }); let (endpoint, _server) = start("0.0.0.0:0", mock).await?; let transport = test_transport(endpoint).await?; let mut stream = Stub::streaming_pull( &transport, - "projects/p/subscriptions/s", + "subscription=projects/p/subscriptions/s", request_rx, gax::options::RequestOptions::default(), ) From aa27f6b65c4748ceb08fcd552ebcceb0bfb9bea8 Mon Sep 17 00:00:00 2001 From: haphungw Date: Fri, 6 Feb 2026 15:38:39 +0000 Subject: [PATCH 7/9] edit error message --- src/pubsub/src/subscriber/session.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/pubsub/src/subscriber/session.rs b/src/pubsub/src/subscriber/session.rs index a6443f15a6..c132db1754 100644 --- a/src/pubsub/src/subscriber/session.rs +++ b/src/pubsub/src/subscriber/session.rs @@ -1054,11 +1054,9 @@ mod tests { metadata .get("x-goog-request-params") .expect("routing header missing"), - &format!("subscription={subscription}") + "subscription=projects/p/subscriptions/s" ); - Err(TonicStatus::failed_precondition( - "header verification failed", - )) + Err(TonicStatus::failed_precondition("ignored")) }); let (endpoint, _server) = start("0.0.0.0:0", mock).await?; From cf6f4757e821ca794b6efd7c450059db6e548a58 Mon Sep 17 00:00:00 2001 From: haphungw Date: Fri, 6 Feb 2026 15:39:28 +0000 Subject: [PATCH 8/9] check subscription individually --- src/pubsub/src/subscriber/stream.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/pubsub/src/subscriber/stream.rs b/src/pubsub/src/subscriber/stream.rs index 1b515246d4..94618cfc72 100644 --- a/src/pubsub/src/subscriber/stream.rs +++ b/src/pubsub/src/subscriber/stream.rs @@ -218,6 +218,7 @@ mod tests { let mut mock = MockStub::new(); mock.expect_streaming_pull() + .withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription") .times(1) .return_once(move |_s, _r, _o| Ok(TonicResponse::from(response_rx))); @@ -244,6 +245,7 @@ mod tests { let mut mock = MockStub::new(); mock.expect_streaming_pull() + .withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription") .times(1) .return_once(move |_s, mut request_rx, _o| { tokio::spawn(async move { @@ -283,6 +285,7 @@ mod tests { async fn error() -> anyhow::Result<()> { let mut mock = MockStub::new(); mock.expect_streaming_pull() + .withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription") .times(1) .return_once(|_, _, _| Err(Error::io("fail"))); @@ -304,6 +307,9 @@ mod tests { // N > 10 (the default attempt limit for GAPICs). mock_stub .expect_streaming_pull() + .withf(|s, _, _| { + s == "subscription=projects/my-project/subscriptions/my-subscription" + }) .times(1) .in_sequence(&mut seq) .return_once(|_, _, _| Err(transient_error())); @@ -321,6 +327,7 @@ mod tests { mock_stub .expect_streaming_pull() + .withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription") .times(1) .in_sequence(&mut seq) .return_once(move |_s, _r, _o| Ok(TonicResponse::from(response_rx))); @@ -347,6 +354,9 @@ mod tests { // N > 10 (the default attempt limit for GAPICs). mock_stub .expect_streaming_pull() + .withf(|s, _, _| { + s == "subscription=projects/my-project/subscriptions/my-subscription" + }) .times(1) .in_sequence(&mut seq) .return_once(|_, _, _| Err(transient_error())); @@ -360,6 +370,7 @@ mod tests { // Simulate a permanent error. mock_stub .expect_streaming_pull() + .withf(|s, _, _| s == "subscription=projects/my-project/subscriptions/my-subscription") .times(1) .in_sequence(&mut seq) .return_once(|_, _, _| Err(permanent_error())); From f5166e440e2208eb72d4dc0e36c03dc73140bef8 Mon Sep 17 00:00:00 2001 From: haphungw Date: Fri, 6 Feb 2026 16:26:39 +0000 Subject: [PATCH 9/9] spell out subscription --- src/pubsub/src/subscriber/session.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/pubsub/src/subscriber/session.rs b/src/pubsub/src/subscriber/session.rs index c132db1754..50e55bdceb 100644 --- a/src/pubsub/src/subscriber/session.rs +++ b/src/pubsub/src/subscriber/session.rs @@ -1046,8 +1046,6 @@ mod tests { async fn routing_header() -> anyhow::Result<()> { let mut mock = MockSubscriber::new(); - let subscription = "projects/p/subscriptions/s"; - mock.expect_streaming_pull().return_once(move |request| { let metadata = request.metadata(); assert_eq!( @@ -1062,7 +1060,11 @@ mod tests { let (endpoint, _server) = start("0.0.0.0:0", mock).await?; let client = test_client(endpoint).await?; - let _ = client.streaming_pull(subscription).start().next().await; + let _ = client + .streaming_pull("projects/p/subscriptions/s") + .start() + .next() + .await; Ok(()) }