Skip to content

Commit f7c57e1

Browse files
authored
Simplify worker timer job (#19)
We now only need one 'tokio::select!' call
1 parent c912bab commit f7c57e1

File tree

3 files changed

+25
-25
lines changed

3 files changed

+25
-25
lines changed

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ uuid = { version = "1", features = ["v7", "serde"] }
2323
tracing = "0.1"
2424
hostname = "0.4"
2525
rand = "0.9"
26+
futures = "0.3.31"
2627

2728
[dev-dependencies]
2829
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }

src/worker.rs

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -280,47 +280,31 @@ impl Worker {
280280
let mut fatal_duration = warn_duration * 2;
281281
let mut warn_fired = false;
282282
let mut deadline = Instant::now();
283-
let mut channel_open = true;
284283

285284
loop {
286285
let warn_at = deadline + warn_duration;
287286
let fatal_at = deadline + fatal_duration;
288287

289-
// If channel is closed, just wait for timeout without checking channel
290-
if !channel_open {
291-
tokio::select! {
292-
_ = sleep_until(warn_at), if !warn_fired => {
293-
tracing::warn!(
294-
"Task {} exceeded claim timeout of {}s (no heartbeat/step since last extension)",
295-
task_label,
296-
claim_timeout
297-
);
298-
warn_fired = true;
299-
}
300-
301-
_ = sleep_until(fatal_at) => {
302-
// Fatal timeout reached
303-
return;
304-
}
288+
let channel_fut = async {
289+
if lease_rx.is_closed() && lease_rx.is_empty() {
290+
// Wait forever, so that we'll hit one of the timeout branches
291+
// in the `tokio::select!` below.
292+
futures::future::pending().await
293+
} else {
294+
lease_rx.recv().await
305295
}
306-
continue;
307-
}
296+
};
308297

309298
tokio::select! {
310299
biased; // Check channel first to prioritize resets
311300

312-
msg = lease_rx.recv() => {
301+
msg = channel_fut => {
313302
if let Some(extension) = msg {
314303
// Lease extended - reset deadline and warning state
315304
warn_duration = extension;
316305
fatal_duration = extension * 2;
317306
deadline = Instant::now();
318307
warn_fired = false;
319-
} else {
320-
// Channel closed - task might be finishing, but keep timing
321-
// in case it's actually stuck. The outer select will abort
322-
// us when task completes normally.
323-
channel_open = false;
324308
}
325309
}
326310

0 commit comments

Comments
 (0)