From f1ae62967f6b010672c593d60fd53aeca073fea6 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 4 Feb 2026 15:00:14 +0000 Subject: [PATCH 01/14] fix: token cache exists after exaustive retries --- src/auth/src/token_cache.rs | 6 ++- src/auth/tests/credentials.rs | 77 +++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/src/auth/src/token_cache.rs b/src/auth/src/token_cache.rs index 214068fcaa..c74bc39b65 100644 --- a/src/auth/src/token_cache.rs +++ b/src/auth/src/token_cache.rs @@ -153,8 +153,10 @@ async fn refresh_task( } None => { // The retry policy has been used already by the inner token provider. - // If it ended in an error, just quit the background task. - break; + // If it ended in an error, the background task will wait for a while + // and try again. This allows the task to eventually recover if the + // error was transient but exhausted all the retries. + sleep(SHORT_REFRESH_SLACK).await; } } } diff --git a/src/auth/tests/credentials.rs b/src/auth/tests/credentials.rs index 6b3b72eb6b..c143e6274b 100644 --- a/src/auth/tests/credentials.rs +++ b/src/auth/tests/credentials.rs @@ -965,4 +965,81 @@ mod tests { Ok(()) } + + #[tokio::test] + #[serial] + async fn test_credentials_refresh_recovers_after_outage() -> TestResult { + let server = Server::run(); + let addr = server.addr().to_string(); + let _e = ScopedEnv::set("GCE_METADATA_HOST", &addr); + + // initial token request: success (200 OK) + server.expect( + Expectation::matching(request::path( + "/computeMetadata/v1/instance/service-accounts/default/token", + )) + .times(1) + .respond_with(json_encoded(json!({ + "access_token": "token-1", + "expires_in": 1, // short lived to trigger refresh soon + "token_type": "Bearer" + }))), + ); + + let creds = MdsBuilder::default().build_access_token_credentials()?; + + // get initial token, this starts the refresh_task + let access_token = creds.access_token().await?; + assert!( + access_token.token.contains("token-1"), + "Expected token-1, got {}", + access_token.token + ); + + // set up outage: fail (503 Service Unavailable) + server.expect( + Expectation::matching(request::path( + "/computeMetadata/v1/instance/service-accounts/default/token", + )) + .times(1..) // called at least once + .respond_with(status_code(503)), + ); + + // wait for the token to be refreshed + // it should exhaust retries, fail, and (with the fix) wait for SHORT_REFRESH_SLACK (10s) + tokio::time::sleep(std::time::Duration::from_millis(3000)).await; + + // trying to get a token now should fail because retry was exhausted + let result = creds.headers(Extensions::new()).await; + assert!( + result.is_err(), + "expected error due to exhausted retries during outage, but got: {:?}", + result + ); + + // set up recovery: success (200 OK) + server.expect( + Expectation::matching(request::path( + "/computeMetadata/v1/instance/service-accounts/default/token", + )) + .respond_with(json_encoded(json!({ + "access_token": "token-2", + "expires_in": 3600, + "token_type": "Bearer" + }))), + ); + + // advance time long enough to pass through SHORT_REFRESH_SLACK (10s + some buffer) + tokio::time::sleep(std::time::Duration::from_secs(12)).await; + let result = creds.access_token().await; + // if it recovered, we should get token-2 + let access_token = result.expect("the credential should have recovered from the outage!"); + assert!( + access_token.token.contains("token-2"), + "Expected token-2 after recovery, but got: {}", + access_token.token + ); + + Ok(()) + } } From 2857f87a034e1a53d1c2765f4b0620d2b0fd0fd8 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 4 Feb 2026 16:29:56 +0000 Subject: [PATCH 02/14] impl: use exponential backoff --- src/auth/src/token_cache.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/auth/src/token_cache.rs b/src/auth/src/token_cache.rs index c74bc39b65..0dd2347324 100644 --- a/src/auth/src/token_cache.rs +++ b/src/auth/src/token_cache.rs @@ -17,6 +17,9 @@ use crate::Result; use crate::credentials::{CacheableResource, EntityTag}; use crate::token::{CachedTokenProvider, Token, TokenProvider}; +use gax::backoff_policy::BackoffPolicy; +use gax::exponential_backoff::ExponentialBackoff; +use gax::retry_state::RetryState; use http::Extensions; use std::sync::Arc; use tokio::sync::watch; @@ -116,6 +119,9 @@ async fn refresh_task( ) where T: TokenProvider + Send + Sync + 'static, { + let backoff_policy = ExponentialBackoff::default(); + let mut retry_state = RetryState::default(); + loop { let token_result = token_provider.token().await; let expiry = token_result.as_ref().ok().map(|t| t.expires_at); @@ -128,6 +134,7 @@ async fn refresh_task( match expiry { Some(Some(expiry)) => { + retry_state = RetryState::default(); // reset retry state let time_until_expiry = expiry.checked_duration_since(Instant::now()); match time_until_expiry { @@ -156,7 +163,9 @@ async fn refresh_task( // If it ended in an error, the background task will wait for a while // and try again. This allows the task to eventually recover if the // error was transient but exhausted all the retries. - sleep(SHORT_REFRESH_SLACK).await; + retry_state.attempt_count += 1; + let delay = backoff_policy.on_failure(&retry_state); + sleep(delay).await; } } } From 0d93d4e54a853e2a6f9e43bb77e69158a3168ddf Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 5 Feb 2026 02:30:36 +0000 Subject: [PATCH 03/14] impl: transient auth errors should stay transient --- src/auth/src/credentials/external_account.rs | 4 +- .../src/credentials/idtoken/user_account.rs | 2 +- src/auth/src/credentials/mds.rs | 2 +- src/auth/src/credentials/user_account.rs | 2 +- src/auth/src/retry.rs | 21 +++--- src/auth/src/token_cache.rs | 65 +++++++++---------- 6 files changed, 47 insertions(+), 49 deletions(-) diff --git a/src/auth/src/credentials/external_account.rs b/src/auth/src/credentials/external_account.rs index 50a61591cb..e156da4abe 100644 --- a/src/auth/src/credentials/external_account.rs +++ b/src/auth/src/credentials/external_account.rs @@ -1959,7 +1959,7 @@ mod tests { .unwrap(); let err = creds.headers(Extensions::new()).await.unwrap_err(); - assert!(!err.is_transient()); + assert!(err.is_transient()); sts_server.verify_and_clear(); subject_token_server.verify_and_clear(); } @@ -2084,7 +2084,7 @@ mod tests { .unwrap(); let err = creds.headers(Extensions::new()).await.unwrap_err(); - assert!(!err.is_transient()); + assert!(err.is_transient()); sts_server.verify_and_clear(); } diff --git a/src/auth/src/credentials/idtoken/user_account.rs b/src/auth/src/credentials/idtoken/user_account.rs index 6ff45142eb..75a559d508 100644 --- a/src/auth/src/credentials/idtoken/user_account.rs +++ b/src/auth/src/credentials/idtoken/user_account.rs @@ -422,7 +422,7 @@ mod tests { .build()?; let err = credentials.id_token().await.unwrap_err(); - assert!(!err.is_transient()); + assert!(err.is_transient()); Ok(()) } diff --git a/src/auth/src/credentials/mds.rs b/src/auth/src/credentials/mds.rs index 454476ddb4..cb7011a2d7 100644 --- a/src/auth/src/credentials/mds.rs +++ b/src/auth/src/credentials/mds.rs @@ -483,7 +483,7 @@ mod tests { .build_token_provider(); let err = provider.token().await.unwrap_err(); - assert!(!err.is_transient()); + assert!(err.is_transient()); server.verify_and_clear(); Ok(()) } diff --git a/src/auth/src/credentials/user_account.rs b/src/auth/src/credentials/user_account.rs index d0ea18fc5e..304fc0b3f6 100644 --- a/src/auth/src/credentials/user_account.rs +++ b/src/auth/src/credentials/user_account.rs @@ -611,7 +611,7 @@ mod tests { .build()?; let err = credentials.headers(Extensions::new()).await.unwrap_err(); - assert!(!err.is_transient()); + assert!(err.is_transient()); server.verify_and_clear(); Ok(()) } diff --git a/src/auth/src/retry.rs b/src/auth/src/retry.rs index dbc6534d12..d29144c89e 100644 --- a/src/auth/src/retry.rs +++ b/src/auth/src/retry.rs @@ -95,7 +95,7 @@ impl Builder { #[async_trait::async_trait] impl TokenProvider for TokenProviderWithRetry { async fn token(&self) -> Result { - self.execute_retry_loop(self.retry_policy.clone()).await + self.execute_retry_loop().await } } @@ -103,7 +103,7 @@ impl TokenProviderWithRetry where T: TokenProvider, { - async fn execute_retry_loop(&self, retry_policy: Arc) -> Result { + async fn execute_retry_loop(&self) -> Result { let inner = self.inner.clone(); let sleep = async |d| tokio::time::sleep(d).await; let fetch_token = move |_| { @@ -121,7 +121,7 @@ where sleep, true, // token fetching is idempotent self.retry_throttler.clone(), - retry_policy, + self.retry_policy.clone(), self.backoff_policy.clone(), ) .await @@ -133,14 +133,15 @@ where return CredentialsError::from_source(false, e); } - let msg = match e + match e .source() .and_then(|s| s.downcast_ref::()) { - Some(cred_error) if cred_error.is_transient() => constants::RETRY_EXHAUSTED_ERROR, - _ => constants::TOKEN_FETCH_FAILED_ERROR, - }; - CredentialsError::new(false, msg, e) + Some(cred_error) if cred_error.is_transient() => { + CredentialsError::new(true, constants::RETRY_EXHAUSTED_ERROR, e) + } + _ => CredentialsError::new(false, constants::TOKEN_FETCH_FAILED_ERROR, e), + } } } @@ -286,7 +287,7 @@ mod tests { .build(mock_provider); let error = provider.token().await.unwrap_err(); - assert!(!error.is_transient()); + assert!(error.is_transient()); let original_error = find_source_error::(&error).unwrap(); assert!(original_error.is_transient()); assert!(error.to_string().contains(constants::RETRY_EXHAUSTED_ERROR)); @@ -350,7 +351,7 @@ mod tests { let provider = Builder::default().build(mock_provider); let error = provider.token().await.unwrap_err(); - assert!(!error.is_transient()); + assert!(error.is_transient()); let original_error = find_source_error::(&error).unwrap(); assert!(original_error.is_transient()); } diff --git a/src/auth/src/token_cache.rs b/src/auth/src/token_cache.rs index 0dd2347324..833dc7e2eb 100644 --- a/src/auth/src/token_cache.rs +++ b/src/auth/src/token_cache.rs @@ -17,9 +17,6 @@ use crate::Result; use crate::credentials::{CacheableResource, EntityTag}; use crate::token::{CachedTokenProvider, Token, TokenProvider}; -use gax::backoff_policy::BackoffPolicy; -use gax::exponential_backoff::ExponentialBackoff; -use gax::retry_state::RetryState; use http::Extensions; use std::sync::Arc; use tokio::sync::watch; @@ -119,53 +116,53 @@ async fn refresh_task( ) where T: TokenProvider + Send + Sync + 'static, { - let backoff_policy = ExponentialBackoff::default(); - let mut retry_state = RetryState::default(); - loop { let token_result = token_provider.token().await; - let expiry = token_result.as_ref().ok().map(|t| t.expires_at); - let tagged = token_result.map(|token| { + let tagged = token_result.clone().map(|token| { let entity_tag = EntityTag::new(); (token, entity_tag) }); let _ = tx_token.send(Some(tagged)); - match expiry { - Some(Some(expiry)) => { - retry_state = RetryState::default(); // reset retry state - let time_until_expiry = expiry.checked_duration_since(Instant::now()); - - match time_until_expiry { - None => { - // We were given a token that is expired, or expires in less than 10 seconds. - // We will immediately restart the loop, and fetch a new token. - } - Some(time_until_expiry) => { - if time_until_expiry > NORMAL_REFRESH_SLACK { - sleep(time_until_expiry - NORMAL_REFRESH_SLACK).await; - } else if time_until_expiry > SHORT_REFRESH_SLACK { - // If expiry is less than 4 mins, try to refresh every 10 seconds - // This is to handle cases where MDS **repeatedly** returns about to expire tokens. - sleep(SHORT_REFRESH_SLACK).await; + match token_result { + Ok(token) => { + match token.expires_at { + Some(expiry) => { + let time_until_expiry = expiry.checked_duration_since(Instant::now()); + + match time_until_expiry { + None => { + // We were given a token that is expired, or expires in less than 10 seconds. + // We will immediately restart the loop, and fetch a new token. + } + Some(time_until_expiry) => { + if time_until_expiry > NORMAL_REFRESH_SLACK { + sleep(time_until_expiry - NORMAL_REFRESH_SLACK).await; + } else if time_until_expiry > SHORT_REFRESH_SLACK { + // If expiry is less than 4 mins, try to refresh every 10 seconds + // This is to handle cases where MDS **repeatedly** returns about to expire tokens. + sleep(SHORT_REFRESH_SLACK).await; + } + } } } + None => { + // If there is no expiry, the token is valid forever, so no need to refresh + // TODO(#1553): Validate that all auth backends provide expiry and make expiry not optional. + break; + } } } - Some(None) => { - // If there is no expiry, the token is valid forever, so no need to refresh - // TODO(#1553): Validate that all auth backends provide expiry and make expiry not optional. - break; - } - None => { + Err(err) => { // The retry policy has been used already by the inner token provider. // If it ended in an error, the background task will wait for a while // and try again. This allows the task to eventually recover if the // error was transient but exhausted all the retries. - retry_state.attempt_count += 1; - let delay = backoff_policy.on_failure(&retry_state); - sleep(delay).await; + if !err.is_transient() { + break; + } + sleep(SHORT_REFRESH_SLACK).await; } } } From 1b038be0c3de7e988a70508fae72f690fba2962c Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 5 Feb 2026 14:09:06 +0000 Subject: [PATCH 04/14] impl: minimize changes --- src/auth/src/retry.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/auth/src/retry.rs b/src/auth/src/retry.rs index d29144c89e..e1a2f264ce 100644 --- a/src/auth/src/retry.rs +++ b/src/auth/src/retry.rs @@ -95,7 +95,7 @@ impl Builder { #[async_trait::async_trait] impl TokenProvider for TokenProviderWithRetry { async fn token(&self) -> Result { - self.execute_retry_loop().await + self.execute_retry_loop(self.retry_policy.clone()).await } } @@ -103,7 +103,7 @@ impl TokenProviderWithRetry where T: TokenProvider, { - async fn execute_retry_loop(&self) -> Result { + async fn execute_retry_loop(&self, retry_policy: Arc) -> Result { let inner = self.inner.clone(); let sleep = async |d| tokio::time::sleep(d).await; let fetch_token = move |_| { @@ -121,7 +121,7 @@ where sleep, true, // token fetching is idempotent self.retry_throttler.clone(), - self.retry_policy.clone(), + retry_policy, self.backoff_policy.clone(), ) .await From b7472619a52b69d408c2895b93ce9d33f6fed1c8 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 5 Feb 2026 17:26:48 +0000 Subject: [PATCH 05/14] impl: simplify code --- src/auth/src/token_cache.rs | 47 +++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/src/auth/src/token_cache.rs b/src/auth/src/token_cache.rs index d306ee187d..ab1984ef9c 100644 --- a/src/auth/src/token_cache.rs +++ b/src/auth/src/token_cache.rs @@ -101,6 +101,7 @@ async fn refresh_task( { loop { let token_result = token_provider.token().await; + let expiry = token_result.as_ref().map(|t| t.expires_at); let tagged = token_result.clone().map(|token| { let entity_tag = EntityTag::new(); (token, entity_tag) @@ -108,35 +109,31 @@ async fn refresh_task( let _ = tx_token.send(Some(tagged)); - match token_result { - Ok(token) => { - match token.expires_at { - Some(expiry) => { - let time_until_expiry = expiry.checked_duration_since(Instant::now()); - - match time_until_expiry { - None => { - // We were given a token that is expired, or expires in less than 10 seconds. - // We will immediately restart the loop, and fetch a new token. - } - Some(time_until_expiry) => { - if time_until_expiry > NORMAL_REFRESH_SLACK { - sleep(time_until_expiry - NORMAL_REFRESH_SLACK).await; - } else if time_until_expiry > SHORT_REFRESH_SLACK { - // If expiry is less than 4 mins, try to refresh every 10 seconds - // This is to handle cases where MDS **repeatedly** returns about to expire tokens. - sleep(SHORT_REFRESH_SLACK).await; - } - } - } - } + match expiry { + Ok(Some(expiry)) => { + let time_until_expiry = expiry.checked_duration_since(Instant::now()); + + match time_until_expiry { None => { - // If there is no expiry, the token is valid forever, so no need to refresh - // TODO(#1553): Validate that all auth backends provide expiry and make expiry not optional. - break; + // We were given a token that is expired, or expires in less than 10 seconds. + // We will immediately restart the loop, and fetch a new token. + } + Some(time_until_expiry) => { + if time_until_expiry > NORMAL_REFRESH_SLACK { + sleep(time_until_expiry - NORMAL_REFRESH_SLACK).await; + } else if time_until_expiry > SHORT_REFRESH_SLACK { + // If expiry is less than 4 mins, try to refresh every 10 seconds + // This is to handle cases where MDS **repeatedly** returns about to expire tokens. + sleep(SHORT_REFRESH_SLACK).await; + } } } } + Ok(None) => { + // If there is no expiry, the token is valid forever, so no need to refresh + // TODO(#1553): Validate that all auth backends provide expiry and make expiry not optional. + break; + } Err(err) => { // The retry policy has been used already by the inner token provider. // If it ended in an error, the background task will wait for a while From 2cf908e1798901a33348c7ced87247b214e9b1a1 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 5 Feb 2026 20:26:28 +0000 Subject: [PATCH 06/14] fix: block refresh_task instead of sleeping --- src/auth/src/token_cache.rs | 75 +++++++++++++++++++++++++++++++++---- 1 file changed, 68 insertions(+), 7 deletions(-) diff --git a/src/auth/src/token_cache.rs b/src/auth/src/token_cache.rs index ab1984ef9c..42a72984a1 100644 --- a/src/auth/src/token_cache.rs +++ b/src/auth/src/token_cache.rs @@ -32,6 +32,7 @@ const SHORT_REFRESH_SLACK: Duration = Duration::from_secs(10); #[derive(Debug, Clone)] pub(crate) struct TokenCache { rx_token: watch::Receiver>>, + notify: Arc, } impl TokenCache { @@ -41,13 +42,15 @@ impl TokenCache { { let (tx_token, rx_token) = watch::channel::>>(None); let token_provider = Arc::new(inner); + let notify = Arc::new(tokio::sync::Notify::new()); - tokio::spawn(refresh_task(token_provider, tx_token)); + tokio::spawn(refresh_task(token_provider, tx_token, notify.clone())); - Self { rx_token } + Self { rx_token, notify } } async fn latest_token_and_entity_tag(&self) -> Result<(Token, EntityTag)> { + self.notify.notify_one(); let mut rx = self.rx_token.clone(); let token_result = rx.borrow_and_update().clone(); if let Some(token_result) = token_result { @@ -96,6 +99,7 @@ async fn wait_for_next_token( async fn refresh_task( token_provider: Arc, tx_token: watch::Sender>>, + notify: Arc, ) where T: TokenProvider + Send + Sync + 'static, { @@ -142,7 +146,7 @@ async fn refresh_task( if !err.is_transient() { break; } - sleep(SHORT_REFRESH_SLACK).await; + notify.notified().await; } } } @@ -378,9 +382,10 @@ mod tests { .return_once(|| Ok(token2_clone)); let (tx, mut rx) = watch::channel::>>(None); + let notify = Arc::new(tokio::sync::Notify::new()); tokio::spawn(async move { - refresh_task(Arc::new(mock), tx).await; + refresh_task(Arc::new(mock), tx, notify).await; }); // Give the refresh task a chance to run @@ -437,13 +442,14 @@ mod tests { .return_once(|| Ok(token3_clone)); let (tx, mut rx) = watch::channel::>>(None); + let notify = Arc::new(tokio::sync::Notify::new()); // check that channel has None before refresh task starts let actual = rx.borrow().clone(); assert!(actual.is_none(), "{actual:?}"); tokio::spawn(async move { - refresh_task(Arc::new(mock), tx).await; + refresh_task(Arc::new(mock), tx, notify).await; }); rx.changed().await.unwrap(); @@ -511,13 +517,14 @@ mod tests { .return_once(|| Ok(token2_clone)); let (tx, mut rx) = watch::channel::>>(None); + let notify = Arc::new(tokio::sync::Notify::new()); // check that channel has None before refresh task starts let actual = rx.borrow().clone(); assert!(actual.is_none(), "{actual:?}"); tokio::spawn(async move { - refresh_task(Arc::new(mock), tx).await; + refresh_task(Arc::new(mock), tx, notify).await; }); rx.changed().await.unwrap(); @@ -576,13 +583,14 @@ mod tests { .return_once(|| Ok(token2_clone)); let (tx, mut rx) = watch::channel::>>(None); + let notify = Arc::new(tokio::sync::Notify::new()); // check that channel has None before refresh task starts let actual = rx.borrow().clone(); assert!(actual.is_none(), "{actual:?}"); tokio::spawn(async move { - refresh_task(Arc::new(mock), tx).await; + refresh_task(Arc::new(mock), tx, notify).await; }); rx.changed().await.unwrap(); @@ -601,6 +609,59 @@ mod tests { assert_eq!(actual, token2); } + #[tokio::test(start_paused = true)] + async fn refresh_task_blocks_on_transient_error_and_wakes_up_on_request() -> TestResult { + let now = Instant::now(); + + let token = Token { + token: "token-1".to_string(), + token_type: "Bearer".to_string(), + expires_at: Some(now + TOKEN_VALID_DURATION), + metadata: None, + }; + + let mut mock = MockTokenProvider::new(); + // 1st request succeeds + mock.expect_token() + .times(1) + .return_once(move || Ok(token.clone())); + + // 2nd request (triggered by refresh loop) fails with transient error + mock.expect_token() + .times(1) + .return_once(|| Err(CredentialsError::from_msg(true, "transient error"))); + + let token = Token { + token: "token-2".to_string(), + token_type: "Bearer".to_string(), + expires_at: Some(now + 2 * TOKEN_VALID_DURATION), + metadata: None, + }; + + // 3rd request (triggered by waking up the task) succeeds + mock.expect_token() + .times(1) + .return_once(move || Ok(token.clone())); + + let cache = TokenCache::new(mock); + + // fetch an initial token + let actual = get_cached_token(cache.token(Extensions::new()).await.unwrap())?; + assert_eq!(actual.token, "token-1"); + + // advance time to force expiration, which wakes up the background task. + let sleep = TOKEN_VALID_DURATION.add(Duration::from_secs(10)); + tokio::time::advance(sleep).await; + + // try again. Last fetch failed and background loop is blocked, + // so calling `token` triggers `self.notify.notify_one()`, + // waking up the blocked background task, which will successfully fetch `token2`. + let actual = get_cached_token(cache.token(Extensions::new()).await.unwrap())?; + assert_eq!(actual.token, "token-2"); + + Ok(()) + } + #[derive(Clone, Debug)] struct FakeTokenProvider { result: Result, From 4b23384779264ac8104fc06032dadfcaab723e1e Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 5 Feb 2026 21:23:40 +0000 Subject: [PATCH 07/14] impl: come back to sleep approach --- src/auth/src/token_cache.rs | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/src/auth/src/token_cache.rs b/src/auth/src/token_cache.rs index 42a72984a1..a665c2f6e0 100644 --- a/src/auth/src/token_cache.rs +++ b/src/auth/src/token_cache.rs @@ -32,7 +32,6 @@ const SHORT_REFRESH_SLACK: Duration = Duration::from_secs(10); #[derive(Debug, Clone)] pub(crate) struct TokenCache { rx_token: watch::Receiver>>, - notify: Arc, } impl TokenCache { @@ -42,15 +41,13 @@ impl TokenCache { { let (tx_token, rx_token) = watch::channel::>>(None); let token_provider = Arc::new(inner); - let notify = Arc::new(tokio::sync::Notify::new()); - tokio::spawn(refresh_task(token_provider, tx_token, notify.clone())); + tokio::spawn(refresh_task(token_provider, tx_token)); - Self { rx_token, notify } + Self { rx_token } } async fn latest_token_and_entity_tag(&self) -> Result<(Token, EntityTag)> { - self.notify.notify_one(); let mut rx = self.rx_token.clone(); let token_result = rx.borrow_and_update().clone(); if let Some(token_result) = token_result { @@ -99,7 +96,6 @@ async fn wait_for_next_token( async fn refresh_task( token_provider: Arc, tx_token: watch::Sender>>, - notify: Arc, ) where T: TokenProvider + Send + Sync + 'static, { @@ -146,7 +142,7 @@ async fn refresh_task( if !err.is_transient() { break; } - notify.notified().await; + sleep(SHORT_REFRESH_SLACK).await; } } } @@ -382,10 +378,9 @@ mod tests { .return_once(|| Ok(token2_clone)); let (tx, mut rx) = watch::channel::>>(None); - let notify = Arc::new(tokio::sync::Notify::new()); tokio::spawn(async move { - refresh_task(Arc::new(mock), tx, notify).await; + refresh_task(Arc::new(mock), tx).await; }); // Give the refresh task a chance to run @@ -442,14 +437,13 @@ mod tests { .return_once(|| Ok(token3_clone)); let (tx, mut rx) = watch::channel::>>(None); - let notify = Arc::new(tokio::sync::Notify::new()); // check that channel has None before refresh task starts let actual = rx.borrow().clone(); assert!(actual.is_none(), "{actual:?}"); tokio::spawn(async move { - refresh_task(Arc::new(mock), tx, notify).await; + refresh_task(Arc::new(mock), tx).await; }); rx.changed().await.unwrap(); @@ -517,14 +511,13 @@ mod tests { .return_once(|| Ok(token2_clone)); let (tx, mut rx) = watch::channel::>>(None); - let notify = Arc::new(tokio::sync::Notify::new()); // check that channel has None before refresh task starts let actual = rx.borrow().clone(); assert!(actual.is_none(), "{actual:?}"); tokio::spawn(async move { - refresh_task(Arc::new(mock), tx, notify).await; + refresh_task(Arc::new(mock), tx).await; }); rx.changed().await.unwrap(); @@ -583,14 +576,13 @@ mod tests { .return_once(|| Ok(token2_clone)); let (tx, mut rx) = watch::channel::>>(None); - let notify = Arc::new(tokio::sync::Notify::new()); // check that channel has None before refresh task starts let actual = rx.borrow().clone(); assert!(actual.is_none(), "{actual:?}"); tokio::spawn(async move { - refresh_task(Arc::new(mock), tx, notify).await; + refresh_task(Arc::new(mock), tx).await; }); rx.changed().await.unwrap(); @@ -610,7 +602,7 @@ mod tests { } #[tokio::test(start_paused = true)] - async fn refresh_task_blocks_on_transient_error_and_wakes_up_on_request() -> TestResult { + async fn refresh_task_sleeps_on_transient_error_and_recovers_on_next_loop() -> TestResult { let now = Instant::now(); let token = Token { @@ -638,7 +630,7 @@ mod tests { metadata: None, }; - // 3rd request (triggered by waking up the task) succeeds + // 3rd request (triggered by next loop) succeeds mock.expect_token() .times(1) .return_once(move || Ok(token.clone())); @@ -653,9 +645,14 @@ mod tests { let sleep = TOKEN_VALID_DURATION.add(Duration::from_secs(10)); tokio::time::advance(sleep).await; - // try again. Last fetch failed and background loop is blocked, - // so calling `token` triggers `self.notify.notify_one()`, - // waking up the blocked background task, which will successfully fetch `token2`. + let result = cache.token(Extensions::new()).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("transient error")); + + // Wait another SHORT_REFRESH_SLACK + buffer for the background loop to try again and recover + tokio::time::advance(SHORT_REFRESH_SLACK.add(Duration::from_secs(10))).await; + tokio::task::yield_now().await; + let actual = get_cached_token(cache.token(Extensions::new()).await.unwrap())?; assert_eq!(actual.token, "token-2"); From 46bd945a2c03d04806179ada4b61a8cbdd5c2f6e Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 6 Feb 2026 15:55:06 +0000 Subject: [PATCH 08/14] fix: use tokio::time::advance instead of sleep --- src/auth/tests/credentials.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/auth/tests/credentials.rs b/src/auth/tests/credentials.rs index 388cf40779..3c3d6457bb 100644 --- a/src/auth/tests/credentials.rs +++ b/src/auth/tests/credentials.rs @@ -966,7 +966,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(start_paused = true)] #[serial] async fn test_credentials_refresh_recovers_after_outage() -> TestResult { let server = Server::run(); @@ -1007,7 +1007,8 @@ mod tests { // wait for the token to be refreshed // it should exhaust retries, fail, and (with the fix) wait for SHORT_REFRESH_SLACK (10s) - tokio::time::sleep(std::time::Duration::from_millis(3000)).await; + tokio::time::advance(std::time::Duration::from_millis(3000)).await; + tokio::task::yield_now().await; // trying to get a token now should fail because retry was exhausted let result = creds.headers(Extensions::new()).await; @@ -1030,7 +1031,17 @@ mod tests { ); // advance time long enough to pass through SHORT_REFRESH_SLACK (10s + some buffer) - tokio::time::sleep(std::time::Duration::from_secs(12)).await; + tokio::time::advance(std::time::Duration::from_secs(12)).await; + + // yield tasks to let the refresh task run and http request layers work + for _ in 0..30 { + tokio::task::yield_now().await; + let result = creds.headers(Extensions::new()).await; + if result.is_ok() { + break; + } + } + let result = creds.access_token().await; // if it recovered, we should get token-2 let access_token = result.expect("the credential should have recovered from the outage!"); From 4fd40009aa22cb3fceef7a21b0258ce23d6e9440 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 6 Feb 2026 15:55:40 +0000 Subject: [PATCH 09/14] test: cover changes on how auth errors are treated on retry --- src/auth/src/retry.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/auth/src/retry.rs b/src/auth/src/retry.rs index e1a2f264ce..85e57d4831 100644 --- a/src/auth/src/retry.rs +++ b/src/auth/src/retry.rs @@ -494,6 +494,26 @@ mod tests { ); } + #[test_case(false, "invalid credentials"; "permanent auth error")] + #[test_case(true, "transient network error"; "transient auth error")] + fn test_map_retry_error_auth_error(transient: bool, message: &str) { + // 1. Create an authentication error. + let error = CredentialsError::from_msg(transient, message); + let error = gax::error::Error::authentication(error); + let error_string = error.to_string(); + + // 2. Call the function under test. + let credentials_error = + TokenProviderWithRetry::::map_retry_error(error); + + // 3. Assert that the resulting error is transient or not like the original error and wraps the original error. + assert_eq!(credentials_error.is_transient(), transient); + assert_eq!( + credentials_error.source().unwrap().to_string(), + error_string + ); + } + #[test] fn test_unwind_safe() { assert_impl_all!(Builder: std::panic::UnwindSafe, std::panic::RefUnwindSafe); From f512cce03c4075dea6145e08b6af7f2697516024 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 6 Feb 2026 16:05:48 +0000 Subject: [PATCH 10/14] test: increase yield time --- src/auth/tests/credentials.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/auth/tests/credentials.rs b/src/auth/tests/credentials.rs index 3c3d6457bb..c827398b16 100644 --- a/src/auth/tests/credentials.rs +++ b/src/auth/tests/credentials.rs @@ -1034,7 +1034,7 @@ mod tests { tokio::time::advance(std::time::Duration::from_secs(12)).await; // yield tasks to let the refresh task run and http request layers work - for _ in 0..30 { + for _ in 0..100 { tokio::task::yield_now().await; let result = creds.headers(Extensions::new()).await; if result.is_ok() { From 22c04493829b18316f8aa6ed8589dda708ba1115 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 6 Feb 2026 16:20:25 +0000 Subject: [PATCH 11/14] fix: fmt all the things --- src/auth/src/retry.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/auth/src/retry.rs b/src/auth/src/retry.rs index 85e57d4831..9fa705799a 100644 --- a/src/auth/src/retry.rs +++ b/src/auth/src/retry.rs @@ -503,8 +503,7 @@ mod tests { let error_string = error.to_string(); // 2. Call the function under test. - let credentials_error = - TokenProviderWithRetry::::map_retry_error(error); + let credentials_error = TokenProviderWithRetry::::map_retry_error(error); // 3. Assert that the resulting error is transient or not like the original error and wraps the original error. assert_eq!(credentials_error.is_transient(), transient); From 35fd5450d43d76abfd227dd13a80b604cbba70dd Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 6 Feb 2026 20:39:40 +0000 Subject: [PATCH 12/14] fix: address review comments --- src/auth/src/constants.rs | 2 +- src/auth/src/credentials/external_account.rs | 2 +- src/auth/src/credentials/idtoken/user_account.rs | 2 +- src/auth/src/credentials/mds.rs | 2 +- src/auth/src/credentials/user_account.rs | 2 +- src/auth/src/retry.rs | 9 +++++++-- src/auth/src/token_cache.rs | 15 +++++++++++---- 7 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/auth/src/constants.rs b/src/auth/src/constants.rs index 070e197960..5d94782917 100644 --- a/src/auth/src/constants.rs +++ b/src/auth/src/constants.rs @@ -29,7 +29,7 @@ pub(crate) const JWT_TOKEN_TYPE: &str = "urn:ietf:params:oauth:token-type:jwt"; /// SAML2 Token OAuth Token Type pub(crate) const SAML2_TOKEN_TYPE: &str = "urn:ietf:params:oauth:token-type:saml2"; -pub(crate) const RETRY_EXHAUSTED_ERROR: &str = "All retry attempts to fetch the token were exhausted. Subsequent calls with this credential will also fail."; +pub(crate) const RETRY_EXHAUSTED_ERROR: &str = "All retry attempts to fetch the token were exhausted with transient errors. Subsequent calls with this credential might succeed."; pub(crate) const TOKEN_FETCH_FAILED_ERROR: &str = "Request to fetch the token failed. Subsequent calls with this credential will also fail."; diff --git a/src/auth/src/credentials/external_account.rs b/src/auth/src/credentials/external_account.rs index 753e1be54b..f2f9f28877 100644 --- a/src/auth/src/credentials/external_account.rs +++ b/src/auth/src/credentials/external_account.rs @@ -1959,7 +1959,7 @@ mod tests { .unwrap(); let err = creds.headers(Extensions::new()).await.unwrap_err(); - assert!(err.is_transient()); + assert!(err.is_transient(), "{err:?}"); sts_server.verify_and_clear(); subject_token_server.verify_and_clear(); } diff --git a/src/auth/src/credentials/idtoken/user_account.rs b/src/auth/src/credentials/idtoken/user_account.rs index 75a559d508..731e77c9e5 100644 --- a/src/auth/src/credentials/idtoken/user_account.rs +++ b/src/auth/src/credentials/idtoken/user_account.rs @@ -422,7 +422,7 @@ mod tests { .build()?; let err = credentials.id_token().await.unwrap_err(); - assert!(err.is_transient()); + assert!(err.is_transient(), "{err:?}"); Ok(()) } diff --git a/src/auth/src/credentials/mds.rs b/src/auth/src/credentials/mds.rs index 84c0e14918..73f5442e0c 100644 --- a/src/auth/src/credentials/mds.rs +++ b/src/auth/src/credentials/mds.rs @@ -483,7 +483,7 @@ mod tests { .build_token_provider(); let err = provider.token().await.unwrap_err(); - assert!(err.is_transient()); + assert!(err.is_transient(), "{err:?}"); server.verify_and_clear(); Ok(()) } diff --git a/src/auth/src/credentials/user_account.rs b/src/auth/src/credentials/user_account.rs index e7d6d37f6d..6f37781678 100644 --- a/src/auth/src/credentials/user_account.rs +++ b/src/auth/src/credentials/user_account.rs @@ -611,7 +611,7 @@ mod tests { .build()?; let err = credentials.headers(Extensions::new()).await.unwrap_err(); - assert!(err.is_transient()); + assert!(err.is_transient(), "{err:?}"); server.verify_and_clear(); Ok(()) } diff --git a/src/auth/src/retry.rs b/src/auth/src/retry.rs index 9fa705799a..fbd512d954 100644 --- a/src/auth/src/retry.rs +++ b/src/auth/src/retry.rs @@ -506,10 +506,15 @@ mod tests { let credentials_error = TokenProviderWithRetry::::map_retry_error(error); // 3. Assert that the resulting error is transient or not like the original error and wraps the original error. - assert_eq!(credentials_error.is_transient(), transient); + assert_eq!( + credentials_error.is_transient(), + transient, + "{credentials_error:?}" + ); assert_eq!( credentials_error.source().unwrap().to_string(), - error_string + error_string, + "{credentials_error:?}" ); } diff --git a/src/auth/src/token_cache.rs b/src/auth/src/token_cache.rs index a665c2f6e0..5f384299e7 100644 --- a/src/auth/src/token_cache.rs +++ b/src/auth/src/token_cache.rs @@ -135,10 +135,17 @@ async fn refresh_task( break; } Err(err) => { - // The retry policy has been used already by the inner token provider. - // If it ended in an error, the background task will wait for a while - // and try again. This allows the task to eventually recover if the - // error was transient but exhausted all the retries. + // On transient errors, even if the retry policy is exhausted, + // we want to continue running this retry loop. + // This loop cannot stop because that may leave the + // credentials in an unrecoverable state (see #4541). + // We considered using a notification to wake up the next time + // a caller wants to retrieve a token, but that seemed prone to + // deadlocks. We may implement this as an improvement (#4593). + // On permanent errors, then there is really no point in trying + // again, by definition of "permanent". If the error was misclassified + // as permanent, that is a bug in the retry policy and better fixed + // there than implemented as a workaround here. if !err.is_transient() { break; } From d380f85a935b2e91434f72bcf42c21746ccbb7ba Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 6 Feb 2026 20:41:50 +0000 Subject: [PATCH 13/14] fix: add debug msg on assert --- src/auth/src/credentials/external_account.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/auth/src/credentials/external_account.rs b/src/auth/src/credentials/external_account.rs index f2f9f28877..8a0cf776b5 100644 --- a/src/auth/src/credentials/external_account.rs +++ b/src/auth/src/credentials/external_account.rs @@ -2084,7 +2084,7 @@ mod tests { .unwrap(); let err = creds.headers(Extensions::new()).await.unwrap_err(); - assert!(err.is_transient()); + assert!(err.is_transient(), "{err:?}"); sts_server.verify_and_clear(); } From f8564bba5f718dee2a7ce050c2de53695d5db47e Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 6 Feb 2026 20:58:02 +0000 Subject: [PATCH 14/14] test: increase yield time to wait refresh and http calls --- src/auth/tests/credentials.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/auth/tests/credentials.rs b/src/auth/tests/credentials.rs index c827398b16..b005af78fd 100644 --- a/src/auth/tests/credentials.rs +++ b/src/auth/tests/credentials.rs @@ -1032,11 +1032,10 @@ mod tests { // advance time long enough to pass through SHORT_REFRESH_SLACK (10s + some buffer) tokio::time::advance(std::time::Duration::from_secs(12)).await; - // yield tasks to let the refresh task run and http request layers work - for _ in 0..100 { + for _ in 0..1000 { tokio::task::yield_now().await; - let result = creds.headers(Extensions::new()).await; + let result = creds.access_token().await; if result.is_ok() { break; }