diff --git a/Cargo.lock b/Cargo.lock index 8df04d4..f5ad53a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1972,7 +1972,7 @@ dependencies = [ [[package]] name = "opsqueue" -version = "0.30.7" +version = "0.30.8" dependencies = [ "anyhow", "arc-swap", @@ -2026,7 +2026,7 @@ dependencies = [ [[package]] name = "opsqueue_python" -version = "0.30.7" +version = "0.30.8" dependencies = [ "anyhow", "chrono", diff --git a/libs/opsqueue_python/Cargo.toml b/libs/opsqueue_python/Cargo.toml index eb43975..5658152 100644 --- a/libs/opsqueue_python/Cargo.toml +++ b/libs/opsqueue_python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opsqueue_python" -version = "0.30.7" +version = "0.30.8" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/opsqueue/Cargo.toml b/opsqueue/Cargo.toml index 4a1a889..84dab4b 100644 --- a/opsqueue/Cargo.toml +++ b/opsqueue/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opsqueue" -version = "0.30.7" +version = "0.30.8" edition = "2021" description = "lightweight batch processing queue for heavy loads" repository = "https://github.com/channable/opsqueue" diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index 328394b..0b94b66 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -62,7 +62,8 @@ impl Client { .http_client .get(format!("{base_url}/submissions/count")) .send() - .await?; + .await? + .error_for_status()?; let bytes = resp.bytes().await?; let body = serde_json::from_slice(&bytes)?; @@ -92,7 +93,8 @@ impl Client { .headers(otel_trace_carrier.try_into().unwrap_or_default()) .json(submission) .send() - .await?; + .await? + .error_for_status()?; let bytes = resp.bytes().await?; let body = serde_json::from_slice(&bytes)?; Ok(body) @@ -118,7 +120,8 @@ impl Client { .http_client .get(format!("{base_url}/submissions/{submission_id}")) .send() - .await?; + .await? 
+ .error_for_status()?; let bytes = resp.bytes().await?; let body = serde_json::from_slice(&bytes)?; Ok(body) @@ -143,7 +146,8 @@ impl Client { "{base_url}/submissions/lookup_id_by_prefix/{prefix}" )) .send() - .await?; + .await? + .error_for_status()?; let bytes = resp.bytes().await?; let body = serde_json::from_slice(&bytes)?; Ok(body) @@ -158,15 +162,18 @@ impl Client { /// Get the server's version from the `/version` endpoint. /// - /// The result will be the value of [`VERSION_CARGO_SEMVER`][crate::VERSION_CARGO_SEMVER] + /// A successful result will be the value of [`VERSION_CARGO_SEMVER`][crate::VERSION_CARGO_SEMVER] /// prefixed with a "v", for example `v0.30.5`. + /// + /// Upon connection failure, this will not attempt to do any retrying. pub async fn server_version(&self) -> Result { let base_url = &self.base_url; let resp = self .http_client .get(format!("{base_url}/version")) .send() - .await?; + .await? + .error_for_status()?; let text = resp.text().await?; Ok(text) } @@ -183,14 +190,28 @@ pub enum InternalProducerClientError { impl InternalProducerClientError { pub fn is_ephemeral(&self) -> bool { match self { - // NOTE: In the case of an ungraceful restart, this case might theoretically trigger. + // In the case of an ungraceful restart, this case might theoretically trigger. // So even cleaner would be a tiny retry loop for this special case. // However, we certainly **do not** want to wait multiple minutes before returning. Self::ResponseDecodingError(_) => false, - // NOTE: reqwest doesn't make this very easy as it has a single error typed used for _everything_ - // Maybe a different HTTP client library is nicer in this regard? + // Reqwest doesn't make this very easy as it has a single error type used for _everything_.
Self::HTTPClientError(inner) => { - inner.is_connect() || inner.is_timeout() || inner.is_decode() + // Failures in which a connection could not be established, a timeout occurred, or the response body could not be decoded are ephemeral, + // as these can be caused by network failures, so we want to retry: + if inner.is_connect() || inner.is_timeout() || inner.is_decode() { + true + } else if inner.is_status() { + // Failures where the server returns a 5xx status code might indicate the server is (temporarily!) unhealthy, + // or in the process of a restart. + // Any other status is considered a permanent failure, however. + inner + .status() + .map(|status| status.is_server_error()) + .unwrap_or(false) + } else { + // Anything else is a permanent failure. + false + } } } }