Skip to content
1 change: 1 addition & 0 deletions .changeset/engine-perf-retry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
---\n"@googleworkspace/cli": patch\n---\n\nfix(executor): reuse HTTP client across paginated requests and implement global retry for 429/5xx status codes
61 changes: 44 additions & 17 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,56 @@ const MAX_RETRIES: u32 = 3;
/// or misconfigured server from hanging the process indefinitely.
const MAX_RETRY_DELAY_SECS: u64 = 60;

/// Send an HTTP request with automatic retry on 429 (rate limit) responses.
/// Send an HTTP request with automatic retry on 429 (rate limit) and transient 5xx responses.
/// Respects the `Retry-After` header; falls back to exponential backoff (1s, 2s, 4s).
pub async fn send_with_retry(
build_request: impl Fn() -> reqwest::RequestBuilder,
) -> Result<reqwest::Response, reqwest::Error> {
for attempt in 0..MAX_RETRIES {
let resp = build_request().send().await?;

if resp.status() != reqwest::StatusCode::TOO_MANY_REQUESTS {
pub async fn send_with_retry<F>(build_request: F) -> anyhow::Result<reqwest::Response>
where
F: Fn() -> anyhow::Result<reqwest::RequestBuilder>,
{
// total attempts = MAX_RETRIES + 1
for attempt in 0..=MAX_RETRIES {
let req_result = build_request()?.send().await;

let resp = match req_result {
Ok(r) => r,
Err(e) if attempt < MAX_RETRIES && (e.is_timeout() || e.is_connect()) => {
let retry_after = compute_retry_delay(None, attempt);
tracing::debug!(error = %e, attempt, retry_after, "Retrying on network error");
tokio::time::sleep(std::time::Duration::from_secs(retry_after)).await;
continue;
}
Err(e) => return Err(e.into()),
};

let status = resp.status();
if status.is_success()
|| !matches!(
status,
reqwest::StatusCode::TOO_MANY_REQUESTS
| reqwest::StatusCode::INTERNAL_SERVER_ERROR
| reqwest::StatusCode::BAD_GATEWAY
| reqwest::StatusCode::SERVICE_UNAVAILABLE
)
{
return Ok(resp);
}

let header_value = resp
.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok());
let retry_after = compute_retry_delay(header_value, attempt);

tokio::time::sleep(std::time::Duration::from_secs(retry_after)).await;
// If we still have retries, sleep and continue
if attempt < MAX_RETRIES {
let header_value = resp
.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok());
let retry_after = compute_retry_delay(header_value, attempt);

tracing::debug!(status = %status, attempt, retry_after, "Retrying on server error");
tokio::time::sleep(std::time::Duration::from_secs(retry_after)).await;
} else {
return Ok(resp);
}
}

// Final attempt — return whatever we get
build_request().send().await
unreachable!("Loop should return on last attempt")
}

/// Compute the retry delay from a Retry-After header value and attempt number.
Expand Down
55 changes: 32 additions & 23 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ fn parse_and_validate_inputs(

/// Build an HTTP request with auth, query params, page token, and body/multipart attachment.
#[allow(clippy::too_many_arguments)]
async fn build_http_request(
fn build_http_request(
client: &reqwest::Client,
method: &RestMethod,
input: &ExecutionInput,
Expand All @@ -166,6 +166,7 @@ async fn build_http_request(
page_token: Option<&str>,
pages_fetched: u32,
upload: &Option<UploadSource<'_>>,
resolved_file_size: Option<u64>,
) -> Result<reqwest::RequestBuilder, GwsError> {
let mut request = match method.http_method.as_str() {
"GET" => client.get(&input.full_url),
Expand Down Expand Up @@ -212,13 +213,11 @@ async fn build_http_request(
build_multipart_bytes(&input.body, data, content_type)?
}
UploadSource::File { path, content_type } => {
let file_meta = tokio::fs::metadata(path).await.map_err(|e| {
GwsError::Validation(format!(
"Failed to get metadata for upload file '{}': {}",
path, e
let file_size = resolved_file_size.ok_or_else(|| {
GwsError::Other(anyhow::anyhow!(
"Internal error: file size not resolved for upload"
))
})?;
let file_size = file_meta.len();
let media_mime = resolve_upload_mime(*content_type, Some(path), &input.body);
build_multipart_stream(&input.body, path, file_size, &media_mime)?
}
Expand Down Expand Up @@ -431,27 +430,40 @@ pub async fn execute_method(
return Ok(None);
}

let client = crate::client::build_client()?;
let mut page_token: Option<String> = None;
let mut pages_fetched: u32 = 0;
let mut captured_values = Vec::new();

loop {
let client = crate::client::build_client()?;
let request = build_http_request(
&client,
method,
&input,
token,
&auth_method,
page_token.as_deref(),
pages_fetched,
&upload,
)
.await?;
// Resolve file metadata once before loop if uploading a file
let mut resolved_file_size: Option<u64> = None;
if let Some(UploadSource::File { path, .. }) = &upload {
resolved_file_size = Some(tokio::fs::metadata(path).await.map_err(|e| {
GwsError::Validation(format!("Failed to get metadata for upload file '{}': {}", path, e))
})?.len());
}

loop {
let method_id = method.id.as_deref().unwrap_or("unknown");
let start = std::time::Instant::now();
let response = request.send().await.context("HTTP request failed")?;

// Wrap the request builder in send_with_retry to handle rate limits and transient 5xx
let response = crate::client::send_with_retry(|| {
Ok(build_http_request(
&client,
method,
&input,
token,
&auth_method,
page_token.as_deref(),
pages_fetched,
&upload,
resolved_file_size,
)?)
})
.await
.context("HTTP request failed")?;

let latency_ms = start.elapsed().as_millis() as u64;

let status = response.status();
Expand Down Expand Up @@ -2311,7 +2323,6 @@ async fn test_post_without_body_sets_content_length_zero() {
0,
&None,
)
.await
.unwrap();

let built = request.build().unwrap();
Expand Down Expand Up @@ -2351,7 +2362,6 @@ async fn test_post_with_body_does_not_add_content_length_zero() {
0,
&None,
)
.await
.unwrap();

let built = request.build().unwrap();
Expand Down Expand Up @@ -2389,7 +2399,6 @@ async fn test_get_does_not_set_content_length_zero() {
0,
&None,
)
.await
.unwrap();

let built = request.build().unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/helpers/calendar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ async fn handle_agenda(matches: &ArgMatches) -> Result<(), GwsError> {
);

let resp = crate::client::send_with_retry(|| {
client
Ok(client
.get(&events_url)
.query(&[
("timeMin", time_min.as_str()),
Expand All @@ -341,7 +341,7 @@ async fn handle_agenda(matches: &ArgMatches) -> Result<(), GwsError> {
("orderBy", "startTime"),
("maxResults", "50"),
])
.bearer_auth(token)
.bearer_auth(token))
})
.await;

Expand Down
4 changes: 2 additions & 2 deletions src/helpers/gmail/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,10 @@ pub(super) async fn fetch_message_metadata(
);

let resp = crate::client::send_with_retry(|| {
client
Ok(client
.get(&url)
.bearer_auth(token)
.query(&[("format", "full")])
.query(&[("format", "full")]))
})
.await
.map_err(|e| GwsError::Other(anyhow::anyhow!("Failed to fetch message: {e}")))?;
Expand Down
4 changes: 2 additions & 2 deletions src/helpers/gmail/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ pub(super) struct ReplyConfig {

async fn fetch_user_email(client: &reqwest::Client, token: &str) -> Result<String, GwsError> {
let resp = crate::client::send_with_retry(|| {
client
Ok(client
.get("https://gmail.googleapis.com/gmail/v1/users/me/profile")
.bearer_auth(token)
.bearer_auth(token))
})
.await
.map_err(|e| GwsError::Other(anyhow::anyhow!("Failed to fetch user profile: {e}")))?;
Expand Down
2 changes: 1 addition & 1 deletion src/helpers/gmail/triage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub async fn handle_triage(matches: &ArgMatches) -> Result<(), GwsError> {
);

let get_resp = crate::client::send_with_retry(|| {
client.get(&get_url).bearer_auth(token)
Ok(client.get(&get_url).bearer_auth(token))
})
.await
.ok()?;
Expand Down
Loading