diff --git a/.changeset/engine-perf-retry.md b/.changeset/engine-perf-retry.md new file mode 100644 index 00000000..f6a70974 --- /dev/null +++ b/.changeset/engine-perf-retry.md @@ -0,0 +1 @@ +---\n"@googleworkspace/cli": patch\n---\n\nfix(executor): reuse HTTP client across paginated requests and implement global retry for 429/5xx status codes diff --git a/src/client.rs b/src/client.rs index e0abe409..5786df09 100644 --- a/src/client.rs +++ b/src/client.rs @@ -24,29 +24,56 @@ const MAX_RETRIES: u32 = 3; /// or misconfigured server from hanging the process indefinitely. const MAX_RETRY_DELAY_SECS: u64 = 60; -/// Send an HTTP request with automatic retry on 429 (rate limit) responses. +/// Send an HTTP request with automatic retry on 429 (rate limit) and transient 5xx responses. /// Respects the `Retry-After` header; falls back to exponential backoff (1s, 2s, 4s). -pub async fn send_with_retry( - build_request: impl Fn() -> reqwest::RequestBuilder, -) -> Result { - for attempt in 0..MAX_RETRIES { - let resp = build_request().send().await?; - - if resp.status() != reqwest::StatusCode::TOO_MANY_REQUESTS { +pub async fn send_with_retry(build_request: F) -> anyhow::Result +where + F: Fn() -> anyhow::Result, +{ + // total attempts = MAX_RETRIES + 1 + for attempt in 0..=MAX_RETRIES { + let req_result = build_request()?.send().await; + + let resp = match req_result { + Ok(r) => r, + Err(e) if attempt < MAX_RETRIES && (e.is_timeout() || e.is_connect()) => { + let retry_after = compute_retry_delay(None, attempt); + tracing::debug!(error = %e, attempt, retry_after, "Retrying on network error"); + tokio::time::sleep(std::time::Duration::from_secs(retry_after)).await; + continue; + } + Err(e) => return Err(e.into()), + }; + + let status = resp.status(); + if status.is_success() + || !matches!( + status, + reqwest::StatusCode::TOO_MANY_REQUESTS + | reqwest::StatusCode::INTERNAL_SERVER_ERROR + | reqwest::StatusCode::BAD_GATEWAY + | reqwest::StatusCode::SERVICE_UNAVAILABLE + ) + { return Ok(resp); } - let header_value = resp - .headers() - .get("retry-after") - .and_then(|v| v.to_str().ok()); - let retry_after = compute_retry_delay(header_value, attempt); - - tokio::time::sleep(std::time::Duration::from_secs(retry_after)).await; + // If we still have retries, sleep and continue + if attempt < MAX_RETRIES { + let header_value = resp + .headers() + .get("retry-after") + .and_then(|v| v.to_str().ok()); + let retry_after = compute_retry_delay(header_value, attempt); + + tracing::debug!(status = %status, attempt, retry_after, "Retrying on server error"); + tokio::time::sleep(std::time::Duration::from_secs(retry_after)).await; + } else { + return Ok(resp); + } } - // Final attempt — return whatever we get - build_request().send().await + unreachable!("Loop should return on last attempt") } /// Compute the retry delay from a Retry-After header value and attempt number. diff --git a/src/executor.rs b/src/executor.rs index b6b8dadf..dc6e27fd 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -157,7 +157,7 @@ fn parse_and_validate_inputs( /// Build an HTTP request with auth, query params, page token, and body/multipart attachment. #[allow(clippy::too_many_arguments)] -async fn build_http_request( +fn build_http_request( client: &reqwest::Client, method: &RestMethod, input: &ExecutionInput, @@ -166,6 +166,7 @@ async fn build_http_request( page_token: Option<&str>, pages_fetched: u32, upload: &Option>, + resolved_file_size: Option, ) -> Result { let mut request = match method.http_method.as_str() { "GET" => client.get(&input.full_url), @@ -212,13 +213,11 @@ async fn build_http_request( build_multipart_bytes(&input.body, data, content_type)? } UploadSource::File { path, content_type } => { - let file_meta = tokio::fs::metadata(path).await.map_err(|e| { - GwsError::Validation(format!( - "Failed to get metadata for upload file '{}': {}", - path, e + let file_size = resolved_file_size.ok_or_else(|| { + GwsError::Other(anyhow::anyhow!( + "Internal error: file size not resolved for upload" )) })?; - let file_size = file_meta.len(); let media_mime = resolve_upload_mime(*content_type, Some(path), &input.body); build_multipart_stream(&input.body, path, file_size, &media_mime)? } @@ -431,27 +430,40 @@ pub async fn execute_method( return Ok(None); } + let client = crate::client::build_client()?; let mut page_token: Option = None; let mut pages_fetched: u32 = 0; let mut captured_values = Vec::new(); - loop { - let client = crate::client::build_client()?; - let request = build_http_request( - &client, - method, - &input, - token, - &auth_method, - page_token.as_deref(), - pages_fetched, - &upload, - ) - .await?; + // Resolve file metadata once before loop if uploading a file + let mut resolved_file_size: Option = None; + if let Some(UploadSource::File { path, .. }) = &upload { + resolved_file_size = Some(tokio::fs::metadata(path).await.map_err(|e| { + GwsError::Validation(format!("Failed to get metadata for upload file '{}': {}", path, e)) + })?.len()); + } + loop { let method_id = method.id.as_deref().unwrap_or("unknown"); let start = std::time::Instant::now(); - let response = request.send().await.context("HTTP request failed")?; + + // Wrap the request builder in send_with_retry to handle rate limits and transient 5xx + let response = crate::client::send_with_retry(|| { + Ok(build_http_request( + &client, + method, + &input, + token, + &auth_method, + page_token.as_deref(), + pages_fetched, + &upload, + resolved_file_size, + )?) + }) + .await + .context("HTTP request failed")?; + let latency_ms = start.elapsed().as_millis() as u64; let status = response.status(); @@ -2311,7 +2323,6 @@ async fn test_post_without_body_sets_content_length_zero() { 0, &None, ) - .await .unwrap(); let built = request.build().unwrap(); @@ -2351,7 +2362,6 @@ async fn test_post_with_body_does_not_add_content_length_zero() { 0, &None, ) - .await .unwrap(); let built = request.build().unwrap(); @@ -2389,7 +2399,6 @@ async fn test_get_does_not_set_content_length_zero() { 0, &None, ) - .await .unwrap(); let built = request.build().unwrap(); diff --git a/src/helpers/calendar.rs b/src/helpers/calendar.rs index cf28b249..f25adddf 100644 --- a/src/helpers/calendar.rs +++ b/src/helpers/calendar.rs @@ -332,7 +332,7 @@ async fn handle_agenda(matches: &ArgMatches) -> Result<(), GwsError> { ); let resp = crate::client::send_with_retry(|| { - client + Ok(client .get(&events_url) .query(&[ ("timeMin", time_min.as_str()), @@ -341,7 +341,7 @@ async fn handle_agenda(matches: &ArgMatches) -> Result<(), GwsError> { ("orderBy", "startTime"), ("maxResults", "50"), ]) - .bearer_auth(token) + .bearer_auth(token)) }) .await; diff --git a/src/helpers/gmail/mod.rs b/src/helpers/gmail/mod.rs index b9ed7c8e..24902891 100644 --- a/src/helpers/gmail/mod.rs +++ b/src/helpers/gmail/mod.rs @@ -335,10 +335,10 @@ pub(super) async fn fetch_message_metadata( ); let resp = crate::client::send_with_retry(|| { - client + Ok(client .get(&url) .bearer_auth(token) - .query(&[("format", "full")]) + .query(&[("format", "full")])) }) .await .map_err(|e| GwsError::Other(anyhow::anyhow!("Failed to fetch message: {e}")))?; diff --git a/src/helpers/gmail/reply.rs b/src/helpers/gmail/reply.rs index 211bf98e..cca2dd6c 100644 --- a/src/helpers/gmail/reply.rs +++ b/src/helpers/gmail/reply.rs @@ -144,9 +144,9 @@ pub(super) struct ReplyConfig { async fn fetch_user_email(client: &reqwest::Client, token: &str) -> Result { let resp = crate::client::send_with_retry(|| { - client + Ok(client .get("https://gmail.googleapis.com/gmail/v1/users/me/profile") - .bearer_auth(token) + .bearer_auth(token)) }) .await .map_err(|e| GwsError::Other(anyhow::anyhow!("Failed to fetch user profile: {e}")))?; diff --git a/src/helpers/gmail/triage.rs b/src/helpers/gmail/triage.rs index 8bc58804..54534e8b 100644 --- a/src/helpers/gmail/triage.rs +++ b/src/helpers/gmail/triage.rs @@ -106,7 +106,7 @@ pub async fn handle_triage(matches: &ArgMatches) -> Result<(), GwsError> { ); let get_resp = crate::client::send_with_retry(|| { - client.get(&get_url).bearer_auth(token) + Ok(client.get(&get_url).bearer_auth(token)) }) .await .ok()?;