Skip to content

Commit b4fb822

Browse files
committed
Merge #21: Ensure that bad HTTP status codes result in the producer retrying
Approved-by: Qqwy Priority: Normal Auto-deploy: false
2 parents 831fcb1 + 4f5afcd commit b4fb822

File tree

4 files changed

+35
-14
lines changed

4 files changed

+35
-14
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libs/opsqueue_python/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "opsqueue_python"
3-
version = "0.30.7"
3+
version = "0.30.8"
44
edition = "2021"
55

66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

opsqueue/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "opsqueue"
3-
version = "0.30.7"
3+
version = "0.30.8"
44
edition = "2021"
55
description = "lightweight batch processing queue for heavy loads"
66
repository = "https://github.com/channable/opsqueue"

opsqueue/src/producer/client.rs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ impl Client {
6262
.http_client
6363
.get(format!("{base_url}/submissions/count"))
6464
.send()
65-
.await?;
65+
.await?
66+
.error_for_status()?;
6667
let bytes = resp.bytes().await?;
6768
let body = serde_json::from_slice(&bytes)?;
6869

@@ -92,7 +93,8 @@ impl Client {
9293
.headers(otel_trace_carrier.try_into().unwrap_or_default())
9394
.json(submission)
9495
.send()
95-
.await?;
96+
.await?
97+
.error_for_status()?;
9698
let bytes = resp.bytes().await?;
9799
let body = serde_json::from_slice(&bytes)?;
98100
Ok(body)
@@ -118,7 +120,8 @@ impl Client {
118120
.http_client
119121
.get(format!("{base_url}/submissions/{submission_id}"))
120122
.send()
121-
.await?;
123+
.await?
124+
.error_for_status()?;
122125
let bytes = resp.bytes().await?;
123126
let body = serde_json::from_slice(&bytes)?;
124127
Ok(body)
@@ -143,7 +146,8 @@ impl Client {
143146
"{base_url}/submissions/lookup_id_by_prefix/{prefix}"
144147
))
145148
.send()
146-
.await?;
149+
.await?
150+
.error_for_status()?;
147151
let bytes = resp.bytes().await?;
148152
let body = serde_json::from_slice(&bytes)?;
149153
Ok(body)
@@ -158,15 +162,18 @@ impl Client {
158162

159163
/// Get the server's version from the `/version` endpoint.
160164
///
161-
/// The result will be the value of [`VERSION_CARGO_SEMVER`][crate::VERSION_CARGO_SEMVER]
165+
/// A successful result will be the value of [`VERSION_CARGO_SEMVER`][crate::VERSION_CARGO_SEMVER]
162166
/// prefixed with a "v", for example `v0.30.5`.
167+
///
168+
/// Upon connection failure, this will not attempt to do any retrying.
163169
pub async fn server_version(&self) -> Result<String, InternalProducerClientError> {
164170
let base_url = &self.base_url;
165171
let resp = self
166172
.http_client
167173
.get(format!("{base_url}/version"))
168174
.send()
169-
.await?;
175+
.await?
176+
.error_for_status()?;
170177
let text = resp.text().await?;
171178
Ok(text)
172179
}
@@ -183,14 +190,28 @@ pub enum InternalProducerClientError {
183190
impl InternalProducerClientError {
184191
pub fn is_ephemeral(&self) -> bool {
185192
match self {
186-
// NOTE: In the case of an ungraceful restart, this case might theoretically trigger.
193+
// In the case of an ungraceful restart, this case might theoretically trigger.
187194
// So even cleaner would be a tiny retry loop for this special case.
188195
// However, we certainly **do not** want to wait multiple minutes before returning.
189196
Self::ResponseDecodingError(_) => false,
190-
// NOTE: reqwest doesn't make this very easy as it has a single error typed used for _everything_
191-
// Maybe a different HTTP client library is nicer in this regard?
197+
// Reqwest doesn't make this very easy as it has a single error typed used for _everything_.
192198
Self::HTTPClientError(inner) => {
193-
inner.is_connect() || inner.is_timeout() || inner.is_decode()
199+
// Failures in which a connection could not be established are ephemeral,
200+
// as these can be caused by network failures, so we want to retry:
201+
if inner.is_connect() || inner.is_timeout() || inner.is_decode() {
202+
true
203+
} else if inner.is_status() {
204+
// Failures where the server returns a 5xx status code might indicate the server is (temporarily!) unhealthy,
205+
// or in the process of a restart.
206+
// Any other status is considered a permanent failure however
207+
inner
208+
.status()
209+
.map(|status| status.is_server_error())
210+
.unwrap_or(false)
211+
} else {
212+
// Anything else is a permanent failure.
213+
false
214+
}
194215
}
195216
}
196217
}

0 commit comments

Comments
 (0)