Skip to content

Commit 05fd2b0

Browse files
QqwyOpsBotPrime
authored and committed
Allow configuring the max number of chunk retries
Before, this was hard-coded to 10.
1 parent a2e407f commit 05fd2b0

File tree

3 files changed

+21
-5
lines changed

3 files changed

+21
-5
lines changed

opsqueue/src/common/chunk.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -367,11 +367,11 @@ pub mod db {
367367
chunk_id: ChunkId,
368368
failure: String,
369369
mut conn: impl WriterConnection,
370+
max_retries: u32,
370371
) -> sqlx::Result<bool> {
371372
let failed_permanently = conn
372373
.transaction(move |mut tx| {
373374
Box::pin(async move {
374-
const MAX_RETRIES: i64 = 10;
375375
let ChunkId {
376376
submission_id,
377377
chunk_index,
@@ -388,7 +388,7 @@ pub mod db {
388388
.fetch_one(tx.get_inner())
389389
.await?;
390390
tracing::trace!("Retries: {}", fields.retries);
391-
if fields.retries >= MAX_RETRIES {
391+
if fields.retries >= max_retries.into() {
392392
crate::common::submission::db::fail_submission_notx(
393393
submission_id,
394394
chunk_index,

opsqueue/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ pub struct Config {
6969
/// At that time, the connection will be closed and any open reservations will be canceled.
7070
#[arg(long, default_value_t = 3)]
7171
pub max_missable_heartbeats: usize,
72+
73+
/// Maximum number of times a chunk is retried before permanently failing
74+
/// the full submission the chunk is a part of.
75+
#[arg(long, default_value_t = 10)]
76+
pub max_chunk_retries: u32,
7277
}
7378

7479
impl Default for Config {
@@ -82,13 +87,15 @@ impl Default for Config {
8287
let heartbeat_interval =
8388
humantime::Duration::from_str("10 seconds").expect("valid humantime");
8489
let max_missable_heartbeats = 3;
90+
let max_chunk_retries = 10;
8591
Config {
8692
port,
8793
database_filename,
8894
reservation_expiration,
8995
max_read_pool_size,
9096
heartbeat_interval,
9197
max_missable_heartbeats,
98+
max_chunk_retries,
9299
}
93100
}
94101
}

opsqueue/src/consumer/server/mod.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ impl ServerState {
7272
config: &'static Config,
7373
) -> Self {
7474
let dispatcher = Dispatcher::new(reservation_expiration);
75-
let (completer, completer_tx) = Completer::new(pool.writer_pool(), &dispatcher);
75+
let (completer, completer_tx) =
76+
Completer::new(pool.writer_pool(), &dispatcher, config.max_chunk_retries);
7677
Self {
7778
pool,
7879
completer: Some(completer),
@@ -170,12 +171,14 @@ pub struct Completer {
170171
pool: db::WriterPool,
171172
dispatcher: Dispatcher,
172173
count: usize,
174+
max_chunk_retries: u32,
173175
}
174176

175177
impl Completer {
176178
pub fn new(
177179
pool: &db::WriterPool,
178180
dispatcher: &Dispatcher,
181+
max_chunk_retries: u32,
179182
) -> (Self, tokio::sync::mpsc::Sender<CompleterMessage>) {
180183
let (tx, rx) = tokio::sync::mpsc::channel(1024);
181184
let pool = pool.clone();
@@ -184,6 +187,7 @@ impl Completer {
184187
pool: pool.clone(),
185188
dispatcher: dispatcher.clone(),
186189
count: 0,
190+
max_chunk_retries,
187191
};
188192
(me, tx)
189193
}
@@ -243,8 +247,13 @@ impl Completer {
243247
} => {
244248
// Even in the unlikely event that the DB write fails,
245249
// we still want to unreserve the chunk
246-
let failed_permanently =
247-
crate::common::chunk::db::retry_or_fail_chunk(id, failure, &mut conn).await;
250+
let failed_permanently = crate::common::chunk::db::retry_or_fail_chunk(
251+
id,
252+
failure,
253+
&mut conn,
254+
self.max_chunk_retries,
255+
)
256+
.await;
248257
reservations.lock().expect("No poison").remove(&id);
249258
let maybe_started_at = self
250259
.dispatcher

0 commit comments

Comments
 (0)