From 90527c02e9cbc4d4a9e1a8fc72b8096bed5c06b3 Mon Sep 17 00:00:00 2001
From: Marten Wijnja
Date: Mon, 6 Oct 2025 17:29:30 +0200
Subject: [PATCH] Allow configuring the max number of chunk retries

Before, this was hard-coded to 10.
---
 opsqueue/src/common/chunk.rs        |  4 ++--
 opsqueue/src/config.rs              |  7 +++++++
 opsqueue/src/consumer/server/mod.rs | 15 ++++++++++++---
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/opsqueue/src/common/chunk.rs b/opsqueue/src/common/chunk.rs
index 912cb2b..9ac98e2 100644
--- a/opsqueue/src/common/chunk.rs
+++ b/opsqueue/src/common/chunk.rs
@@ -367,11 +367,11 @@ pub mod db {
         chunk_id: ChunkId,
         failure: String,
         mut conn: impl WriterConnection,
+        max_retries: u32,
     ) -> sqlx::Result {
         let failed_permanently = conn
             .transaction(move |mut tx| {
                 Box::pin(async move {
-                    const MAX_RETRIES: i64 = 10;
                     let ChunkId {
                         submission_id,
                         chunk_index,
@@ -388,7 +388,7 @@ pub mod db {
                     .fetch_one(tx.get_inner())
                     .await?;
                     tracing::trace!("Retries: {}", fields.retries);
-                    if fields.retries >= MAX_RETRIES {
+                    if fields.retries >= max_retries.into() {
                         crate::common::submission::db::fail_submission_notx(
                             submission_id,
                             chunk_index,
diff --git a/opsqueue/src/config.rs b/opsqueue/src/config.rs
index a73a0c0..a56264b 100644
--- a/opsqueue/src/config.rs
+++ b/opsqueue/src/config.rs
@@ -69,6 +69,11 @@ pub struct Config {
     /// At that time, the connection will be closed and any open reservations will be canceled.
     #[arg(long, default_value_t = 3)]
     pub max_missable_heartbeats: usize,
+
+    /// Maximum number of times a chunk is retried before permanently failing
+    /// the full submission the chunk is a part of.
+    #[arg(long, default_value_t = 10)]
+    pub max_chunk_retries: u32,
 }
 
 impl Default for Config {
@@ -82,6 +87,7 @@ impl Default for Config {
         let heartbeat_interval =
             humantime::Duration::from_str("10 seconds").expect("valid humantime");
         let max_missable_heartbeats = 3;
+        let max_chunk_retries = 10;
         Config {
             port,
             database_filename,
@@ -89,6 +95,7 @@ impl Default for Config {
             max_read_pool_size,
             heartbeat_interval,
             max_missable_heartbeats,
+            max_chunk_retries,
         }
     }
 }
diff --git a/opsqueue/src/consumer/server/mod.rs b/opsqueue/src/consumer/server/mod.rs
index 8767d3f..c869f5d 100644
--- a/opsqueue/src/consumer/server/mod.rs
+++ b/opsqueue/src/consumer/server/mod.rs
@@ -72,7 +72,8 @@ impl ServerState {
         config: &'static Config,
     ) -> Self {
         let dispatcher = Dispatcher::new(reservation_expiration);
-        let (completer, completer_tx) = Completer::new(pool.writer_pool(), &dispatcher);
+        let (completer, completer_tx) =
+            Completer::new(pool.writer_pool(), &dispatcher, config.max_chunk_retries);
         Self {
             pool,
             completer: Some(completer),
@@ -170,12 +171,14 @@ pub struct Completer {
     pool: db::WriterPool,
     dispatcher: Dispatcher,
     count: usize,
+    max_chunk_retries: u32,
 }
 
 impl Completer {
     pub fn new(
         pool: &db::WriterPool,
         dispatcher: &Dispatcher,
+        max_chunk_retries: u32,
     ) -> (Self, tokio::sync::mpsc::Sender) {
         let (tx, rx) = tokio::sync::mpsc::channel(1024);
         let pool = pool.clone();
@@ -184,6 +187,7 @@ impl Completer {
             pool: pool.clone(),
             dispatcher: dispatcher.clone(),
             count: 0,
+            max_chunk_retries,
         };
         (me, tx)
     }
@@ -243,8 +247,13 @@ impl Completer {
             } => {
                 // Even in the unlikely event that the DB write fails,
                 // we still want to unreserve the chunk
-                let failed_permanently =
-                    crate::common::chunk::db::retry_or_fail_chunk(id, failure, &mut conn).await;
+                let failed_permanently = crate::common::chunk::db::retry_or_fail_chunk(
+                    id,
+                    failure,
+                    &mut conn,
+                    self.max_chunk_retries,
+                )
+                .await;
                 reservations.lock().expect("No poison").remove(&id);
                 let maybe_started_at = self
                     .dispatcher
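
A minimal usage sketch (not part of the patch), assuming `Config` is reachable as `opsqueue::config::Config` and that all of its fields are public as shown in the diff. Because the field carries `#[arg(long, default_value_t = 10)]`, clap's derive would also expose it on the CLI as `--max-chunk-retries`, defaulting to 10.

    // Sketch: overriding the new retry limit programmatically.
    // Field and type names are taken from the diff above; the crate path is an assumption.
    use opsqueue::config::Config;

    fn main() {
        let config = Config {
            // After this many retries of a single chunk, the submission
            // the chunk belongs to is permanently failed.
            max_chunk_retries: 5,
            ..Config::default()
        };
        assert_eq!(config.max_chunk_retries, 5);
    }

Leaving the value at its default keeps the previous hard-coded behaviour of 10 retries.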