4 changes: 2 additions & 2 deletions opsqueue/src/common/chunk.rs
@@ -367,11 +367,11 @@ pub mod db {
         chunk_id: ChunkId,
         failure: String,
         mut conn: impl WriterConnection,
+        max_retries: u32,
     ) -> sqlx::Result<bool> {
         let failed_permanently = conn
             .transaction(move |mut tx| {
                 Box::pin(async move {
-                    const MAX_RETRIES: i64 = 10;
                     let ChunkId {
                         submission_id,
                         chunk_index,
@@ -388,7 +388,7 @@ pub mod db {
                     .fetch_one(tx.get_inner())
                     .await?;
                     tracing::trace!("Retries: {}", fields.retries);
-                    if fields.retries >= MAX_RETRIES {
+                    if fields.retries >= max_retries.into() {
                         crate::common::submission::db::fail_submission_notx(
                             submission_id,
                             chunk_index,
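For readers skimming the diff, here is a self-contained sketch of the decision `retry_or_fail_chunk` now makes with a caller-supplied limit in place of the removed `MAX_RETRIES` constant. The helper name below is made up for illustration and is not code from the PR; the `.into()` call widens the `u32` limit to match the `i64` retry counter read back from the database, exactly as in the changed line above.

// Standalone sketch, not the crate's real function.
// Returns `true` when the chunk has used up its retries and the whole
// submission it belongs to should be failed permanently.
fn should_fail_permanently(retries_so_far: i64, max_retries: u32) -> bool {
    // `u32 -> i64` widening via `.into()`, mirroring `fields.retries >= max_retries.into()`.
    retries_so_far >= max_retries.into()
}

fn main() {
    // Below the limit: the chunk stays eligible for another retry.
    assert!(!should_fail_permanently(9, 10));
    // At or above the limit: fail permanently.
    assert!(should_fail_permanently(10, 10));
    // A stricter, operator-chosen limit kicks in earlier.
    assert!(should_fail_permanently(3, 3));
    println!("retry/fail decision behaves as expected");
}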
7 changes: 7 additions & 0 deletions opsqueue/src/config.rs
@@ -69,6 +69,11 @@ pub struct Config {
     /// At that time, the connection will be closed and any open reservations will be canceled.
     #[arg(long, default_value_t = 3)]
     pub max_missable_heartbeats: usize,
+
+    /// Maximum number of times a chunk is retried before permanently failing
+    /// the full submission the chunk is a part of.
+    #[arg(long, default_value_t = 10)]
+    pub max_chunk_retries: u32,
 }
 
 impl Default for Config {
@@ -82,13 +87,15 @@ impl Default for Config {
         let heartbeat_interval =
             humantime::Duration::from_str("10 seconds").expect("valid humantime");
         let max_missable_heartbeats = 3;
+        let max_chunk_retries = 10;
         Config {
             port,
             database_filename,
             reservation_expiration,
             max_read_pool_size,
             heartbeat_interval,
             max_missable_heartbeats,
+            max_chunk_retries,
         }
     }
 }
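Because the new field carries clap's `#[arg(long, default_value_t = 10)]`, the derived CLI should accept a `--max-chunk-retries` flag defaulting to 10. Below is a minimal sketch of overriding the value programmatically; it assumes only what the diff shows (a public `max_chunk_retries: u32` field plus the `Default` impl) and that the struct is reachable as `opsqueue::config::Config`, which is an assumption about the crate's module layout.

// Sketch: tighten the retry limit while keeping every other default.
use opsqueue::config::Config; // assumed public path

fn main() {
    let config = Config {
        max_chunk_retries: 3,
        ..Config::default()
    };
    assert_eq!(config.max_chunk_retries, 3);
}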
15 changes: 12 additions & 3 deletions opsqueue/src/consumer/server/mod.rs
@@ -72,7 +72,8 @@ impl ServerState {
         config: &'static Config,
     ) -> Self {
         let dispatcher = Dispatcher::new(reservation_expiration);
-        let (completer, completer_tx) = Completer::new(pool.writer_pool(), &dispatcher);
+        let (completer, completer_tx) =
+            Completer::new(pool.writer_pool(), &dispatcher, config.max_chunk_retries);
         Self {
             pool,
             completer: Some(completer),
@@ -170,12 +171,14 @@ pub struct Completer {
     pool: db::WriterPool,
     dispatcher: Dispatcher,
     count: usize,
+    max_chunk_retries: u32,
 }
 
 impl Completer {
     pub fn new(
         pool: &db::WriterPool,
         dispatcher: &Dispatcher,
+        max_chunk_retries: u32,
     ) -> (Self, tokio::sync::mpsc::Sender<CompleterMessage>) {
         let (tx, rx) = tokio::sync::mpsc::channel(1024);
         let pool = pool.clone();
@@ -184,6 +187,7 @@ impl Completer {
             pool: pool.clone(),
             dispatcher: dispatcher.clone(),
             count: 0,
+            max_chunk_retries,
         };
         (me, tx)
     }
@@ -243,8 +247,13 @@ impl Completer {
             } => {
                 // Even in the unlikely event that the DB write fails,
                 // we still want to unreserve the chunk
-                let failed_permanently =
-                    crate::common::chunk::db::retry_or_fail_chunk(id, failure, &mut conn).await;
+                let failed_permanently = crate::common::chunk::db::retry_or_fail_chunk(
+                    id,
+                    failure,
+                    &mut conn,
+                    self.max_chunk_retries,
+                )
+                .await;
                 reservations.lock().expect("No poison").remove(&id);
                 let maybe_started_at = self
                     .dispatcher
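Taken together, the three files thread one operator-facing value from the config into the chunk-failure path: Config -> Completer::new -> retry_or_fail_chunk. The sketch below mirrors that wiring with stand-in types; these are not the crate's real definitions (the real `Completer::new` also takes the writer pool and dispatcher, and the real failure handling runs inside a database transaction).

// Simplified, self-contained illustration of the data flow added by this PR.
struct Config {
    max_chunk_retries: u32,
}

struct Completer {
    max_chunk_retries: u32,
}

impl Completer {
    fn new(config: &Config) -> Self {
        // The configured limit is captured once at construction time.
        Completer { max_chunk_retries: config.max_chunk_retries }
    }

    // Mirrors the comparison now performed in `retry_or_fail_chunk`.
    fn failed_permanently(&self, retries_so_far: i64) -> bool {
        retries_so_far >= self.max_chunk_retries.into()
    }
}

fn main() {
    let config = Config { max_chunk_retries: 5 };
    let completer = Completer::new(&config);
    assert!(!completer.failed_permanently(4));
    assert!(completer.failed_permanently(5));
}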