Harden worker delivery contracts and add durable worker event journal #206

vsumner wants to merge 9 commits into spacedriveapp:main from
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the CodeRabbit settings.
Walkthrough

Adds deterministic worker task contracts (ack/progress/terminal), durable terminal delivery receipts, and an append-only worker events journal; introduces OutboundEnvelope and receipt-aware outbound routing; extends config/runtime with a hot-reloadable worker_contract block; updates messaging adapters to return DeliveryOutcome and adds per-worker progress handling.

Changes
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
```rust
/// Mark that a user-visible acknowledgement has been delivered for a worker.
pub async fn mark_worker_task_contract_acknowledged(
    &self,
```
`mark_worker_task_contract_acknowledged` currently rewrites any non-terminal state to `acked`, which can regress `progressing`/`sla_missed` back to `acked` whenever a later checkpoint is surfaced. I think this should only transition `created -> acked` and otherwise leave state unchanged.
```rust
&self,
sqlx::query(
    "UPDATE worker_task_contracts \
     SET state = CASE \
         WHEN state IN (?, ?, ?) THEN state \
         WHEN state = ? THEN ? \
         ELSE state \
     END, \
     updated_at = CURRENT_TIMESTAMP \
     WHERE worker_id = ?",
)
.bind(WORKER_CONTRACT_STATE_TERMINAL_PENDING)
.bind(WORKER_CONTRACT_STATE_TERMINAL_ACKED)
.bind(WORKER_CONTRACT_STATE_TERMINAL_FAILED)
.bind(WORKER_CONTRACT_STATE_CREATED)
.bind(WORKER_CONTRACT_STATE_ACKED)
.bind(worker_id.to_string())
```
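The intent behind that CASE expression can be stated as a pure transition function. A minimal std-only sketch (the enum and state names are illustrative, mirroring the bound constants, not the real contract model):

```rust
// Hypothetical in-memory model of the proposed rule: only a freshly created
// contract may move to Acked; every other state is preserved unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ContractState {
    Created,
    Acked,
    Progressing,
    SlaMissed,
    TerminalPending,
    TerminalAcked,
    TerminalFailed,
}

fn acknowledge(state: ContractState) -> ContractState {
    match state {
        ContractState::Created => ContractState::Acked,
        // Never regress a later state back to Acked.
        other => other,
    }
}

fn main() {
    assert_eq!(acknowledge(ContractState::Created), ContractState::Acked);
    // A contract that already reported progress must not regress.
    assert_eq!(
        acknowledge(ContractState::Progressing),
        ContractState::Progressing
    );
    assert_eq!(
        acknowledge(ContractState::TerminalFailed),
        ContractState::TerminalFailed
    );
    println!("ok");
}
```

The SQL CASE above encodes the same rule server-side so the transition stays atomic with the `updated_at` touch.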
```rust
    &self,
    worker_id: &str,
    limit: i64,
) -> crate::error::Result<Vec<WorkerEventRow>> {
```
Minor ordering nit: `created_at` is `CURRENT_TIMESTAMP` (seconds resolution), so multiple events created within the same second can appear in a non-deterministic order. Adding a tie-breaker keeps timelines stable.
```rust
) -> crate::error::Result<Vec<WorkerEventRow>> {
     ORDER BY created_at DESC, id DESC \
```
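To see why the tie-breaker stabilizes same-second timelines, here is a std-only sketch of the equivalent of `ORDER BY created_at DESC, id DESC` (timestamps and event ids are made up for illustration):

```rust
// Three of the four events share the same one-second timestamp.
fn main() {
    let mut events = vec![
        ("2026-02-25 10:00:01", "evt-03"),
        ("2026-02-25 10:00:01", "evt-01"),
        ("2026-02-25 10:00:02", "evt-04"),
        ("2026-02-25 10:00:01", "evt-02"),
    ];
    // Descending tuple comparison: falls back to the id whenever the
    // timestamps tie, so the order is fully deterministic.
    events.sort_by(|a, b| b.cmp(a));
    let ids: Vec<&str> = events.iter().map(|event| event.1).collect();
    assert_eq!(ids, vec!["evt-04", "evt-03", "evt-02", "evt-01"]);
    println!("{ids:?}");
}
```

Without the `id` fallback, the relative order of the three `10:00:01` events would depend on scan order and could differ between queries.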
```rust
.bind(WORKER_CONTRACT_STATE_CREATED)
.bind(limit)
.fetch_all(&mut *tx)
.await
```
In these claim loops, `unwrap_or_default()` on `try_get` can silently turn a decode error into empty IDs and result in no-op updates (and repeated retries forever). Since this is internal DB state, I'd rather fail loudly and roll back the tx.
```rust
.await
let contract_id: String = row.try_get("id").map_err(|error| anyhow::anyhow!(error))?;
let worker_id_raw: String = row
    .try_get("worker_id")
    .map_err(|error| anyhow::anyhow!(error))?;
let task_summary: String = row
    .try_get("task_summary")
    .map_err(|error| anyhow::anyhow!(error))?;
let attempt_count: i64 = row
    .try_get("attempt_count")
    .map_err(|error| anyhow::anyhow!(error))?;
```
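The contrast between the two error styles can be shown with a stand-in row type (the `FakeRow` and its `try_get` below are illustrative only, not the sqlx API):

```rust
// FakeRow mimics a fallible column decode like sqlx::Row::try_get.
struct FakeRow;

impl FakeRow {
    fn try_get(&self, column: &str) -> Result<String, String> {
        match column {
            "id" => Ok("contract-1".to_string()),
            other => Err(format!("no such column: {other}")),
        }
    }
}

fn main() {
    let row = FakeRow;

    // unwrap_or_default() collapses the decode error into an empty string,
    // which would make a later `WHERE worker_id = ?` UPDATE a silent no-op.
    let silent: String = row.try_get("worker_id").unwrap_or_default();
    assert_eq!(silent, "");

    // Propagating the Result keeps the failure visible to the caller.
    let loud = row.try_get("worker_id");
    assert!(loud.is_err());

    let id = row.try_get("id").expect("id column must decode");
    assert_eq!(id, "contract-1");
    println!("ok");
}
```

In the real claim loop the `?` after `map_err(|error| anyhow::anyhow!(error))` gives exactly this loud-failure behavior and lets the surrounding transaction roll back.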
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/tools/cancel.rs (1)
85-117: ⚠️ Potential issue | 🟡 Minor

`reason` silently dropped for branch cancellations but appears in the output message.

When `process_type == "branch"`, `args.reason` is not forwarded to `cancel_branch`, yet lines 110–117 will still format the output message as `"branch <id> cancelled: <reason>"`. The LLM sees a reason-bearing success message even though no reason was recorded in the backend. This is a minor inconsistency — either pass the reason to `cancel_branch` (if the signature supports it) or omit it from the output message for the branch path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/tools/cancel.rs` around lines 85 - 117, The success message can claim a reason for branch cancellations even though call() does not forward args.reason to cancel_branch; update call() to either pass the reason through or remove it from the formatted message for the "branch" arm. Concretely, modify the match branch that invokes self.state.cancel_branch(branch_id) to call a cancel_branch variant that accepts an Option<&str> (or args.reason.as_deref()) if such a signature exists, or else ensure the final message construction uses the recorded behavior (omit reason when args.process_type == "branch") by checking process_type before appending args.reason; reference the call() function, args.reason/args.process_type/args.process_id, cancel_branch and cancel_worker, and CancelError when making the change.

src/agent/channel.rs (2)
2895-2909: 🛠️ Refactor suggestion | 🟠 Major

Do not silently drop terminal worker event broadcast failures.

Terminal `WorkerStatus` and `WorkerComplete` sends are currently ignored with `let _ =`. If dispatch fails, downstream cleanup/retrigger logic can silently miss terminal transitions. As per coding guidelines: "Don't silently discard errors; no let _ = on Results. Handle, log, or propagate errors. Only exception is .ok() on channel sends where the receiver may be dropped".

Suggested fix
```diff
-        let _ = event_tx.send(ProcessEvent::WorkerStatus {
+        if let Err(error) = event_tx.send(ProcessEvent::WorkerStatus {
             agent_id: agent_id.clone(),
             worker_id,
             channel_id: channel_id.clone(),
             status: terminal_status.to_string(),
-        });
+        }) {
+            tracing::debug!(%error, worker_id = %worker_id, "failed to broadcast WorkerStatus");
+        }

-        let _ = event_tx.send(ProcessEvent::WorkerComplete {
+        if let Err(error) = event_tx.send(ProcessEvent::WorkerComplete {
             agent_id,
             worker_id,
             channel_id,
             result: result_text,
             notify,
             success,
-        });
+        }) {
+            tracing::debug!(%error, worker_id = %worker_id, "failed to broadcast WorkerComplete");
+        }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 2895 - 2909, Replace the silent ignores on the two terminal broadcasts so failures are not dropped: check the Result returned by event_tx.send for both the ProcessEvent::WorkerStatus (the send that includes terminal_status, agent_id.clone(), worker_id, channel_id.clone()) and the ProcessEvent::WorkerComplete (the send that passes agent_id, worker_id, channel_id, result_text, notify, success), and handle errors instead of using "let _ =". Either propagate the error to the caller, return an Err, or at minimum log the send failure with contextual fields (agent_id, worker_id, channel_id, terminal_status/result_text) so downstream cleanup/retrigger logic can detect missed broadcasts; update the function signature to return a Result if choosing propagation.
1767-1977: 🛠️ Refactor suggestion | 🟠 Major

Move worker log/contract DB writes off the channel event loop.
This path now awaits multiple DB operations inline while processing broadcast events. That can stall event handling and user-facing responsiveness under load. Use fire-and-forget tasks for worker log persistence in this loop.
As per coding guidelines: "Use fire-and-forget DB writes with tokio::spawn for conversation history saves, memory writes, and worker log persistence. User gets their response immediately".

Suggested refactor pattern
```diff
-            if let Err(error) = run_logger
-                .upsert_worker_task_contract(
-                    &self.deps.agent_id,
-                    &self.id,
-                    *worker_id,
-                    &public_task_summary,
-                    worker_contract_config.ack_secs.max(1),
-                    worker_contract_config.progress_secs.max(1),
-                    terminal_secs,
-                )
-                .await
-            {
-                tracing::warn!(...);
-            }
+            let run_logger = run_logger.clone();
+            let agent_id = self.deps.agent_id.clone();
+            let channel_id = self.id.clone();
+            let task_summary = public_task_summary.clone();
+            tokio::spawn(async move {
+                if let Err(error) = run_logger
+                    .upsert_worker_task_contract(
+                        &agent_id,
+                        &channel_id,
+                        *worker_id,
+                        &task_summary,
+                        worker_contract_config.ack_secs.max(1),
+                        worker_contract_config.progress_secs.max(1),
+                        terminal_secs,
+                    )
+                    .await
+                {
+                    tracing::warn!(%error, channel_id = %channel_id, worker_id = %worker_id, "failed to upsert worker task contract");
+                }
+            });
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 1767 - 1977, The event loop is performing awaited DB/log calls inline (e.g., run_logger.upsert_worker_task_contract, run_logger.touch_worker_task_contract_progress, run_logger.log_worker_event, state.process_run_logger.mark_worker_task_contract_terminal_pending, run_logger.log_worker_completed) which can block the channel; change these to fire-and-forget tasks by moving each awaited call into a tokio::spawn (or equivalent) closure capturing the minimal needed data (worker_id, status/tool_name/description/question_id/result/terminal_state, progress_secs/terminal_secs, channel id/self.id) and logging any errors inside the spawned task, while keeping the existing immediate behavior (send_status_update, checkpoint updates) in the event loop; ensure you reference the same run_logger and state APIs inside the spawned tasks and avoid holding mutable references from self across the await boundary.
🧹 Nitpick comments (7)
src/db.rs (1)
83-83: Use a non-abbreviated binding name for clarity.

Rename `ext` to `extension` in the closure for consistency with repository naming rules.

Proposed refactor
```diff
-        if path.extension().and_then(|ext| ext.to_str()) != Some("sql") {
+        if path
+            .extension()
+            .and_then(|extension| extension.to_str())
+            != Some("sql")
+        {
             continue;
         }
```

As per coding guidelines, "Don't abbreviate variable names (use queue not q, message not msg, channel not ch). Common abbreviations like config are fine".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/db.rs` at line 83, The closure passed to path.extension().and_then(|ext| ext.to_str()) uses an abbreviated binding name `ext`; rename this binding to `extension` (i.e., and_then(|extension| extension.to_str())) to comply with the repository naming guideline and improve clarity—update any matching closure occurrences in src/db.rs (where path.extension() is used) to use `extension` instead of `ext`.

migrations/20260225000001_worker_events.sql (1)
17-24: Consider `CREATE INDEX IF NOT EXISTS` for idempotent migrations.

The table uses `IF NOT EXISTS` but the three index statements do not, which means replaying this migration (e.g., in test teardown/re-setup or after a partial rollback) will error with "index already exists". The `worker_task_contracts` migration has the same pattern, so this may be intentional, but it's worth making consistent.

💡 Suggested fix
```diff
-CREATE INDEX idx_worker_events_worker
+CREATE INDEX IF NOT EXISTS idx_worker_events_worker
     ON worker_events(worker_id, created_at);
-CREATE INDEX idx_worker_events_channel
+CREATE INDEX IF NOT EXISTS idx_worker_events_channel
     ON worker_events(channel_id, created_at);
-CREATE INDEX idx_worker_events_agent
+CREATE INDEX IF NOT EXISTS idx_worker_events_agent
     ON worker_events(agent_id, created_at);
```
Verify each finding against the current code and only fix it if needed. In `@migrations/20260225000001_worker_events.sql` around lines 17 - 24, The three index creations (idx_worker_events_worker, idx_worker_events_channel, idx_worker_events_agent) on worker_events should be made idempotent; change their CREATE INDEX statements to use IF NOT EXISTS so rerunning migrations or partial rollbacks won't fail due to "index already exists" errors—update the CREATE INDEX lines for these three indexes to include IF NOT EXISTS while leaving the rest of the migration unchanged.

docs/content/docs/(configuration)/config.mdx (1)
480-487: Consider documenting `[defaults.worker_contract]` hot-reload behavior.

The "What Hot-Reloads" table (lines 226–240) doesn't mention `worker_contract` settings. Given that the PR wires these through `arc-swap` (as described in the How It Works section), they are likely hot-reloadable. If so, a row should be added to the table for consistency with other config sections like `compaction` and `max_turns`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/content/docs/`(configuration)/config.mdx around lines 480 - 487, The docs omission: add a row to the "What Hot-Reloads" table indicating that [defaults.worker_contract] fields (ack_secs, progress_secs, tick_secs) are hot-reloadable; confirm this by referencing the arc-swap wiring described in the "How It Works" section and state the fields are hot-reloaded at runtime so changes take effect without restart. Update the table to mirror the format used for `compaction`/`max_turns` rows and include the exact config key `[defaults.worker_contract]` and the three settings (ack_secs, progress_secs, tick_secs) in the Description column.

src/api/workers.rs (2)
73-79: `payload_json` double-encoding — consider `Option<serde_json::Value>` for cleaner API output.

Returning `payload_json: Option<String>` serializes an already-JSON string as a JSON string literal (i.e., clients receive `"payload_json": "{\"key\":\"val\"}"` instead of `"payload_json": {"key": "val"}`). Changing the field to `Option<serde_json::Value>` with a deserialization step on mapping would produce a more idiomatic REST response without any behavioral change on the storage side.

♻️ Suggested mapping adjustment
```diff
 .map(|event| WorkerEventItem {
     id: event.id,
     event_type: event.event_type,
-    payload_json: event.payload_json,
+    payload_json: event.payload_json.as_deref().and_then(|s| serde_json::from_str(s).ok()),
     created_at: event.created_at,
 })
```

And update the struct field type:
```diff
 pub(super) struct WorkerEventItem {
     id: String,
     event_type: String,
-    payload_json: Option<String>,
+    payload_json: Option<serde_json::Value>,
     created_at: String,
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/workers.rs` around lines 73 - 79, The WorkerEventItem struct currently serializes payload_json as Option<String>, causing double-encoded JSON in responses; change WorkerEventItem::payload_json to Option<serde_json::Value> and, when constructing WorkerEventItem from your DB/model row (where payload is stored as a JSON string), parse the stored string with serde_json::from_str to produce a serde_json::Value (or set None on empty/null), handling parse errors appropriately (e.g., map to None or propagate a conversion error) so API output returns native JSON objects; update any mapping code that creates WorkerEventItem to perform this deserialization.
175-177: Hard-coded event limit of 200 may be insufficient for long-running workers.

Workers with many tool calls accumulate `tool_started`/`tool_completed` events rapidly. At 200 events, detail pages for long workers will silently truncate the journal. Consider either exposing a `limit` query param (defaulting to 200) or raising the ceiling, so consumers can retrieve the full journal when needed.

🤖 Prompt for AI Agents
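The suggested limit handling can be sketched std-only as a small helper (the 200 default and 5_000 cap mirror the numbers proposed above; the helper name is hypothetical):

```rust
// Default when the client omits the param, reject non-positive values,
// and cap the ceiling to keep a single request from dumping the journal.
fn effective_limit(requested: Option<i64>) -> Result<i64, String> {
    let limit = requested.unwrap_or(200);
    if limit <= 0 {
        return Err(format!("invalid limit: must be > 0, got {limit}"));
    }
    Ok(limit.min(5_000))
}

fn main() {
    assert_eq!(effective_limit(None), Ok(200));
    assert_eq!(effective_limit(Some(1_000)), Ok(1_000));
    assert_eq!(effective_limit(Some(999_999)), Ok(5_000));
    assert!(effective_limit(Some(0)).is_err());
    println!("ok");
}
```

The validated value would then be passed to `list_worker_events` in place of the literal 200.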
Verify each finding against the current code and only fix it if needed. In `@src/api/workers.rs` around lines 175 - 177, The hard-coded 200 in the call to logger.list_worker_events(&query.worker_id, 200) causes silent truncation for long-running workers; add an optional limit query parameter to the handler (e.g., limit: Option<usize> in the request/query type), parse and default it to 200, clamp it to a reasonable max (e.g., 5_000) to avoid DoS, and pass that value into list_worker_events instead of the literal 200. Update any request types and the call site (logger.list_worker_events) and validate the input so consumers can request larger journals when needed.

src/messaging/webchat.rs (1)
115-121: Wildcard arm silently maps new `StatusUpdate` variants to `NotSurfaced`.

The `_ => NotSurfaced` catch-all means future `StatusUpdate` variants added to the enum will compile without any forcing function on the webchat adapter to explicitly handle or reject them. If a new variant is intended to be surfaced in webchat (e.g., a `WorkerProgress` status in the future), it could silently become `NotSurfaced` with no diagnostic.
#[allow(unreachable_patterns)]-guarded exhaustive match or at least atracing::trace!on the wildcard arm so it's visible during development.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/webchat.rs` around lines 115 - 121, The match on StatusUpdate (mapping to WebChatEvent) currently uses a wildcard arm that returns DeliveryOutcome::NotSurfaced, which will silently ignore any new StatusUpdate variants; update the match on the status variable so it is exhaustive (remove the blanket `_` arm) and explicitly handle known variants or, if you must keep a catch-all, replace `_ => return Ok(DeliveryOutcome::NotSurfaced)` with a branch that logs the unhandled variant (e.g., tracing::trace!("unhandled StatusUpdate in webchat adapter: {:?}", status) and then returns NotSurfaced) or annotate the exhaustive match with #[allow(unreachable_patterns)] to force compile-time visibility; refer to the StatusUpdate enum, the WebChatEvent mapping, and DeliveryOutcome::NotSurfaced to find where to change the logic.src/tools/browser.rs (1)
472-493: Timeout helper is only applied to a subset of browser actions

This still leaves several long-running CDP paths without bounds (
`snapshot`, `act`, `screenshot`, `evaluate`, `content`, `close_tab`), so worker stalls can still occur there.

⏱️ Suggested extension pattern
```diff
-    page.execute(AxEnableParams::default())
-        .await
-        .map_err(|error| {
-            BrowserError::new(format!("failed to enable accessibility: {error}"))
-        })?;
+    Self::with_action_timeout(
+        "enable accessibility",
+        page.execute(AxEnableParams::default()),
+    )
+    .await
+    .map_err(|error| BrowserError::new(format!("failed to enable accessibility: {error}")))?;

-    let result = page
-        .evaluate(script)
-        .await
-        .map_err(|error| BrowserError::new(format!("evaluate failed: {error}")))?;
+    let result = Self::with_action_timeout("evaluate", page.evaluate(script))
+        .await
+        .map_err(|error| BrowserError::new(format!("evaluate failed: {error}")))?;
```
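The bounded-action contract itself can be illustrated std-only; the real async code would wrap the CDP future in `tokio::time::timeout`, but the shape is the same (run the action, wait at most a deadline, surface a named timeout error):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Illustrative sketch: the caller waits at most `timeout` for the action's
// result, so a stuck action can no longer stall the worker indefinitely.
fn with_action_timeout<T: Send + 'static>(
    action_name: &str,
    timeout: Duration,
    action: impl FnOnce() -> T + Send + 'static,
) -> Result<T, String> {
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may have given up already; dropping the result is fine.
        sender.send(action()).ok();
    });
    receiver
        .recv_timeout(timeout)
        .map_err(|_| format!("browser action `{action_name}` timed out"))
}

fn main() {
    let fast = with_action_timeout("evaluate", Duration::from_millis(200), || 42);
    assert_eq!(fast, Ok(42));

    let slow = with_action_timeout("snapshot", Duration::from_millis(50), || {
        thread::sleep(Duration::from_millis(500));
        "late"
    });
    assert!(slow.is_err());
    println!("ok");
}
```

Naming the action in the error (as the suggested `with_action_timeout` does) makes it obvious from logs which CDP path stalled.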
Verify each finding against the current code and only fix it if needed. In `@src/tools/browser.rs` around lines 472 - 493, The helper with_action_timeout currently wraps only a subset of CDP actions, leaving long-running calls like snapshot, act, screenshot, evaluate, content, and close_tab unmanaged and allowing stalls; update the code so each of those methods (functions named snapshot, act, screenshot, evaluate, content, close_tab) invokes with_action_timeout(action_name, async { ...original future... }) using the shared BROWSER_ACTION_TIMEOUT_SECS constant and propagates BrowserError on timeout or inner error, ensuring the same error formatting used in with_action_timeout is applied uniformly to those call sites.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/content/docs/`(features)/workers.mdx:
- Around line 98-99: The docs are inconsistent: one section sets "state =
Cancelled" while the state-machine/terminal-state text still treats cancellation
as "Failed"; update the state-machine/terminal-state description to include
Cancelled as a terminal state (and remove or change any wording that models
cancellations as Failed) so all references consistently treat cancellation as a
distinct terminal state (look for occurrences of "state = Cancelled",
"Cancelled", and "Failed" in the state-machine text and align their semantics
and examples accordingly).
In `@src/agent/channel.rs`:
- Around line 97-144: The cancel logic in cancel_worker currently calls
handle.abort() and immediately emits WorkerStatus(cancelled) and
WorkerComplete(cancelled), which can conflict if the task already finished;
instead, after calling handle.abort() await the JoinHandle (the handle variable)
and inspect the JoinError (use JoinError::is_cancelled()) — only send the
cancelled WorkerStatus/WorkerComplete events when the join result indicates the
task was actually cancelled; if the handle awaited successfully (Ok) emit no
cancelled terminal events or emit a success/complete event as appropriate; use
the existing self.deps.event_tx.send(...) calls and the
WorkerStatus/WorkerComplete variants to update events based on the awaited
result.
In `@src/config.rs`:
- Around line 3082-3096: The worker_contract parsing currently permits zero
durations for ack_secs, progress_secs, and tick_secs which can lead to immediate
expiry or tight loops; update the WorkerContractConfig construction to validate
those fields are > 0 and not accept 0: when mapping
toml.defaults.worker_contract, replace each .ack_secs/.progress_secs/.tick_secs
unwrap_or usage with logic that treats 0 as invalid and falls back to
base_defaults.worker_contract.<field> (or return a clear config parse error) so
zeros are converted to the safe default; apply the same validation to the other
occurrence that constructs WorkerContractConfig around lines 3292-3302.
In `@src/conversation/history.rs`:
- Around line 1357-1409: The close_orphaned_runs function is marking
worker_task_contracts as terminal_failed even when a matching
worker_delivery_receipt is already acked; update the final UPDATE in
close_orphaned_runs (the query operating on worker_task_contracts) to add an AND
NOT EXISTS subquery that looks up worker_delivery_receipts for the same
worker_id with status = 'acked' (skip changing contracts if an acked receipt
exists), keeping the same state assignment
(WORKER_CONTRACT_STATE_TERMINAL_FAILED) and existing NOT IN checks; this
complements or mitigates the non-transactional ack_worker_delivery_receipt flow
by avoiding overwriting contracts that already have an acked receipt.
- Around line 1237-1270: In fail_worker_delivery_receipt_attempt, the two
updates that mark the receipt as failed and set the related contract to
WORKER_CONTRACT_STATE_TERMINAL_FAILED must be executed inside a single database
transaction (same approach used in ack_worker_delivery_receipt) to avoid the
intermediate inconsistent state if a crash occurs between the two
non-transactional queries; change the code that updates worker_delivery_receipts
and worker_task_contracts to begin a sqlx transaction on self.pool, execute both
UPDATE statements (the query that sets status='failed' on
worker_delivery_receipts and the query that sets
state=WORKER_CONTRACT_STATE_TERMINAL_FAILED on worker_task_contracts where
worker_id = (SELECT worker_id FROM worker_delivery_receipts WHERE id = ?)), and
then commit the transaction (rolling back on any error) so both writes succeed
or none do.
- Around line 967-1033: upsert_worker_terminal_receipt has a TOCTOU race between
the SELECT and INSERT that can cause unique-constraint failures; change the
implementation to perform the operation atomically (either wrap the
read/conditional update/insert in a database transaction using self.pool.begin()
and transactional execute/commit, or replace the read+branch with a single SQL
upsert using INSERT ... ON CONFLICT(worker_id, kind) DO UPDATE) on the
worker_delivery_receipts table; ensure the upsert sets channel_id,
terminal_state, payload_text, status ('pending' for non-acked),
next_attempt_at/updated_at and returns or preserves the receipt id exactly as
current code expects; update references to WORKER_TERMINAL_RECEIPT_KIND and the
returned receipt_id logic inside upsert_worker_terminal_receipt accordingly.
- Around line 1164-1204: The two updates in ack_worker_delivery_receipt (the
UPDATE on worker_delivery_receipts that sets status='acked' and the subsequent
UPDATE on worker_task_contracts that sets the contract state) must be executed
inside a single DB transaction so they succeed or fail atomically; modify
ack_worker_delivery_receipt to begin a sqlx transaction (e.g.,
pool.begin()/Transaction), run the first UPDATE and check rows_affected, then
run the second UPDATE within the same transaction (using the same receipt_id and
the WORKER_CONTRACT_STATE_* constants), and commit the transaction only if both
succeed (rollback on error), mapping errors the same way as before.
- Around line 1638-1675: The function list_worker_events currently uses
limit.clamp(1, 500) which silently promotes 0 to 1; change it to validate the
input and return an error for non-positive limits instead of promoting them:
remove the limit.clamp(1, 500) call, add an early check in list_worker_events
such as if limit <= 0 { return Err(anyhow::anyhow!("invalid limit: must be >
0")); }, and still enforce an upper bound by using let limit = limit.min(500)
before binding to the query; keep using anyhow::anyhow! for the error to match
existing error handling and bind the validated limit to the SQL query.
- Around line 589-613: upsert_worker_task_contract currently forces state back
to WORKER_CONTRACT_STATE_CREATED in the UPDATE (see the bind of
WORKER_CONTRACT_STATE_CREATED), which regresses contracts; change the UPDATE to
preserve the existing state instead of overwriting it—either remove the "state =
?" assignment from the UPDATE clause or replace it with a conditional (e.g.,
state = CASE WHEN state = ? THEN ? ELSE state END) so the bind of
WORKER_CONTRACT_STATE_CREATED only applies on newly inserted rows (or only when
the prior state is CREATED); update the corresponding .bind calls to match the
new SQL so the UPDATE no longer resets progressed/acked states.
In `@src/db.rs`:
- Around line 91-97: The migration filename validation currently allows an empty
version prefix (e.g., "_worker.sql"); update the check around the split_once
result (the local variable file_name -> version) to assert that version is
non-empty before validating digits (e.g., add assert!(!version.is_empty(),
"migration version should not be empty: {file_name}");) and then keep the
existing assert that all chars are ASCII digits so only non-empty numeric
prefixes pass; make this change in the same block where version is extracted
from file_name.
In `@src/messaging/discord.rs`:
- Around line 481-487: The match arms handling StatusUpdate::StopTyping,
StatusUpdate::ToolStarted { .. }, StatusUpdate::ToolCompleted { .. }, and
StatusUpdate::BranchStarted { .. } currently call
self.stop_typing(message).await and then return the value representing
"Surfaced" (true); change these branches to still call
self.stop_typing(message).await but return the NotSurfaced outcome instead of
Surfaced so these non-visible status updates don't mark delivery as successful
(replace the trailing true with the NotSurfaced/DeliveryOutcome::NotSurfaced
value used by the surrounding function).
In `@src/tools/conclude_link.rs`:
- Around line 97-100: Don't silently ignore the Result from
response_tx.send(...).await; replace the current `let _ =
self.response_tx.send(OutboundResponse::Status(crate::StatusUpdate::StopTyping).into()).await;`
with explicit handling: either call `.await.ok();` if a dropped receiver is the
only acceptable error, or match/map the Result and log or propagate the error
(use the project's logger) so send failures on response_tx are not silently
discarded. Ensure you reference response_tx, send, OutboundResponse::Status and
StatusUpdate::StopTyping when applying the change.
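The zero-duration validation described in the `src/config.rs` item above can be sketched as a small helper (the field handling and the example default are assumptions for illustration, not the real config code):

```rust
// A configured zero would mean immediate expiry (ack/progress) or a tight
// loop (tick), so it is treated as invalid and replaced by the base default.
fn sanitize_secs(configured: Option<u64>, base_default: u64) -> u64 {
    match configured {
        Some(0) | None => base_default,
        Some(value) => value,
    }
}

fn main() {
    let base_ack_secs = 30; // illustrative base default
    assert_eq!(sanitize_secs(None, base_ack_secs), 30);
    assert_eq!(sanitize_secs(Some(0), base_ack_secs), 30);
    assert_eq!(sanitize_secs(Some(45), base_ack_secs), 45);
    println!("ok");
}
```

Applying the same helper to `ack_secs`, `progress_secs`, and `tick_secs` at both construction sites keeps the two code paths consistent; the alternative in the prompt, returning a config parse error on zero, is stricter but surfaces the mistake to the operator.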
find where to change the logic.
In `@src/tools/browser.rs`:
- Around line 472-493: The helper with_action_timeout currently wraps only a
subset of CDP actions, leaving long-running calls like snapshot, act,
screenshot, evaluate, content, and close_tab unmanaged and allowing stalls;
update the code so each of those methods (functions named snapshot, act,
screenshot, evaluate, content, close_tab) invokes
with_action_timeout(action_name, async { ...original future... }) using the
shared BROWSER_ACTION_TIMEOUT_SECS constant and propagates BrowserError on
timeout or inner error, ensuring the same error formatting used in
with_action_timeout is applied uniformly to those call sites.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (34)
- docs/content/docs/(configuration)/config.mdx
- docs/content/docs/(deployment)/roadmap.mdx
- docs/content/docs/(features)/workers.mdx
- migrations/20260224000001_worker_delivery_receipts.sql
- migrations/20260224000002_worker_task_contracts.sql
- migrations/20260224000003_worker_tool_calls.sql
- migrations/20260225000001_worker_events.sql
- src/agent/channel.rs
- src/api/agents.rs
- src/api/channels.rs
- src/api/workers.rs
- src/config.rs
- src/conversation.rs
- src/conversation/history.rs
- src/cron/scheduler.rs
- src/db.rs
- src/lib.rs
- src/llm/model.rs
- src/main.rs
- src/messaging/discord.rs
- src/messaging/manager.rs
- src/messaging/slack.rs
- src/messaging/telegram.rs
- src/messaging/traits.rs
- src/messaging/webchat.rs
- src/tools.rs
- src/tools/browser.rs
- src/tools/cancel.rs
- src/tools/conclude_link.rs
- src/tools/react.rs
- src/tools/reply.rs
- src/tools/send_file.rs
- src/tools/skip.rs
- src/tools/worker_inspect.rs
Actionable comments posted: 3
♻️ Duplicate comments (9)
src/conversation/history.rs (8)
624-647: ⚠️ Potential issue | 🟠 Major

`mark_worker_task_contract_acknowledged` can regress `progressing`/`sla_missed` back to `acked`.

The CASE WHEN only exempts the three terminal states. A late-arriving ack notification (e.g., a retry loop racing with a progress heartbeat) will overwrite `progressing` or `sla_missed` with `acked`, losing state. Add those states to the protected set.

🔧 Proposed fix

```diff
 .bind(WORKER_CONTRACT_STATE_TERMINAL_PENDING)
 .bind(WORKER_CONTRACT_STATE_TERMINAL_ACKED)
 .bind(WORKER_CONTRACT_STATE_TERMINAL_FAILED)
+.bind(WORKER_CONTRACT_STATE_PROGRESSING)
+.bind(WORKER_CONTRACT_STATE_SLA_MISSED)
 .bind(WORKER_CONTRACT_STATE_ACKED)
```

And update the SQL CASE to `WHEN state IN (?, ?, ?, ?, ?) THEN state`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 624 - 647, The UPDATE in mark_worker_task_contract_acknowledged currently only protects three terminal states (WORKER_CONTRACT_STATE_TERMINAL_PENDING, WORKER_CONTRACT_STATE_TERMINAL_ACKED, WORKER_CONTRACT_STATE_TERMINAL_FAILED) so a late ack can overwrite progressing or sla_missed; modify the SQL CASE in mark_worker_task_contract_acknowledged to include WORKER_CONTRACT_STATE_PROGRESSING and WORKER_CONTRACT_STATE_SLA_MISSED (i.e., change the WHEN state IN (?, ?, ?) to WHEN state IN (?, ?, ?, ?, ?) and bind the two additional state constants before binding WORKER_CONTRACT_STATE_ACKED and worker_id) so progressing/sla_missed are preserved.
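For illustration, the transition rule this comment asks for can be sketched in plain Rust. The enum and function below are hypothetical stand-ins, not the crate's actual types; the real code enforces the same invariant inside a SQL CASE expression.

```rust
// Illustrative stand-in for the contract state machine discussed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ContractState {
    Created,
    Acked,
    Progressing,
    SlaMissed,
    TerminalPending,
    TerminalAcked,
    TerminalFailed,
}

/// A late-arriving ack must never regress a contract that has already
/// advanced: only `Created -> Acked` is a legal ack transition.
fn apply_ack(state: ContractState) -> ContractState {
    match state {
        ContractState::Created => ContractState::Acked,
        other => other,
    }
}

fn main() {
    assert_eq!(apply_ack(ContractState::Created), ContractState::Acked);
    // Progress and SLA states survive a racing ack retry.
    assert_eq!(apply_ack(ContractState::Progressing), ContractState::Progressing);
    assert_eq!(apply_ack(ContractState::SlaMissed), ContractState::SlaMissed);
}
```

In SQL terms this is `state = CASE WHEN state = 'created' THEN 'acked' ELSE state END`, which leaves every non-`created` state untouched.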
972-1038: ⚠️ Potential issue | 🟠 Major

`upsert_worker_terminal_receipt` has a TOCTOU race between SELECT and INSERT.

Two concurrent callers (e.g., worker completion and a retry sweep) can both observe `existing = None` and both attempt the INSERT, hitting a unique constraint violation if `(worker_id, kind)` is unique-indexed. Wrap in an atomic `INSERT … ON CONFLICT DO UPDATE` or a transaction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 972 - 1038, The function upsert_worker_terminal_receipt contains a TOCTOU race: two callers can both see no existing row and attempt INSERT, causing unique-constraint errors for worker_delivery_receipts keyed by (worker_id, kind). Fix by making the operation atomic — replace the SELECT + conditional INSERT/UPDATE with a single UPSERT (INSERT ... ON CONFLICT (worker_id, kind) DO UPDATE SET channel_id = EXCLUDED.channel_id, terminal_state = EXCLUDED.terminal_state, payload_text = EXCLUDED.payload_text, status = 'pending', last_error = NULL, next_attempt_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP RETURNING id) using sqlx::query (or, if your DB doesn't support ON CONFLICT, perform the SELECT/INSERT/UPDATE inside a SERIALIZABLE transaction and handle unique-violation retries). Ensure the code uses the returned id as receipt_id and removes the separate SELECT/conditional INSERT path in upsert_worker_terminal_receipt.
1395-1407: ⚠️ Potential issue | 🟠 Major

`close_orphaned_runs` can incorrectly mark `terminal_failed` on contracts whose receipt was already acked.

Because `ack_worker_delivery_receipt` is non-transactional, a crash between its two writes leaves receipt = `acked` but contract = `terminal_pending`. On the next startup, `close_orphaned_runs` moves all non-terminal contracts (including `terminal_pending`) to `terminal_failed`, permanently misrepresenting a successfully-delivered contract. This is a downstream consequence of the non-transactional ack flow; fixing that eliminates this risk. Alternatively, add a guard to skip contracts that already have an acked receipt.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 1395 - 1407, close_orphaned_runs is marking non-terminal contracts as terminal_failed even when a corresponding receipt has been acked because ack_worker_delivery_receipt is non-transactional; either make ack_worker_delivery_receipt atomic (wrap the receipt write and contract update in a transaction) or change the SQL in close_orphaned_runs to skip contracts that already have an acked receipt (e.g., by joining the receipts table or checking the receipt column) so the UPDATE that sets WORKER_CONTRACT_STATE_TERMINAL_FAILED does not touch rows where receipt = 'acked' or where WORKER_CONTRACT_STATE_TERMINAL_ACKED is already present; update the query and its bind parameters in close_orphaned_runs accordingly to guard against converting acked deliveries to terminal_failed.
1241-1275: ⚠️ Potential issue | 🟠 Major

`fail_worker_delivery_receipt_attempt` has the same non-transactional two-write pattern on max-attempt exhaustion.

Receipt is set to `failed` (line 1253), then the contract is set to `terminal_failed` (line 1273) in two separate, non-transactional queries. A crash between the two leaves the system in an inconsistent state. Wrap both writes in a transaction, as recommended for `ack_worker_delivery_receipt`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 1241 - 1275, The two UPDATEs in fail_worker_delivery_receipt_attempt (the receipt SET status = 'failed' and the worker_task_contracts SET state = terminal_failed) must be executed inside a single DB transaction to avoid inconsistent state if a crash occurs between them; change the code to begin a transaction (e.g. let mut tx = self.pool.begin().await?), execute both sqlx::query(...) .bind(...).execute(&mut tx).await calls against the same transaction, then commit the transaction (tx.commit().await?) only after both succeed, matching the pattern used in ack_worker_delivery_receipt; keep the same binds (attempt_count, error, receipt_id, WORKER_CONTRACT_STATE_TERMINAL_FAILED, etc.) and propagate errors to abort the transaction on failure.
1169-1210: ⚠️ Potential issue | 🟠 Major

`ack_worker_delivery_receipt` performs two non-transactional writes, so a crash between them leaves the contract permanently stuck.

Receipt is marked `acked` (line 1182), then the contract is updated to `terminal_acked` (line 1204). A crash between the two leaves receipt = `acked` but contract = `terminal_pending`, which `close_orphaned_runs` will later force to `terminal_failed`, misrepresenting a successful delivery. Both writes must be in a single transaction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 1169 - 1210, The two updates in ack_worker_delivery_receipt (marking worker_delivery_receipts as 'acked' and then updating worker_task_contracts to WORKER_CONTRACT_STATE_TERMINAL_ACKED) must be executed inside a single DB transaction to avoid a crash leaving an inconsistent state; change the method to begin a transaction (e.g. let mut tx = self.pool.begin().await?), run the first UPDATE against &mut tx, check rows_affected, then run the second UPDATE (the worker_task_contracts query that binds WORKER_CONTRACT_STATE_TERMINAL_ACKED and receipt_id) against the same &mut tx, and finally commit the transaction (or let it rollback on error) so both writes succeed or fail atomically. Ensure you replace .execute(&self.pool) with .execute(&mut tx) and propagate errors as before.
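The all-or-nothing shape this comment describes can be illustrated with a toy in-memory store. Everything below (`Db`, `with_transaction`, `ack_receipt`) is hypothetical scaffolding, not the crate's code; with sqlx the equivalent is `pool.begin()` followed by `tx.commit()`.

```rust
// Toy stand-in showing the invariant: the receipt status and the contract
// state change together or not at all.
#[derive(Debug, Clone, PartialEq)]
struct Db {
    receipt_status: String,
    contract_state: String,
}

/// Apply `f` to a staged copy; install the copy only if every write
/// succeeded. Dropping the staged copy on error is the "rollback".
fn with_transaction<F>(db: &mut Db, f: F) -> Result<(), String>
where
    F: FnOnce(&mut Db) -> Result<(), String>,
{
    let mut staged = db.clone();
    f(&mut staged)?;
    *db = staged;
    Ok(())
}

fn ack_receipt(db: &mut Db) -> Result<(), String> {
    with_transaction(db, |tx| {
        tx.receipt_status = "acked".to_string();
        // A failure here must not leave receipt = acked while the
        // contract is still terminal_pending.
        tx.contract_state = "terminal_acked".to_string();
        Ok(())
    })
}

fn main() {
    let mut db = Db {
        receipt_status: "pending".to_string(),
        contract_state: "terminal_pending".to_string(),
    };
    ack_receipt(&mut db).unwrap();
    assert_eq!(db.receipt_status, "acked");
    assert_eq!(db.contract_state, "terminal_acked");

    // A failing transaction leaves both fields untouched.
    let before = db.clone();
    let result = with_transaction(&mut db, |tx| {
        tx.receipt_status = "garbage".to_string();
        Err("simulated crash between the two writes".to_string())
    });
    assert!(result.is_err());
    assert_eq!(db, before);
}
```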
749-752: ⚠️ Potential issue | 🟠 Major

`unwrap_or_default()` on `try_get` silently converts decode errors into empty/zero values in all three claim loops.

This pattern appears in `claim_due_worker_task_contract_ack_deadlines` (lines 749–752), `claim_due_worker_task_contract_progress_deadlines` (lines 829–831), and `claim_due_worker_task_contract_terminal_deadlines` (lines 911–912). A decode error (e.g., a type mismatch or schema drift) silently produces an empty `contract_id`, causing the subsequent UPDATE to match no rows (`updated == 0`), and the item is silently skipped, leading to contracts that are never claimed and retried forever.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 749 - 752, The code is silently converting decoding errors from row.try_get(...) into empty/zero values via unwrap_or_default, causing bad rows to become empty contract_id/fields and be skipped; update each claim function (claim_due_worker_task_contract_ack_deadlines, claim_due_worker_task_contract_progress_deadlines, claim_due_worker_task_contract_terminal_deadlines) to explicitly handle try_get results for the fields contract_id, worker_id_raw, task_summary, and attempt_count: match or use ?/map_err to detect and log decode errors (include column name and row context), then skip that row or return an Err instead of using default values so bad decodes do not produce empty identifiers that silently prevent UPDATEs from matching. Ensure logs include the error and the raw row identifiers to aid debugging.
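A minimal sketch of the "skip, don't default" pattern the prompt asks for. `Row` here is a toy stand-in for a sqlx row; real code would match on `row.try_get::<String, _>("id")` instead of collapsing errors with `unwrap_or_default()`.

```rust
// Hypothetical row type standing in for a database row.
struct Row {
    contract_id: Option<String>,
}

impl Row {
    fn try_get_contract_id(&self) -> Result<String, String> {
        self.contract_id
            .clone()
            .ok_or_else(|| "decode error on column `id`".to_string())
    }
}

fn claimable_ids(rows: &[Row]) -> Vec<String> {
    let mut ids = Vec::new();
    for row in rows {
        match row.try_get_contract_id() {
            Ok(id) => ids.push(id),
            // Surface the failure instead of pushing "" and letting the
            // later UPDATE silently match zero rows.
            Err(err) => eprintln!("skipping undecodable row: {err}"),
        }
    }
    ids
}

fn main() {
    let rows = vec![
        Row { contract_id: Some("c-1".to_string()) },
        Row { contract_id: None },
        Row { contract_id: Some("c-2".to_string()) },
    ];
    assert_eq!(claimable_ids(&rows), vec!["c-1".to_string(), "c-2".to_string()]);
}
```

The bad row is logged and skipped rather than turning into an empty identifier that can never match an UPDATE.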
1644-1680: ⚠️ Potential issue | 🟡 Minor

`list_worker_events`: `limit.clamp(1, 500)` silently promotes `0` to `1`, and the `ORDER BY created_at DESC` has no stable tie-breaker.

Two separate issues:

- A caller passing `limit = 0` receives 1 event instead of an empty result. The promotion is surprising and undocumented.
- `created_at` has seconds resolution in SQLite, so multiple events within the same second are returned in a non-deterministic order. Add `id` as a tie-breaker.

🔧 Proposed fix

```diff
+    /// `limit` must be ≥ 1 and is capped at 500.
     pub async fn list_worker_events(
         &self,
         worker_id: &str,
         limit: i64,
     ) -> crate::error::Result<Vec<WorkerEventRow>> {
+        if limit <= 0 {
+            return Err(anyhow::anyhow!("limit must be > 0, got {limit}"));
+        }
         let rows = sqlx::query(
             "SELECT id, worker_id, channel_id, agent_id, event_type, payload_json, created_at \
              FROM worker_events \
              WHERE worker_id = ? \
-             ORDER BY created_at DESC \
+             ORDER BY created_at DESC, id DESC \
              LIMIT ?",
         )
         .bind(worker_id)
-        .bind(limit.clamp(1, 500))
+        .bind(limit.min(500))
```
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 1644 - 1680, The method list_worker_events currently clamps limit with limit.clamp(1, 500) which silently turns a caller-supplied 0 into 1 and should instead treat 0 as "no results"; change the logic in list_worker_events to return Ok(vec![]) when limit == 0 (and still enforce an upper bound, e.g. min(limit, 500) or reject negative values), and update the SQL ORDER BY clause to include a deterministic tie-breaker by adding id as a secondary sort (e.g., ORDER BY created_at DESC, id DESC) so rows with the same created_at are returned in a stable order; ensure the final bound value (not limit.clamp(1,500)) is what you .bind(...) for the query.
594-618: ⚠️ Potential issue | 🟠 Major

`upsert_worker_task_contract` UPDATE resets `state` back to `created`, regressing already-acknowledged contracts.

Line 608 unconditionally binds `WORKER_CONTRACT_STATE_CREATED` as the new `state` in the UPDATE branch. A contract that has advanced to `acked`, `progressing`, or `sla_missed` will be snapped back to `created` when the upsert is re-called (e.g., on task-summary refresh). The guard `WHERE state NOT IN (terminal_acked, terminal_failed)` does not prevent regressions through intermediate states.

🔧 Proposed fix

```diff
 sqlx::query(
     "UPDATE worker_task_contracts \
      SET task_summary = ?, \
-     state = ?, \
+     state = CASE WHEN state = ? THEN ? ELSE state END, \
      ack_deadline_at = datetime('now', '+' || ? || ' seconds'), \
      progress_deadline_at = datetime('now', '+' || ? || ' seconds'), \
      terminal_deadline_at = datetime('now', '+' || ? || ' seconds'), \
      last_status_hash = ?, \
      sla_nudge_sent = 0, \
      updated_at = CURRENT_TIMESTAMP \
      WHERE worker_id = ? \
      AND state NOT IN (?, ?)",
 )
 .bind(task_summary)
+.bind(WORKER_CONTRACT_STATE_CREATED) // only if currently created
 .bind(WORKER_CONTRACT_STATE_CREATED)
 .bind(timing.ack_secs as i64)
```
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 594 - 618, The UPDATE in upsert_worker_task_contract unconditionally sets state to WORKER_CONTRACT_STATE_CREATED, which regresses advanced contracts; remove the state assignment from the UPDATE clause (or replace it with a no-op like "state = state") and stop binding WORKER_CONTRACT_STATE_CREATED in the UPDATE path so existing rows keep their current state; locate the SQL that updates worker_task_contracts and the .bind(WORKER_CONTRACT_STATE_CREATED) call and adjust the query and binds accordingly so only the INSERT path (not UPDATE) can set the initial created state.

src/agent/channel.rs (1)

97-144: Cancel events emitted before confirming abort – duplicate of existing review.

`handle.abort()` is fire-and-forget; the task may have already completed successfully before the abort fires. Emitting `WorkerStatus(cancelled)` + `WorkerComplete(cancelled)` immediately after can produce conflicting terminal events (e.g. done and cancelled for the same worker). The fix suggested in the prior review still applies here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 97 - 144, The code currently calls handle.abort() then immediately sends WorkerStatus(cancelled) and WorkerComplete(cancelled), which can conflict if the task already finished; instead after calling handle.abort() await the JoinHandle (handle.await) and inspect the JoinError/result (use JoinError::is_cancelled or Err from await) to determine whether the task was actually cancelled before emitting WorkerStatus and WorkerComplete; apply the same change in the removed branch (do not emit cancel events immediately when handle is missing—only emit based on the joined/observed outcome or skip if the worker already completed), updating logic around handle.abort(), the JoinHandle await, and the two sends to deps.event_tx for WorkerStatus and WorkerComplete.
🧹 Nitpick comments (3)
src/agent/channel.rs (3)
3189-3207: `is_worker_terminal_failure`/`classify_worker_terminal_state` are implicitly coupled to literal message prefixes from `spawn_worker_task`.

If the error message format ever changes (e.g., the `"Worker timed out after "` prefix in line 2905), these detection functions silently stop working. Consider sharing named constants for the prefix strings, or returning a structured terminal outcome from `spawn_worker_task` instead of a bare `String` that downstream code must re-parse.

♻️ Example: shared prefix constants

```diff
+const WORKER_FAILED_PREFIX: &str = "Worker failed:";
+const WORKER_TIMEOUT_PREFIX: &str = "Worker timed out after ";
+const WORKER_CANCELLED_PREFIX: &str = "Worker cancelled:";

 fn is_worker_terminal_failure(result: &str) -> bool {
     let trimmed = result.trim_start();
-    trimmed.starts_with("Worker failed:")
-        || trimmed.starts_with("Worker timed out after ")
-        || trimmed.starts_with("Worker cancelled:")
+    trimmed.starts_with(WORKER_FAILED_PREFIX)
+        || trimmed.starts_with(WORKER_TIMEOUT_PREFIX)
+        || trimmed.starts_with(WORKER_CANCELLED_PREFIX)
 }
```
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 3189 - 3207, The functions is_worker_terminal_failure and classify_worker_terminal_state rely on hard-coded message prefixes ("Worker failed:", "Worker timed out after ", "Worker cancelled:") that duplicate spawn_worker_task's output; extract these prefixes into shared constants (e.g., WORKER_FAILED_PREFIX, WORKER_TIMED_OUT_PREFIX, WORKER_CANCELLED_PREFIX) and use those constants in both spawn_worker_task and here, or better change spawn_worker_task to return a structured Result/enum (e.g., WorkerTerminalState::Failed/TimedOut/Cancelled/Done) instead of a plain String and update is_worker_terminal_failure/classify_worker_terminal_state to consume that enum; reference functions spawn_worker_task, is_worker_terminal_failure, and classify_worker_terminal_state when making the change.
1838-1905: Duplicate progress-refresh logic across `ToolStarted`/`ToolCompleted` arms.

Both arms perform identical operations (log the event, read `progress_secs`, call `touch_worker_task_contract_progress`) and differ only in the event label string. The same pattern repeats again in the `WorkerPermission` and `WorkerQuestion` arms.

♻️ Suggested refactor

```diff
+    async fn refresh_worker_progress(
+        &self,
+        run_logger: &ProcessRunLogger,
+        worker_id: WorkerId,
+        event_name: &str,
+        event_data: serde_json::Value,
+        hint: Option<&str>,
+    ) {
+        run_logger.log_worker_event(worker_id, event_name, event_data);
+        let progress_secs = self
+            .deps
+            .runtime_config
+            .worker_contract
+            .load()
+            .progress_secs
+            .max(1);
+        if let Err(error) = run_logger
+            .touch_worker_task_contract_progress(worker_id, hint, progress_secs)
+            .await
+        {
+            tracing::warn!(
+                %error,
+                channel_id = %self.id,
+                worker_id = %worker_id,
+                "failed to refresh worker task contract progress from {event_name}"
+            );
+        }
+    }
```

Then each arm becomes a one-liner call.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 1838 - 1905, The ToolStarted and ToolCompleted match arms duplicate the same progress-refresh logic (log_worker_event, reading deps.runtime_config.worker_contract.load().progress_secs, and calling run_logger.touch_worker_task_contract_progress) differing only by the event label; refactor by extracting a helper function (e.g., refresh_worker_progress_and_log) that accepts &mut run_logger, worker_id, event_label (&str), and Option<&str> tool_name and performs the shared steps (log_worker_event, compute progress_secs.max(1), await touch_worker_task_contract_progress and warn on Err), then replace the body of ProcessEvent::ToolStarted and ProcessEvent::ToolCompleted to call that helper (and likewise reuse it in WorkerPermission and WorkerQuestion arms).
42-46: Add doc comments to the new dispatch/batch constants.

`WORKER_CHECKPOINT_MIN_INTERVAL_SECS` and `WORKER_CHECKPOINT_MAX_CHARS` both carry explanatory comments, but the five new constants below them don't. The batch sizes in particular (`i64` vs `usize`) and the 5-second dispatch interval deserve a brief note explaining their tuning rationale.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 42 - 46, Add doc comments above the five constants (WORKER_RECEIPT_DISPATCH_INTERVAL_SECS, WORKER_RECEIPT_DISPATCH_BATCH_SIZE, WORKER_CONTRACT_ACK_BATCH_SIZE, WORKER_CONTRACT_PROGRESS_BATCH_SIZE, WORKER_CONTRACT_TERMINAL_BATCH_SIZE) explaining what each controls, why the values were chosen (e.g., 5s dispatch interval to balance latency and throughput, batch sizes tuned for DB/worker load), and note the type decision for batch sizes (i64 used for compatibility with DB/SQL count types and where conversions to usize occur). Keep each comment short and mention any constraints or places to change if tuning is needed (e.g., conversion sites where you cast i64 -> usize).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agent/channel.rs`:
- Around line 1773-1793: The event-loop currently awaits DB writes directly
inside the channel match arms (WorkerStarted, WorkerStatus, ToolStarted,
ToolCompleted, WorkerPermission, WorkerQuestion, WorkerComplete), which blocks
the channel; change each awaited call (upsert_worker_task_contract,
touch_worker_task_contract_progress, mark_worker_task_contract_terminal_pending,
upsert_worker_terminal_receipt) to be fire-and-forget by moving the async call
into a tokio::spawn task that captures needed clones (e.g., run_logger.clone(),
self.deps.agent_id.clone(), self.id.clone(), worker_id, public_task_summary,
WorkerTaskContractTiming, etc.), logs errors inside the spawned task, and
removes the .await from the event-loop arms so the match arms return immediately
while the DB writes run in the background.
In `@src/conversation/history.rs`:
- Around line 2094-2098: The test currently discards the Result from
logger.fail_worker_delivery_receipt_attempt with "let _ =" which hides failures;
replace that silent discard by handling the Result: for each call to
fail_worker_delivery_receipt_attempt (in the loop using
WORKER_RECEIPT_MAX_ATTEMPTS and the same receipt_id) unwrap or expect the Result
to avoid silent errors, and for the final attempt assert that the returned
WorkerDeliveryRetryOutcome (from
crate::error::Result<WorkerDeliveryRetryOutcome>) equals the expected terminal
state (e.g., WorkerDeliveryRetryOutcome::TerminalFailed) so failures are
surfaced and the test's terminal-state assertion is meaningful.
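As a sketch of the assertion style this prompt describes, the enum, constant, and function below are hypothetical stand-ins for the crate's types; the point is `.expect(...)` instead of `let _ =`, plus an assertion on the final attempt's outcome.

```rust
// Hypothetical stand-ins mirroring the names in the prompt above.
#[derive(Debug, PartialEq)]
enum WorkerDeliveryRetryOutcome {
    Retrying,
    TerminalFailed,
}

const WORKER_RECEIPT_MAX_ATTEMPTS: u32 = 3;

fn fail_worker_delivery_receipt_attempt(
    attempt: u32,
) -> Result<WorkerDeliveryRetryOutcome, String> {
    if attempt >= WORKER_RECEIPT_MAX_ATTEMPTS {
        Ok(WorkerDeliveryRetryOutcome::TerminalFailed)
    } else {
        Ok(WorkerDeliveryRetryOutcome::Retrying)
    }
}

fn main() {
    let mut last = WorkerDeliveryRetryOutcome::Retrying;
    for attempt in 1..=WORKER_RECEIPT_MAX_ATTEMPTS {
        // Surface the Result: a bookkeeping error fails the test loudly
        // instead of being discarded with `let _ =`.
        last = fail_worker_delivery_receipt_attempt(attempt)
            .expect("retry bookkeeping failed");
    }
    assert_eq!(last, WorkerDeliveryRetryOutcome::TerminalFailed);
}
```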
- Around line 239-243: The current status_fingerprint uses
std::collections::hash_map::DefaultHasher which is non-deterministic across
process restarts; change status_fingerprint to use a deterministic/stable hasher
(for example fnv::FnvHasher from the fnv crate or fxhash's FxHasher) so the same
status string produces the same 64-bit hex on every run; update the function
status_fingerprint to instantiate the chosen stable hasher, feed it
status.hash(&mut hasher), call hasher.finish() and format as before, and add the
chosen hasher crate to Cargo.toml if needed so last_status_hash persisted in
SQLite remains comparable across restarts.
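The deterministic hasher can also be implemented inline without pulling in a crate; FNV-1a is only a few lines. This is a sketch of the idea, not the repository's code: unlike std's `DefaultHasher` (SipHash with a per-process random key), the fingerprint below is identical across restarts, so a persisted `last_status_hash` stays comparable.

```rust
// Minimal FNV-1a (64-bit); the constants are the standard FNV offset
// basis and prime.
fn status_fingerprint(status: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = FNV_OFFSET_BASIS;
    for byte in status.as_bytes() {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    format!("{hash:016x}")
}

fn main() {
    // Deterministic across runs and restarts, unlike DefaultHasher.
    assert_eq!(status_fingerprint(""), "cbf29ce484222325");
    assert_eq!(
        status_fingerprint("indexing files"),
        status_fingerprint("indexing files")
    );
    assert_ne!(status_fingerprint("indexing files"), status_fingerprint("done"));
}
```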
---
Duplicate comments:
In `@src/agent/channel.rs`:
- Around line 97-144: The code currently calls handle.abort() then immediately
sends WorkerStatus(cancelled) and WorkerComplete(cancelled), which can conflict
if the task already finished; instead after calling handle.abort() await the
JoinHandle (handle.await) and inspect the JoinError/result (use
JoinError::is_cancelled or Err from await) to determine whether the task was
actually cancelled before emitting WorkerStatus and WorkerComplete; apply the
same change in the removed branch (do not emit cancel events immediately when
handle is missing—only emit based on the joined/observed outcome or skip if the
worker already completed), updating logic around handle.abort(), the JoinHandle
await, and the two sends to deps.event_tx for WorkerStatus and WorkerComplete.
In `@src/conversation/history.rs`:
- Around line 624-647: The UPDATE in mark_worker_task_contract_acknowledged
currently only protects three terminal states
(WORKER_CONTRACT_STATE_TERMINAL_PENDING, WORKER_CONTRACT_STATE_TERMINAL_ACKED,
WORKER_CONTRACT_STATE_TERMINAL_FAILED) so a late ack can overwrite progressing
or sla_missed; modify the SQL CASE in mark_worker_task_contract_acknowledged to
include WORKER_CONTRACT_STATE_PROGRESSING and WORKER_CONTRACT_STATE_SLA_MISSED
(i.e., change the WHEN state IN (?, ?, ?) to WHEN state IN (?, ?, ?, ?, ?) and
bind the two additional state constants before binding
WORKER_CONTRACT_STATE_ACKED and worker_id) so progressing/sla_missed are
preserved.
- Around line 972-1038: The function upsert_worker_terminal_receipt contains a
TOCTOU race: two callers can both see no existing row and attempt INSERT,
causing unique-constraint errors for worker_delivery_receipts keyed by
(worker_id, kind). Fix by making the operation atomic — replace the SELECT +
conditional INSERT/UPDATE with a single UPSERT (INSERT ... ON CONFLICT
(worker_id, kind) DO UPDATE SET channel_id = EXCLUDED.channel_id, terminal_state
= EXCLUDED.terminal_state, payload_text = EXCLUDED.payload_text, status =
'pending', last_error = NULL, next_attempt_at = CURRENT_TIMESTAMP, updated_at =
CURRENT_TIMESTAMP RETURNING id) using sqlx::query (or, if your DB doesn't
support ON CONFLICT, perform the SELECT/INSERT/UPDATE inside a SERIALIZABLE
transaction and handle unique-violation retries). Ensure the code uses the
returned id as receipt_id and removes the separate SELECT/conditional INSERT
path in upsert_worker_terminal_receipt.
- Around line 1395-1407: close_orphaned_runs is marking non-terminal contracts
as terminal_failed even when a corresponding receipt has been acked because
ack_worker_delivery_receipt is non-transactional; either make
ack_worker_delivery_receipt atomic (wrap the receipt write and contract update
in a transaction) or change the SQL in close_orphaned_runs to skip contracts
that already have an acked receipt (e.g., by joining the receipts table or
checking the receipt column) so the UPDATE that sets
WORKER_CONTRACT_STATE_TERMINAL_FAILED does not touch rows where receipt =
'acked' or where WORKER_CONTRACT_STATE_TERMINAL_ACKED is already present; update
the query and its bind parameters in close_orphaned_runs accordingly to guard
against converting acked deliveries to terminal_failed.
- Around line 1241-1275: The two UPDATEs in fail_worker_delivery_receipt_attempt
(the receipt SET status = 'failed' and the worker_task_contracts SET state =
terminal_failed) must be executed inside a single DB transaction to avoid
inconsistent state if a crash occurs between them; change the code to begin a
transaction (e.g. let mut tx = self.pool.begin().await?), execute both
sqlx::query(...) .bind(...).execute(&mut tx).await calls against the same
transaction, then commit the transaction (tx.commit().await?) only after both
succeed, matching the pattern used in ack_worker_delivery_receipt; keep the same
binds (attempt_count, error, receipt_id, WORKER_CONTRACT_STATE_TERMINAL_FAILED,
etc.) and propagate errors to abort the transaction on failure.
- Around line 1169-1210: The two updates in ack_worker_delivery_receipt (marking
worker_delivery_receipts as 'acked' and then updating worker_task_contracts to
WORKER_CONTRACT_STATE_TERMINAL_ACKED) must be executed inside a single DB
transaction to avoid a crash leaving an inconsistent state; change the method to
begin a transaction (e.g. let mut tx = self.pool.begin().await?), run the first
UPDATE against &mut tx, check rows_affected, then run the second UPDATE (the
worker_task_contracts query that binds WORKER_CONTRACT_STATE_TERMINAL_ACKED and
receipt_id) against the same &mut tx, and finally commit the transaction (or let
it rollback on error) so both writes succeed or fail atomically. Ensure you
replace .execute(&self.pool) with .execute(&mut tx) and propagate errors as
before.
- Around line 749-752: The code is silently converting decoding errors from
row.try_get(...) into empty/zero values via unwrap_or_default, causing bad rows
to become empty contract_id/fields and be skipped; update each claim function
(claim_due_worker_task_contract_ack_deadlines,
claim_due_worker_task_contract_progress_deadlines,
claim_due_worker_task_contract_terminal_deadlines) to explicitly handle try_get
results for the fields contract_id, worker_id_raw, task_summary, and
attempt_count: match or use ?/map_err to detect and log decode errors (include
column name and row context), then skip that row or return an Err instead of
using default values so bad decodes do not produce empty identifiers that
silently prevent UPDATEs from matching. Ensure logs include the error and the
raw row identifiers to aid debugging.
- Around line 1644-1680: The method list_worker_events currently clamps limit
with limit.clamp(1, 500) which silently turns a caller-supplied 0 into 1 and
should instead treat 0 as "no results"; change the logic in list_worker_events
to return Ok(vec![]) when limit == 0 (and still enforce an upper bound, e.g.
min(limit, 500) or reject negative values), and update the SQL ORDER BY clause
to include a deterministic tie-breaker by adding id as a secondary sort (e.g.,
ORDER BY created_at DESC, id DESC) so rows with the same created_at are returned
in a stable order; ensure the final bound value (not limit.clamp(1,500)) is what
you .bind(...) for the query.
- Around line 594-618: The UPDATE in upsert_worker_task_contract unconditionally
sets state to WORKER_CONTRACT_STATE_CREATED, which regresses advanced contracts;
remove the state assignment from the UPDATE clause (or replace it with a no-op
like "state = state") and stop binding WORKER_CONTRACT_STATE_CREATED in the
UPDATE path so existing rows keep their current state; locate the SQL that
updates worker_task_contracts and the .bind(WORKER_CONTRACT_STATE_CREATED) call
and adjust the query and binds accordingly so only the INSERT path (not UPDATE)
can set the initial created state.
---
Nitpick comments:
In `@src/agent/channel.rs`:
- Around line 3189-3207: The functions is_worker_terminal_failure and
classify_worker_terminal_state rely on hard-coded message prefixes ("Worker
failed:", "Worker timed out after ", "Worker cancelled:") that duplicate
spawn_worker_task's output; extract these prefixes into shared constants (e.g.,
WORKER_FAILED_PREFIX, WORKER_TIMED_OUT_PREFIX, WORKER_CANCELLED_PREFIX) and use
those constants in both spawn_worker_task and here, or better change
spawn_worker_task to return a structured Result/enum (e.g.,
WorkerTerminalState::Failed/TimedOut/Cancelled/Done) instead of a plain String
and update is_worker_terminal_failure/classify_worker_terminal_state to consume
that enum; reference functions spawn_worker_task, is_worker_terminal_failure,
and classify_worker_terminal_state when making the change.
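The shared-constant approach can be sketched as follows. The three prefixes are the strings quoted in the review; the `WorkerTerminalState` enum and the classifier body are a hypothetical shape, not the crate's actual code:

```rust
// Prefixes shared by the producer (spawn_worker_task's format!() calls)
// and the consumers, so the string matching cannot silently drift.
const WORKER_FAILED_PREFIX: &str = "Worker failed:";
const WORKER_TIMED_OUT_PREFIX: &str = "Worker timed out after ";
const WORKER_CANCELLED_PREFIX: &str = "Worker cancelled:";

#[derive(Debug, PartialEq, Eq)]
enum WorkerTerminalState {
    Failed,
    TimedOut,
    Cancelled,
    Done,
}

/// Classify a terminal message by its shared prefix; anything unrecognised
/// is treated as a successful completion.
fn classify_worker_terminal_state(message: &str) -> WorkerTerminalState {
    if message.starts_with(WORKER_FAILED_PREFIX) {
        WorkerTerminalState::Failed
    } else if message.starts_with(WORKER_TIMED_OUT_PREFIX) {
        WorkerTerminalState::TimedOut
    } else if message.starts_with(WORKER_CANCELLED_PREFIX) {
        WorkerTerminalState::Cancelled
    } else {
        WorkerTerminalState::Done
    }
}
```

The stronger variant the comment mentions (returning the enum directly from `spawn_worker_task`) removes the string matching entirely; the constants are the minimal, non-invasive version.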
- Around line 1838-1905: The ToolStarted and ToolCompleted match arms duplicate
the same progress-refresh logic (log_worker_event, reading
deps.runtime_config.worker_contract.load().progress_secs, and calling
run_logger.touch_worker_task_contract_progress) differing only by the event
label; refactor by extracting a helper function (e.g.,
refresh_worker_progress_and_log) that accepts &mut run_logger, worker_id,
event_label (&str), and Option<&str> tool_name and performs the shared steps
(log_worker_event, compute progress_secs.max(1), await
touch_worker_task_contract_progress and warn on Err), then replace the body of
ProcessEvent::ToolStarted and ProcessEvent::ToolCompleted to call that helper
(and likewise reuse it in WorkerPermission and WorkerQuestion arms).
- Around line 42-46: Add doc comments above the five constants
(WORKER_RECEIPT_DISPATCH_INTERVAL_SECS, WORKER_RECEIPT_DISPATCH_BATCH_SIZE,
WORKER_CONTRACT_ACK_BATCH_SIZE, WORKER_CONTRACT_PROGRESS_BATCH_SIZE,
WORKER_CONTRACT_TERMINAL_BATCH_SIZE) explaining what each controls, why the
values were chosen (e.g., 5s dispatch interval to balance latency and
throughput, batch sizes tuned for DB/worker load), and note the type decision
for batch sizes (i64 used for compatibility with DB/SQL count types and where
conversions to usize occur). Keep each comment short and mention any constraints
or places to change if tuning is needed (e.g., conversion sites where you cast
i64 -> usize).
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/tools/browser.rs (2)
947-964: ⚠️ Potential issue | 🟡 Minor

`handle_close` is not wrapped in `with_action_timeout`, inconsistent with the hardening goal.

Every other CDP/browser operation is now timeout-guarded, but `browser.close()` (line 951) can still hang indefinitely if the Chrome process is unresponsive — exactly the scenario timeouts are meant to prevent. Since the close errors are already soft-handled (`tracing::warn`), wrapping it is straightforward.

Proposed fix

```diff
 if let Some(mut browser) = state.browser.take()
-    && let Err(error) = browser.close().await {
-    tracing::warn!(%error, "browser close returned error");
+{
+    match tokio::time::timeout(
+        Duration::from_secs(BROWSER_ACTION_TIMEOUT_SECS),
+        browser.close(),
+    )
+    .await
+    {
+        Ok(Err(error)) => tracing::warn!(%error, "browser close returned error"),
+        Err(_) => tracing::warn!("browser close timed out after {BROWSER_ACTION_TIMEOUT_SECS}s"),
+        Ok(Ok(())) => {}
+    }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/tools/browser.rs` around lines 947 - 964, The call to browser.close() inside handle_close should be wrapped in the existing with_action_timeout helper to avoid hanging; take the browser out of state as currently done, then call with_action_timeout(...).await (passing the close future, matching the other usages of with_action_timeout) and retain the current soft-handling: if with_action_timeout returns Err (whether a timeout or close error), log it with tracing::warn!(%error, "browser close returned error"). Keep the rest of state cleanup (state.pages.clear(), state.active_target = None, etc.) and ensure the call uses the same timeout semantics as other CDP operations.
1093-1099: ⚠️ Potential issue | 🟠 Major

`truncate_for_display` panics on multi-byte UTF-8 — use a char-boundary-safe slice.

`&text[..max_len]` will panic at runtime if `max_len` falls inside a multi-byte character (e.g. emoji, CJK). This is reachable via `handle_act` → `ActKind::Type` with user-supplied text. Same issue exists at line 927 in `handle_content`.

Proposed fix

```diff
 fn truncate_for_display(text: &str, max_len: usize) -> String {
     if text.len() <= max_len {
         text.to_string()
     } else {
+        let end = text.floor_char_boundary(max_len);
+        format!("{}...", &text[..end])
-        format!("{}...", &text[..max_len])
     }
 }
```

And for line 927 in `handle_content`:

```diff
 let truncated = if html.len() > 100_000 {
+    let end = html.floor_char_boundary(100_000);
     format!(
         "{}... [truncated, {} bytes total]",
-        &html[..100_000],
+        &html[..end],
         html.len()
     )
 } else {
```

`str::floor_char_boundary` is stable since Rust 1.91.0 (available now).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/tools/browser.rs` around lines 1093 - 1099, The current truncate_for_display uses &text[..max_len] which can panic for multi-byte UTF-8; change it to compute a char-boundary-safe end index (e.g. let end = text.floor_char_boundary(max_len); ) and then slice with &text[..end] (or text.get(..end).unwrap_or(text)) and format!("{}...", &text[..end]). Apply the same char-boundary-safe slice fix to the similar truncation in handle_content (the occurrence noted at the other truncation site) so both places avoid panics on multi-byte characters.
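If `floor_char_boundary` is unavailable on an older toolchain, the same guarantee can be had with `str::is_char_boundary`, which has been stable for much longer. A hedged sketch mirroring the reviewed signature, not the actual implementation:

```rust
/// Char-boundary-safe truncation without `floor_char_boundary`: walk back
/// from `max_len` to the nearest UTF-8 boundary before slicing, so the
/// slice can never split a multi-byte character and panic.
fn truncate_for_display(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_string();
    }
    let mut end = max_len;
    while !text.is_char_boundary(end) {
        end -= 1; // byte 0 is always a boundary, so this terminates
    }
    format!("{}...", &text[..end])
}
```

The loop runs at most three iterations (a UTF-8 character is at most four bytes), so the cost is negligible next to the formatting itself.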
♻️ Duplicate comments (7)
src/db.rs (1)
91-101: Previously flagged empty-version guard is now in place — LGTM.

The `assert!(!version.is_empty(), ...)` at Line 95 directly addresses the earlier review concern. No further action needed here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/db.rs` around lines 91 - 101, The empty-version guard is already implemented: keep the existing logic that splits file_name with file_name.split_once('_') and the assertions on version (the assert!(!version.is_empty(), ...) and assert!(version.chars().all(|character| character.is_ascii_digit()), ...)) as-is; no code changes are necessary for functions handling migration filename parsing (refer to file_name, version, and the two assert! calls).

src/conversation/history.rs (4)
747-750: ⚠️ Potential issue | 🟠 Major

Stop masking database decode failures in claim/retry paths.

Lines 747-750 (and the same pattern in the listed ranges) use `try_get(...).unwrap_or_default()`. Decode/type errors become empty IDs/zero attempts, which can produce no-op updates and retry churn instead of failing loudly.

💡 Suggested fix pattern

```diff
-let contract_id: String = row.try_get("id").unwrap_or_default();
-let worker_id_raw: String = row.try_get("worker_id").unwrap_or_default();
-let task_summary: String = row.try_get("task_summary").unwrap_or_default();
-let attempt_count: i64 = row.try_get("attempt_count").unwrap_or_default();
+let contract_id: String = row.try_get("id").map_err(|error| anyhow::anyhow!(error))?;
+let worker_id_raw: String = row.try_get("worker_id").map_err(|error| anyhow::anyhow!(error))?;
+let task_summary: String = row.try_get("task_summary").map_err(|error| anyhow::anyhow!(error))?;
+let attempt_count: i64 = row.try_get("attempt_count").map_err(|error| anyhow::anyhow!(error))?;
```

Based on learnings: "Don't silently discard errors; no let _ = on Results. Handle, log, or propagate errors. Only exception is .ok() on channel sends where the receiver may be dropped".
Also applies to: 827-830, 909-910, 1074-1074, 1092-1096, 1135-1135, 1153-1157, 1239-1240
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 747 - 750, The code is masking DB decode errors by using row.try_get(...).unwrap_or_default() for fields like contract_id, worker_id_raw, task_summary, and attempt_count; replace these unwrap_or_default calls with proper error handling—either propagate the Err from row.try_get(...) (using ? or map_err to add context) or log and return an explicit error so a failed decode doesn't become an empty ID/zero attempt and cause no-op updates/retry churn; apply the same change to the other occurrences you noted (around the ranges 827–830, 909–910, 1074, 1092–1096, 1135, 1153–1157, 1239–1240) so all row.try_get calls fail loudly with contextual errors instead of defaulting silently.
1683-1684: ⚠️ Potential issue | 🟡 Minor

Make worker event ordering deterministic for same-timestamp rows.

Line 1683 orders only by `created_at DESC`; rows created within the same timestamp can appear in unstable order. Add an `id` tie-breaker.

💡 Suggested fix

```diff
-    ORDER BY created_at DESC \
+    ORDER BY created_at DESC, id DESC \
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 1683 - 1684, The ORDER BY clause uses only "created_at DESC" which yields non-deterministic ordering for rows with identical timestamps; update the SQL fragment that contains "ORDER BY created_at DESC LIMIT ?" to include the id tie-breaker (for example "ORDER BY created_at DESC, id DESC LIMIT ?") so rows with the same created_at are deterministically ordered by id.
239-243: ⚠️ Potential issue | 🟠 Major

Use a stable hash algorithm for persisted status fingerprints.

Lines 240-242 use `DefaultHasher` for `last_status_hash`. That hash is not guaranteed stable across Rust/toolchain upgrades, so persisted comparisons can drift and produce false status-change detections after deploys.

💡 Suggested fix

```diff
 fn status_fingerprint(status: &str) -> String {
-    let mut hasher = std::collections::hash_map::DefaultHasher::new();
-    status.hash(&mut hasher);
-    format!("{:016x}", hasher.finish())
+    // FNV-1a 64-bit (stable/deterministic across runs and releases)
+    let mut hash: u64 = 0xcbf29ce484222325;
+    for byte in status.as_bytes() {
+        hash ^= u64::from(*byte);
+        hash = hash.wrapping_mul(0x100000001b3);
+    }
+    format!("{hash:016x}")
 }
```

Rust std::collections::hash_map::DefaultHasher: is output guaranteed stable across compiler/library versions for persisted values?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 239 - 243, status_fingerprint currently uses std::collections::hash_map::DefaultHasher which is not stable across toolchain versions; replace it with a stable cryptographic hash (e.g., SHA-256 or blake3) so persisted fingerprints don’t change between Rust upgrades. Update the status_fingerprint function to compute a fixed digest using a stable hasher (for example, sha2::Sha256 or blake3), format the digest as a hex string (or truncated hex prefix if you need a shorter fingerprint), and ensure you add the chosen crate to Cargo.toml and import it in conversation::history.rs; keep the function name status_fingerprint and return type String to minimize call-site changes.
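An executable version of the FNV-1a replacement sketched in that comment. FNV-1a is fully deterministic, so the empty-string output equals the well-known offset basis and can be checked directly; the signature mirrors the reviewed `status_fingerprint`, but this is a standalone sketch:

```rust
/// FNV-1a 64-bit: output depends only on the input bytes, never on the
/// Rust toolchain, so persisted fingerprints stay stable across upgrades.
fn status_fingerprint(status: &str) -> String {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV-1a 64-bit offset basis
    for byte in status.as_bytes() {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3); // FNV-1a 64-bit prime
    }
    format!("{hash:016x}")
}
```

SHA-256 or blake3 (also suggested in the prompt) give the same stability with stronger collision resistance, at the cost of a new dependency; FNV-1a needs none.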
627-639: ⚠️ Potential issue | 🟠 Major

Prevent contract state regression on acknowledgment.

Lines 629-631 currently force every non-terminal state to `acked`. This can regress `progressing`/`sla_missed` back to `acked` after progress has already been recorded.

💡 Suggested fix

```diff
 "UPDATE worker_task_contracts \
  SET state = CASE \
-     WHEN state IN (?, ?, ?) THEN state \
-     ELSE ? \
+     WHEN state IN (?, ?, ?) THEN state \
+     WHEN state = ? THEN ? \
+     ELSE state \
  END, \
  updated_at = CURRENT_TIMESTAMP \
  WHERE worker_id = ?",
 )
 .bind(WORKER_CONTRACT_STATE_TERMINAL_PENDING)
 .bind(WORKER_CONTRACT_STATE_TERMINAL_ACKED)
 .bind(WORKER_CONTRACT_STATE_TERMINAL_FAILED)
+.bind(WORKER_CONTRACT_STATE_CREATED)
 .bind(WORKER_CONTRACT_STATE_ACKED)
 .bind(worker_id.to_string())
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 627 - 639, The UPDATE on worker_task_contracts currently sets non-terminal states to WORKER_CONTRACT_STATE_ACKED, which can regress states like progressing or sla_missed; change the CASE so it only transitions specific pre-ack states (e.g., pending) to ACKED and leaves other non-terminal states unchanged: modify the WHEN clause in the UPDATE (the statement using WORKER_CONTRACT_STATE_TERMINAL_PENDING / TERMINAL_ACKED / TERMINAL_FAILED and binding WORKER_CONTRACT_STATE_ACKED) so that only the intended source state(s) are matched and bound for transition to WORKER_CONTRACT_STATE_ACKED, leaving all other states untouched; update the .bind(...) arguments to match the new WHEN condition order for the CASE.

src/messaging/discord.rs (1)
481-487: LGTM — non-visible status branches correctly return `NotSurfaced`.

`StopTyping`, `ToolStarted`, `ToolCompleted`, and `BranchStarted` now return `false` (mapped to `DeliveryOutcome::NotSurfaced`), ensuring they don't incorrectly acknowledge delivery.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/discord.rs` around lines 481 - 487, The match arms for StatusUpdate::StopTyping, ::ToolStarted, ::ToolCompleted, and ::BranchStarted correctly return false (mapped to DeliveryOutcome::NotSurfaced) so no functional change is needed; ensure this is intentional by verifying that the boolean-to-DeliveryOutcome mapping treats false as NotSurfaced and that the call to self.stop_typing(message).await in these arms properly awaits, handles or logs any errors (refer to the stop_typing method and the match on StatusUpdate in src/messaging/discord.rs) so the behavior and error handling remain explicit and consistent.

src/agent/channel.rs (1)
473-492: Awaited DB round-trips on the timer tick path block the channel event loop.

`flush_due_worker_delivery_receipts` (Line 474) and `flush_due_worker_task_contract_deadlines` (Line 482) are awaited directly in the `select!` timer arm. The latter issues three sequential DB round-trips (`claim_due_worker_task_contract_ack_deadlines`, `claim_due_worker_task_contract_progress_deadlines`, `claim_due_worker_task_contract_terminal_deadlines`) before yielding, stalling the entire event loop on each tick under DB pressure.

Because both functions consume query results to drive status updates, they can't be dropped into a bare fire-and-forget spawn. However, `response_tx` is `Clone`, so each can be spawned with a cloned sender that drives the outbound calls from the background task:

```rust
// Example pattern for flush_due_worker_delivery_receipts
let run_logger = self.state.process_run_logger.clone();
let channel_id = self.id.clone();
let response_tx = self.response_tx.clone();
tokio::spawn(async move {
    // claim + send inside the spawned task
});
self.worker_receipt_dispatch_deadline = tokio::time::Instant::now()
    + std::time::Duration::from_secs(WORKER_RECEIPT_DISPATCH_INTERVAL_SECS);
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 473 - 492, The timer tick arm currently awaits flush_due_worker_delivery_receipts and flush_due_worker_task_contract_deadlines which performs DB round-trips and blocks the channel event loop; instead, capture/clone the minimal context (e.g., self.response_tx.clone(), self.state.process_run_logger.clone(), self.id.clone() or channel id) and tokio::spawn an async fire-and-forget task that performs the claim+send work, then immediately update worker_receipt_dispatch_deadline and worker_contract_tick_deadline in the select! arm without awaiting the spawned tasks; apply this pattern for both flush_due_worker_delivery_receipts and flush_due_worker_task_contract_deadlines so the channel remains responsive while background tasks use cloned response_tx to drive outbound calls.
🧹 Nitpick comments (4)
src/tools/browser.rs (2)
786-787: Two sequential timeouts foract:type— could a single combined timeout be more appropriate?Lines 786–787 apply separate 45 s timeouts for the focus-click and the type-str. In the worst case,
act:typecan take up to 90 s before failing. If the intent is to bound the total wall-clock time per action, consider wrapping both in a singlewith_action_timeoutcall. If independent timeouts are intentional (e.g. typing very long strings), this is fine as-is.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/tools/browser.rs` around lines 786 - 787, The two sequential calls use separate with_action_timeout wrappers ("act focus" around element.click() and "act type" around element.type_str(&text)), which allows up to double the intended wall-clock time; change to a single with_action_timeout that runs both actions in sequence (call element.click() then element.type_str(&text) inside one closure) so the total operation is bounded by one timeout, or explicitly document/keep separate if independent timeouts are desired.
521-522: Lock held for up to 45 s during browser launch — acceptable but be aware of the implication.

`state` (an async `Mutex`) is acquired at line 495 and held through the `with_action_timeout` call at line 521–522. This pattern is repeated across all handlers. Since browser operations are inherently sequential per worker, this is likely fine, but if any caller ever awaits multiple `BrowserTool` methods concurrently, the second call will block for up to the full timeout duration. Worth a brief doc comment on `state` noting this design choice.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/tools/browser.rs` around lines 521 - 522, The async Mutex field state in BrowserTool is intentionally held across long awaits (e.g., the with_action_timeout("browser launch", Browser::launch(chrome_config)).await? call and other handlers), which can block concurrent calls for up to the timeout; add a brief doc comment on the state field (or the BrowserTool type) documenting this design choice and its implications (that browser operations are sequential per worker and the mutex is intentionally held during long-running browser operations like with_action_timeout and Browser::launch), so future maintainers know why we don't release the lock before those awaits and can decide whether to refactor for finer-grained locking if concurrency becomes necessary.

docs/content/docs/(configuration)/config.mdx (1)
481-488: Document how `0` is handled for worker contract durations.

The parser treats `0` as unset/fallback behavior; adding that note here would prevent misconfiguration surprises.

📝 Suggested doc patch

```diff
 ### `[defaults.worker_contract]`

 | Key | Type | Default | Description |
 |-----|------|---------|-------------|
 | `ack_secs` | integer | 5 | Deadline to confirm a worker start was surfaced |
 | `progress_secs` | integer | 45 | Deadline between meaningful worker progress updates |
 | `tick_secs` | integer | 2 | Poll interval for worker contract deadline checks |
+
+`0` is treated as unset and falls back to inherited/default values.
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/content/docs/`(configuration)/config.mdx around lines 481 - 488, The docs for [defaults.worker_contract] (keys ack_secs, progress_secs, tick_secs) don't explain that a value of 0 is treated as "unset" (use fallback behavior); update the table description and/or add a short note under the `[defaults.worker_contract]` section clarifying that setting any of ack_secs, progress_secs, or tick_secs to 0 disables the explicit duration and triggers parser fallback/default behavior so users won't be surprised by 0 meaning "use default" rather than "no timeout".

src/agent/channel.rs (1)
1894-1973: Identical `tokio::spawn` blocks duplicated across `ToolStarted` and `ToolCompleted` arms.

Both arms clone the same set of variables and spawn the same `touch_worker_task_contract_progress` call. Extract a small helper to reduce the duplication:

♻️ Proposed refactor

```diff
+    fn spawn_worker_contract_progress_touch(
+        run_logger: ProcessRunLogger,
+        channel_id: ChannelId,
+        worker_id: WorkerId,
+        hint: Option<String>,
+        progress_secs: u64,
+    ) {
+        tokio::spawn(async move {
+            if let Err(error) = run_logger
+                .touch_worker_task_contract_progress(worker_id, hint.as_deref(), progress_secs)
+                .await
+            {
+                tracing::warn!(
+                    %error,
+                    channel_id = %channel_id,
+                    worker_id = %worker_id,
+                    "failed to refresh worker task contract progress"
+                );
+            }
+        });
+    }
```

Then replace both spawn blocks with a call to this helper.

The same pattern is also repeated in the `WorkerPermission` and `WorkerQuestion` arms (Lines 2000–2016, 2041–2053).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 1894 - 1973, Duplicate async spawn logic in the ProcessEvent::ToolStarted and ProcessEvent::ToolCompleted arms (and similarly in the WorkerPermission and WorkerQuestion arms) should be extracted into a small helper to remove repetition: create a helper function (e.g., schedule_touch_worker_task_contract_progress or spawn_touch_progress) that accepts the cloned run_logger, channel id, event_worker_id, tool_name: Option<String>/&str and progress_secs and performs the tokio::spawn block calling run_logger.touch_worker_task_contract_progress(...).await and logging the tracing::warn on error; then replace the inline tokio::spawn blocks in ProcessEvent::ToolStarted, ProcessEvent::ToolCompleted, and the WorkerPermission/WorkerQuestion arms with calls to this helper, passing the same cloned values (run_logger.clone(), self.id.clone(), *worker_id, tool_name.clone(), progress_secs).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/content/docs/`(features)/workers.mdx:
- Around line 63-74: The state machine block and terminal-state sentence need to
include the documented timed_out terminal path: add transitions from Running to
timed_out (e.g., "Running ──→ TimedOut") and from WaitingForInput to timed_out
(e.g., "WaitingForInput ──→ TimedOut"), and update the terminal-states sentence
to list timed_out alongside Done, Failed, and Cancelled; update any casing to
match the lifecycle text’s "timed_out" token so the examples and sentences
consistently show that state as terminal.
In `@src/agent/channel.rs`:
- Around line 2960-2980: The RecvError::Lagged branch currently only logs and
doesn’t update the inactivity deadline, causing spurious timeouts when events
were dropped; in the handler for
tokio::sync::broadcast::error::RecvError::Lagged(skipped) (the match arm
handling event_rx.recv()), call the same deadline reset logic used for Ok(event)
— i.e., set deadline = tokio::time::Instant::now() + timeout_duration — after
logging (or before), so that is_worker_progress_event’s intent is preserved even
when the channel lagged for worker_id.
- Around line 3295-3313: The two functions is_worker_terminal_failure and
classify_worker_terminal_state rely on hard-coded prefix literals that must
match the format!() messages in spawn_worker_task and
send_worker_terminal_events; extract those prefixes into shared constants (e.g.,
WORKER_FAILED_PREFIX, WORKER_TIMED_OUT_PREFIX, WORKER_CANCELLED_PREFIX) and use
those constants both in the definitions of is_worker_terminal_failure and
classify_worker_terminal_state and in every format!() call in spawn_worker_task
and send_worker_terminal_events so that string matching remains consistent
across producers and consumers.
In `@src/db.rs`:
- Around line 75-106: The test migration_versions_are_unique can vacuously pass
if there are zero .sql files; after the loop, add a non-empty guard such as
asserting that seen_versions is not empty (or a count of discovered .sql files >
0) and emit a clear error like "no migrations found in migrations/"; update
references in the function (seen_versions or a new counter) so the assertion
runs after iterating the directory to fail the test when no migration files are
present.
In `@src/messaging/discord.rs`:
- Around line 29-30: The doc comment for the struct field progress_messages is
inaccurate: the key is scoped by progress_message_key as "channel_id:worker_id"
(per-worker-per-channel), not just per-channel; update the comment on
progress_messages to state "Per-worker-per-channel progress message used for
worker checkpoint edits" and reference progress_message_key and the HashMap key
format to make intent clear.
- Around line 112-132: The TOCTOU happens because existing_id is read under a
read lock then dropped, allowing concurrent upserts to both send messages; fix
by performing the existence check and potential insert while holding a single
write lock: acquire self.progress_messages.write().await at the start of
upsert_progress_message (or the enclosing method), check map.get(&key).copied(),
attempt to edit the message if present, and if edit fails or none is present
create/send the new message and insert its id into the map before releasing the
write lock so no concurrent caller can also send a message for the same key;
keep the same EditMessage/CreateMessage logic (use
channel_id.edit_message/send_message and insert sent.id) but under the write
lock to prevent duplicates.
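The single-critical-section pattern can be modelled with std types. This is a toy sketch: the review's async `RwLock`, Discord channel handles, and message ids are replaced with a `std::sync::Mutex`, a string key, and plain `u64`s purely for illustration:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Toy model of the TOCTOU fix: the existence check and the insert share
/// one critical section, so two concurrent upserts for the same key can
/// never both take the "create a new message" path.
struct ProgressMessages {
    inner: Mutex<HashMap<String, u64>>,
}

impl ProgressMessages {
    fn new() -> Self {
        Self {
            inner: Mutex::new(HashMap::new()),
        }
    }

    /// Returns the existing message id for `key`, or stores the id produced
    /// by `create` while still holding the lock.
    fn upsert(&self, key: &str, create: impl FnOnce() -> u64) -> u64 {
        let mut map = self.inner.lock().expect("lock poisoned");
        // entry() performs check-and-insert atomically under the held lock.
        *map.entry(key.to_string()).or_insert_with(create)
    }
}
```

In the real adapter the `create` step is an awaited network send, which is why the fix holds the async write lock across the edit/send rather than dropping a read lock first.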
In `@src/tools/cancel.rs`:
- Around line 110-121: The displayed cancel message currently uses the raw
args.reason which can be whitespace-only and diverge from how cancel_worker in
src/agent/channel.rs normalises reasons; compute a trimmed/display reason first
(e.g. derive display_reason from args.reason via
.as_deref().map(str::trim).filter(|v| !v.is_empty()).unwrap_or("cancelled by
request")) and then build the message using display_reason along with
args.process_type and args.process_id so the confirmation string matches the
stored reason.
---
Outside diff comments:
In `@src/tools/browser.rs`:
- Around line 947-964: The call to browser.close() inside handle_close should be
wrapped in the existing with_action_timeout helper to avoid hanging; take the
browser out of state as currently done, then call with_action_timeout(...).await
(passing the close future, matching the other usages of with_action_timeout) and
retain the current soft-handling: if with_action_timeout returns Err (whether a
timeout or close error), log it with tracing::warn!(%error, "browser close
returned error"). Keep the rest of state cleanup (state.pages.clear(),
state.active_target = None, etc.) and ensure the call uses the same timeout
semantics as other CDP operations.
- Around line 1093-1099: The current truncate_for_display uses &text[..max_len]
which can panic for multi-byte UTF-8; change it to compute a char-boundary-safe
end index (e.g. let end = text.floor_char_boundary(max_len); ) and then slice
with &text[..end] (or text.get(..end).unwrap_or(text)) and format!("{}...",
&text[..end]). Apply the same char-boundary-safe slice fix to the similar
truncation in handle_content (the occurrence noted at the other truncation site)
so both places avoid panics on multi-byte characters.
---
Duplicate comments:
In `@src/agent/channel.rs`:
- Around line 473-492: The timer tick arm currently awaits
flush_due_worker_delivery_receipts and flush_due_worker_task_contract_deadlines
which performs DB round-trips and blocks the channel event loop; instead,
capture/clone the minimal context (e.g., self.response_tx.clone(),
self.state.process_run_logger.clone(), self.id.clone() or channel id) and
tokio::spawn an async fire-and-forget task that performs the claim+send work,
then immediately update worker_receipt_dispatch_deadline and
worker_contract_tick_deadline in the select! arm without awaiting the spawned
tasks; apply this pattern for both flush_due_worker_delivery_receipts and
flush_due_worker_task_contract_deadlines so the channel remains responsive while
background tasks use cloned response_tx to drive outbound calls.
In `@src/conversation/history.rs`:
- Around line 747-750: The code is masking DB decode errors by using
row.try_get(...).unwrap_or_default() for fields like contract_id, worker_id_raw,
task_summary, and attempt_count; replace these unwrap_or_default calls with
proper error handling—either propagate the Err from row.try_get(...) (using ? or
map_err to add context) or log and return an explicit error so a failed decode
doesn't become an empty ID/zero attempt and cause no-op updates/retry churn;
apply the same change to the other occurrences you noted (around the ranges
827–830, 909–910, 1074, 1092–1096, 1135, 1153–1157, 1239–1240) so all
row.try_get calls fail loudly with contextual errors instead of defaulting
silently.
- Around line 1683-1684: The ORDER BY clause uses only "created_at DESC" which
yields non-deterministic ordering for rows with identical timestamps; update the
SQL fragment that contains "ORDER BY created_at DESC LIMIT ?" to include the id
tie-breaker (for example "ORDER BY created_at DESC, id DESC LIMIT ?") so rows
with the same created_at are deterministically ordered by id.
- Around line 239-243: status_fingerprint currently uses
std::collections::hash_map::DefaultHasher which is not stable across toolchain
versions; replace it with a stable cryptographic hash (e.g., SHA-256 or blake3)
so persisted fingerprints don’t change between Rust upgrades. Update the
status_fingerprint function to compute a fixed digest using a stable hasher (for
example, sha2::Sha256 or blake3), format the digest as a hex string (or
truncated hex prefix if you need a shorter fingerprint), and ensure you add the
chosen crate to Cargo.toml and import it in conversation::history.rs; keep the
function name status_fingerprint and return type String to minimize call-site
changes.
- Around line 627-639: The UPDATE on worker_task_contracts currently sets
non-terminal states to WORKER_CONTRACT_STATE_ACKED, which can regress states
like progressing or sla_missed; change the CASE so it only transitions specific
pre-ack states (e.g., pending) to ACKED and leaves other non-terminal states
unchanged: modify the WHEN clause in the UPDATE (the statement using
WORKER_CONTRACT_STATE_TERMINAL_PENDING / TERMINAL_ACKED / TERMINAL_FAILED and
binding WORKER_CONTRACT_STATE_ACKED) so that only the intended source state(s)
are matched and bound for transition to WORKER_CONTRACT_STATE_ACKED, leaving all
other states untouched; update the .bind(...) arguments to match the new WHEN
condition order for the CASE.
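The guarded transition this prompt asks for can be modelled as a pure function over the state strings named in the review (`created`, `acked`, `progressing`, `sla_missed`, and the `terminal_*` states); the function itself is hypothetical, standing in for the SQL CASE expression:

```rust
/// Hypothetical pure model of the guarded acknowledgement transition:
/// only `created` may advance to `acked`; progress, SLA, and terminal
/// states keep their current value instead of regressing.
fn on_acknowledged(state: &str) -> &str {
    match state {
        "created" => "acked",
        other => other, // progressing, sla_missed, terminal_* are untouched
    }
}
```

The SQL CASE mirrors this shape: one WHEN arm for the allowed source state, and an ELSE that returns the column unchanged.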
In `@src/db.rs`:
- Around line 91-101: The empty-version guard is already implemented: keep the
existing logic that splits file_name with file_name.split_once('_') and the
assertions on version (the assert!(!version.is_empty(), ...) and
assert!(version.chars().all(|character| character.is_ascii_digit()), ...))
as-is; no code changes are necessary for functions handling migration filename
parsing (refer to file_name, version, and the two assert! calls).
In `@src/messaging/discord.rs`:
- Around line 481-487: The match arms for StatusUpdate::StopTyping,
::ToolStarted, ::ToolCompleted, and ::BranchStarted correctly return false
(mapped to DeliveryOutcome::NotSurfaced) so no functional change is needed;
ensure this is intentional by verifying that the boolean-to-DeliveryOutcome
mapping treats false as NotSurfaced and that the call to
self.stop_typing(message).await in these arms properly awaits, handles or logs
any errors (refer to the stop_typing method and the match on StatusUpdate in
src/messaging/discord.rs) so the behavior and error handling remain explicit and
consistent.
---
Nitpick comments:
In `@docs/content/docs/`(configuration)/config.mdx:
- Around line 481-488: The docs for [defaults.worker_contract] (keys ack_secs,
progress_secs, tick_secs) don't explain that a value of 0 is treated as "unset"
(use fallback behavior); update the table description and/or add a short note
under the `[defaults.worker_contract]` section clarifying that setting any of
ack_secs, progress_secs, or tick_secs to 0 disables the explicit duration and
triggers parser fallback/default behavior so users won't be surprised by 0
meaning "use default" rather than "no timeout".
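The "0 means unset" behavior can be sketched as a small fallback function. The key names come from the docs; the exact parser shape is an assumption.

```rust
/// Sketch: a configured value of 0 is treated as "unset" and falls
/// back to the default, per the docs note above. The 5-second default
/// here is taken from the documented ack_secs example.
fn effective_secs(configured: u64, default: u64) -> u64 {
    match configured {
        0 => default, // 0 disables the explicit duration -> fallback
        n => n,
    }
}

fn main() {
    assert_eq!(effective_secs(0, 5), 5);   // ack_secs = 0 -> default
    assert_eq!(effective_secs(45, 5), 45); // explicit value wins
}
```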
In `@src/agent/channel.rs`:
- Around line 1894-1973: Duplicate async spawn logic in the
ProcessEvent::ToolStarted and ProcessEvent::ToolCompleted arms (and similarly in
the WorkerPermission and WorkerQuestion arms) should be extracted into a small
helper to remove repetition: create a helper function (e.g.,
schedule_touch_worker_task_contract_progress or spawn_touch_progress) that
accepts the cloned run_logger, channel id, event_worker_id, tool_name:
Option<String>/&str and progress_secs and performs the tokio::spawn block
calling run_logger.touch_worker_task_contract_progress(...).await and logging
the tracing::warn on error; then replace the inline tokio::spawn blocks in
ProcessEvent::ToolStarted, ProcessEvent::ToolCompleted, and the
WorkerPermission/WorkerQuestion arms with calls to this helper, passing the same
cloned values (run_logger.clone(), self.id.clone(), *worker_id,
tool_name.clone(), progress_secs).
In `@src/tools/browser.rs`:
- Around line 786-787: The two sequential calls use separate with_action_timeout
wrappers ("act focus" around element.click() and "act type" around
element.type_str(&text)), which allows up to double the intended wall-clock
time; change to a single with_action_timeout that runs both actions in sequence
(call element.click() then element.type_str(&text) inside one closure) so the
total operation is bounded by one timeout, or explicitly document/keep separate
if independent timeouts are desired.
- Around line 521-522: The async Mutex field state in BrowserTool is
intentionally held across long awaits (e.g., the with_action_timeout("browser
launch", Browser::launch(chrome_config)).await? call and other handlers), which
can block concurrent calls for up to the timeout; add a brief doc comment on the
state field (or the BrowserTool type) documenting this design choice and its
implications (that browser operations are sequential per worker and the mutex is
intentionally held during long-running browser operations like
with_action_timeout and Browser::launch), so future maintainers know why we
don't release the lock before those awaits and can decide whether to refactor
for finer-grained locking if concurrency becomes necessary.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- docs/content/docs/(configuration)/config.mdx
- docs/content/docs/(features)/workers.mdx
- migrations/20260225000001_worker_events.sql
- src/agent/channel.rs
- src/api/workers.rs
- src/config.rs
- src/conversation/history.rs
- src/db.rs
- src/messaging/discord.rs
- src/messaging/webchat.rs
- src/tools/browser.rs
- src/tools/cancel.rs
- src/tools/conclude_link.rs

🚧 Files skipped from review as they are similar to previous changes (3)
- src/messaging/webchat.rs
- migrations/20260225000001_worker_events.sql
- src/tools/conclude_link.rs
Actionable comments posted: 2
♻️ Duplicate comments (2)
src/agent/channel.rs (2)
2929-2935: ⚠️ Potential issue | 🟡 Minor

Reset inactivity deadline on lagged worker events.
When the timeout watcher lags, events were dropped because the worker was active. Not extending the deadline here can trigger false timeouts.
🐛 Proposed fix
```diff
 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
     tracing::warn!(
         worker_id = %worker_id,
         skipped,
         "worker timeout watcher lagged on event stream"
     );
+    deadline = tokio::time::Instant::now() + timeout_duration;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 2929 - 2935, The timeout watcher currently ignores Tokio broadcast::RecvError::Lagged in the match arm (the Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) branch) and only logs a warning, which can cause false timeouts because the worker was active but events were dropped; update that branch to treat a Lagged event as activity by resetting the watcher’s inactivity deadline (e.g., update the same deadline/last_activity variable used elsewhere in the timeout watcher logic) so the worker_id’s timeout is extended when skipped events are observed. Ensure you reference the RecvError::Lagged arm and the inactivity deadline/last_activity variable used by the timeout watcher code when making the change.
493-503: ⚠️ Potential issue | 🟠 Major

Deadline receipt/contract scans still block the channel loop.
These paths run multiple awaited SQLite reads/writes inline on the main channel loop, so slow DB I/O can pause message/event handling and make the channel feel unresponsive.
As per coding guidelines: "Use fire-and-forget DB writes with tokio::spawn for conversation history saves, memory writes, and worker log persistence. User gets their response immediately."
Also applies to: 2180-2342
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 493 - 503, The deadline-check branches in the main channel loop are performing awaited DB I/O (calls to flush_due_worker_delivery_receipts() and flush_due_worker_task_contract_deadlines() inside the loop), which blocks message/event handling; change these to fire-and-forget background tasks by offloading the DB work to tokio::spawn (or a dedicated background worker) so the loop only triggers the job and continues immediately; specifically, replace the await calls in the loop with spawning a task that clones any required state/handles and calls flush_due_worker_delivery_receipts() or flush_due_worker_task_contract_deadlines() (and similarly refactor the DB-heavy code paths around functions referenced in the 2180-2342 region) and ensure spawned tasks capture/clone necessary Arc/Mutex handles and log errors instead of awaiting them in the loop.
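The fire-and-forget shape the prompt asks for can be sketched as follows. `std::thread::spawn` stands in for `tokio::spawn` so the sketch stays self-contained; the point is only that the caller triggers the flush and continues immediately, and the spawned task records its own errors rather than blocking the loop.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Sketch of offloading a slow flush: the caller returns immediately,
/// and the background task logs (here: stores) any error itself.
fn trigger_flush(log: Arc<Mutex<Vec<String>>>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // ... slow DB work would happen here ...
        let outcome: Result<(), String> = Ok(());
        match outcome {
            Ok(()) => log.lock().unwrap().push("flushed".to_string()),
            Err(error) => log.lock().unwrap().push(format!("flush failed: {error}")),
        }
    })
}

fn main() {
    let log = Arc::new(Mutex::new(Vec::new()));
    let handle = trigger_flush(Arc::clone(&log));
    // The caller would continue processing here; we join only for the demo.
    handle.join().unwrap();
    assert_eq!(log.lock().unwrap().len(), 1);
    assert_eq!(log.lock().unwrap()[0], "flushed");
}
```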
🧹 Nitpick comments (2)
src/conversation/history.rs (2)
1482-1541: `close_orphaned_runs` executes 4 separate writes without a wrapping transaction.

A crash between any two steps (e.g., between the `worker_runs` UPDATE and the `worker_delivery_receipts` UPDATE) leaves a partially cleaned startup state. The `WHERE` guards on each step are idempotent, so the next restart recovers safely, but in-process queries between steps may briefly see an inconsistent view. Wrapping all four `execute` calls in a single `BEGIN`/`COMMIT` would make the cleanup atomic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 1482 - 1541, close_orphaned_runs runs four separate UPDATEs (worker_result, branch_result, receipt_result, contract_result) without a transaction; wrap all queries in a single database transaction so the four updates are applied atomically: start a transaction (e.g., pool.begin()), run each sqlx::query against the transaction handle instead of &self.pool, .execute(&mut tx).await for each, capture their rows_affected values, and then commit the transaction; on any error rollback and propagate the error so partial updates are not left visible between steps.
445-468: `log_worker_started` duplicates the `worker_events` INSERT instead of reusing `log_worker_event_with_context`.

The private `log_worker_event_with_context` helper exists exactly for this purpose. `log_worker_started` already has `channel_id` and `agent_id` in scope and re-implements the same INSERT SQL inline. `log_worker_completed` (lines 527-552) has the same duplication. Any future schema change to `worker_events` must now be applied in three places.

♻️ Proposed refactor for `log_worker_started`

```diff
-        let payload_json = serde_json::json!({
-            "task": task,
-            "worker_type": worker_type,
-        })
-        .to_string();
-        let event_id = uuid::Uuid::new_v4().to_string();
-
-        if let Err(error) = sqlx::query(
-            "INSERT INTO worker_events \
-             (id, worker_id, channel_id, agent_id, event_type, payload_json) \
-             VALUES (?, ?, ?, ?, 'started', ?)",
-        )
-        .bind(&event_id)
-        .bind(&id)
-        .bind(&channel_id)
-        .bind(&agent_id)
-        .bind(&payload_json)
-        .execute(&pool)
-        .await
-        {
-            tracing::warn!(%error, worker_id = %id, "failed to persist worker start event");
-        }
```

After the `worker_runs` INSERT succeeds, replace the inline block with a call to `self.log_worker_event_with_context`:

```rust
// (outside the spawn, before it)
self.log_worker_event_with_context(
    id.clone(),
    channel_id.clone(),
    Some(agent_id.clone()),
    "started".to_string(),
    Some(serde_json::json!({"task": task, "worker_type": worker_type}).to_string()),
);
```

Apply the same pattern to `log_worker_completed`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 445 - 468, log_worker_started (and similarly log_worker_completed) duplicates the INSERT into worker_events instead of calling the existing helper log_worker_event_with_context; replace the inline sqlx::query block in log_worker_started with a call to self.log_worker_event_with_context(id.clone(), channel_id.clone(), Some(agent_id.clone()), "started".to_string(), Some(serde_json::json!({"task": task, "worker_type": worker_type}).to_string())) (and do the analogous replacement in log_worker_completed), ensuring you construct the payload_json the same way and pass agent_id as Some(...) where available so all worker_events INSERTs go through the single helper.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agent/channel.rs`:
- Around line 1485-1490: The outbound status send currently swallows errors with
"let _ = ...send(...).await"; change it to handle the Result from
response_tx.send(OutboundEnvelope::from(OutboundResponse::Status(crate::StatusUpdate::Thinking))).await
(and the similar occurrence around 1730-1735) by matching or using if let
Err(err) = ... and then log or propagate the error (e.g., process or log a
warning/error including err) instead of discarding it so delivery failures for
lifecycle signals are visible.
In `@src/conversation/history.rs`:
- Around line 1173-1198: The loop that claims rows and builds
PendingWorkerDeliveryReceipt uses unwrap_or_default() which can produce garbage
while leaving the receipt in 'sending'; replace each unwrap_or_default() for
fields (id, worker_id, channel_id, terminal_state, payload_text, attempt_count)
with the same match pattern used earlier (e.g., match row.try_get("...") { Ok(v)
=> v, Err(e) => { warn!(...), attempt to revert the claim by updating the row
back to status = 'pending' if still 'sending' (execute an UPDATE ... WHERE id =
? AND status = 'sending'), then continue; } }) so that any decode error logs a
warning, reverts the status and skips adding the malformed row; apply the same
change in claim_due_worker_terminal_receipts_any and use the
PendingWorkerDeliveryReceipt construction only after all fields decoded
successfully.
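The revert-on-decode-failure pattern this comment asks for can be sketched against an in-memory stand-in for the receipts table. The statuses (`pending`, `sending`) come from the review; the table shape and function name are illustrative, with a `None` payload modeling a row whose fields fail to decode after the claim.

```rust
use std::collections::HashMap;

/// Sketch: claim pending rows, but revert the claim when decoding
/// fails so the row is retried instead of being stranded in 'sending'.
fn claim_pending(
    table: &mut HashMap<&'static str, (String, Option<String>)>,
) -> Vec<String> {
    let mut claimed = Vec::new();
    let ids: Vec<&'static str> = table.keys().copied().collect();
    for id in ids {
        let row = table.get_mut(id).expect("row exists");
        if row.0 != "pending" {
            continue;
        }
        row.0 = "sending".to_string(); // claim the row
        match row.1.clone() {
            Some(payload) => claimed.push(payload),
            None => {
                // Decode failed: best-effort rollback before continuing.
                row.0 = "pending".to_string();
            }
        }
    }
    claimed
}

fn main() {
    let mut table = HashMap::new();
    table.insert("good", ("pending".to_string(), Some("payload".to_string())));
    table.insert("bad", ("pending".to_string(), None));
    let claimed = claim_pending(&mut table);
    assert_eq!(claimed, vec!["payload".to_string()]);
    assert_eq!(table["good"].0, "sending");
    assert_eq!(table["bad"].0, "pending"); // reverted, not stranded
}
```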
---
Duplicate comments:
In `@src/agent/channel.rs`:
- Around line 2929-2935: The timeout watcher currently ignores Tokio
broadcast::RecvError::Lagged in the match arm (the
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) branch) and only
logs a warning, which can cause false timeouts because the worker was active but
events were dropped; update that branch to treat a Lagged event as activity by
resetting the watcher’s inactivity deadline (e.g., update the same
deadline/last_activity variable used elsewhere in the timeout watcher logic) so
the worker_id’s timeout is extended when skipped events are observed. Ensure you
reference the RecvError::Lagged arm and the inactivity deadline/last_activity
variable used by the timeout watcher code when making the change.
- Around line 493-503: The deadline-check branches in the main channel loop are
performing awaited DB I/O (calls to flush_due_worker_delivery_receipts() and
flush_due_worker_task_contract_deadlines() inside the loop), which blocks
message/event handling; change these to fire-and-forget background tasks by
offloading the DB work to tokio::spawn (or a dedicated background worker) so the
loop only triggers the job and continues immediately; specifically, replace the
await calls in the loop with spawning a task that clones any required
state/handles and calls flush_due_worker_delivery_receipts() or
flush_due_worker_task_contract_deadlines() (and similarly refactor the DB-heavy
code paths around functions referenced in the 2180-2342 region) and ensure
spawned tasks capture/clone necessary Arc/Mutex handles and log errors instead
of awaiting them in the loop.
---
Nitpick comments:
In `@src/conversation/history.rs`:
- Around line 1482-1541: close_orphaned_runs runs four separate UPDATEs
(worker_result, branch_result, receipt_result, contract_result) without a
transaction; wrap all queries in a single database transaction so the four
updates are applied atomically: start a transaction (e.g., pool.begin()), run
each sqlx::query against the transaction handle instead of &self.pool,
.execute(&mut tx).await for each, capture their rows_affected values, and then
commit the transaction; on any error rollback and propagate the error so partial
updates are not left visible between steps.
- Around line 445-468: log_worker_started (and similarly log_worker_completed)
duplicates the INSERT into worker_events instead of calling the existing helper
log_worker_event_with_context; replace the inline sqlx::query block in
log_worker_started with a call to self.log_worker_event_with_context(id.clone(),
channel_id.clone(), Some(agent_id.clone()), "started".to_string(),
Some(serde_json::json!({"task": task, "worker_type": worker_type}).to_string()))
(and do the analogous replacement in log_worker_completed), ensuring you
construct the payload_json the same way and pass agent_id as Some(...) where
available so all worker_events INSERTs go through the single helper.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
- `Cargo.lock` is excluded by `!**/*.lock`
- `Cargo.toml` is excluded by `!**/*.toml`
📒 Files selected for processing (2)
- src/agent/channel.rs
- src/conversation/history.rs
Actionable comments posted: 3
♻️ Duplicate comments (3)
src/conversation/history.rs (1)
1186-1278: ⚠️ Potential issue | 🟠 Major

Receipts stranded in `sending` if field decoding fails after claim; also applies to `claim_due_worker_terminal_receipts_any`.

After setting `status = 'sending'` (lines 1186-1195), any field decode failure causes a `continue` that skips the push without reverting the row to `pending`. The receipt is stuck in `sending` for the current process lifetime and only re-queued by `close_orphaned_runs` on restart.

Apply the revert-on-decode-failure pattern suggested in the prior review: add a best-effort `UPDATE … SET status = 'pending' WHERE id = ? AND status = 'sending'` before each `continue` that follows a successful claim.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 1186 - 1278, after successfully setting status = 'sending' (the `updated` check after the `sqlx::query(... UPDATE worker_delivery_receipts ...)` that binds `receipt_id`), add a best-effort rollback before every `continue` that follows a failed `row.try_get(...)`: execute an `UPDATE worker_delivery_receipts SET status = 'pending' WHERE id = ? AND status = 'sending'` (using the same `&mut *tx` and `receipt_id`), ignore or log any errors, then continue. Apply the same pattern in the analogous function `claim_due_worker_terminal_receipts_any` and around all `row.try_get` branches that currently `continue` after a successful claim, so receipts aren't stranded in `sending`.

src/agent/channel.rs (2)
186-194: ⚠️ Potential issue | 🟡 Minor

Return a non-success outcome when cancellation didn't actually happen.

If `handle.await` returns `Ok(())`, the worker finished before cancellation took effect, but this still returns `Ok(())`. Upstream callers can then report a successful cancel even though no cancel occurred.

Suggested fix

```diff
 Ok(()) => {
     tracing::debug!(
         worker_id = %worker_id,
         channel_id = %self.channel_id,
         "worker finished before cancellation took effect"
     );
+    return Err(format!("Worker {worker_id} already completed"));
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 186 - 194, When awaiting the worker handle (handle.await) you currently log when it returns Ok(()) (worker finished before cancellation) but then still return Ok(()) to callers; change that code path so it returns a non-success outcome instead (e.g., return an Err or a CancelOutcome::NotCancelled) so upstream code can detect that cancellation did not occur. Specifically, inside the cancellation routine where you check handle.await (the branch that logs worker_id and self.channel_id), replace the final Ok(()) return with an appropriate error/result variant indicating "cancel failed/already finished" and keep the tracing::debug log.
1485-1490: ⚠️ Potential issue | 🟡 Minor

Don't swallow outbound status send failures.

The `Thinking` and `StopTyping` status sends still use `let _ = ...`, which hides delivery failures.

Suggested fix

```diff
-    let _ = self
+    if let Err(error) = self
         .response_tx
         .send(OutboundEnvelope::from(OutboundResponse::Status(
             crate::StatusUpdate::Thinking,
         )))
-        .await;
+        .await
+    {
+        tracing::debug!(%error, channel_id = %self.id, "failed to send thinking status");
+    }
     ...
-    let _ = self
+    if let Err(error) = self
         .response_tx
         .send(OutboundEnvelope::from(OutboundResponse::Status(
             crate::StatusUpdate::StopTyping,
         )))
-        .await;
+        .await
+    {
+        tracing::debug!(%error, channel_id = %self.id, "failed to send stop-typing status");
+    }
```

As per coding guidelines: "Don't silently discard errors; no `let _ =` on Results. Handle, log, or propagate errors. Only exception is `.ok()` on channel sends where the receiver may be dropped."
Also applies to: 1730-1735
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 1485 - 1490, The code silences send errors by using "let _ = self.response_tx.send(...).await" for status messages; replace those with proper error handling for the OutboundResponse::Status sends (for Thinking and StopTyping) by awaiting the send, matching the Result, and either logging the error or returning/propagating it instead of discarding it. Locate the send calls using self.response_tx.send(OutboundEnvelope::from(OutboundResponse::Status(crate::StatusUpdate::Thinking))) and the analogous StopTyping send (also the similar block around the 1730-1735 region) and change them to handle Err(e) (e.g., process_logger.error or propagate the error) rather than using let _ =.
🧹 Nitpick comments (4)
docs/content/docs/(configuration)/config.mdx (1)
87-91: Consider adding inline comments for consistency with other config sections.

Every other `[defaults.*]` block in this TOML example has inline comments describing each key (e.g., lines 83-85 for compaction, lines 95-98 for cortex). The new `worker_contract` block is the only one without them.

📝 Suggested inline comments

```diff
 # Deterministic worker task contract timing.
 [defaults.worker_contract]
-ack_secs = 5
-progress_secs = 45
-tick_secs = 2
+ack_secs = 5       # deadline to confirm worker start was surfaced
+progress_secs = 45 # deadline between meaningful progress updates
+tick_secs = 2      # poll interval for contract deadline checks
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/content/docs/`(configuration)/config.mdx around lines 87 - 91, Add inline comments to the [defaults.worker_contract] TOML block to match the style of other sections: annotate ack_secs, progress_secs, and tick_secs with brief descriptions of their purpose (e.g., ack timeout, progress heartbeat interval, and scheduling tick interval) and example units/values consistent with nearby blocks so the new block follows the same documentation format as the other [defaults.*] entries.docs/content/docs/(features)/workers.mdx (1)
153-163: Worker event journal section is concise and complete.

Documents the event types, append-only semantics, and exposure via API/`worker_inspect`. One small nit: the bullet list mixes backtick-formatted event names with plain-text descriptions using `+` separators. This reads fine, but consider whether `tool_started`/`tool_completed` should be two separate bullet items for consistency with the other entries.

📝 Minor formatting suggestion

```diff
 - `started` with task + worker type
 - `status` checkpoints
-- `tool_started` / `tool_completed`
-- `permission` / `question`
+- `tool_started`
+- `tool_completed`
+- `permission`
+- `question`
 - `completed` with terminal summary
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/content/docs/`(features)/workers.mdx around lines 153 - 163, Split the combined "tool_started / tool_completed" line into two separate bullets for consistency with other event entries: list `tool_started` and `tool_completed` as distinct items in the Worker Event Journal section (which documents append-only `worker_events` and is surfaced via the workers API / `worker_inspect`), keeping the surrounding bullets (`started`, `status`, `permission`/`question`, `completed`) unchanged and preserving backtick formatting for the event names.src/messaging/discord.rs (1)
431-481: Extract repeated worker progress update flow into a helper.

`WorkerStarted`, `WorkerCheckpoint`, and `WorkerCompleted` repeat the same stop-typing / upsert / log / surfaced logic. Consolidating this will reduce drift risk and make future contract changes safer.

♻️ Proposed refactor

```diff
+    async fn update_worker_progress(
+        &self,
+        message: &InboundMessage,
+        worker_id: crate::WorkerId,
+        text: String,
+        clear_after: bool,
+    ) -> bool {
+        self.stop_typing(message).await;
+        match self.upsert_progress_message(message, worker_id, &text).await {
+            Ok(()) => {
+                if clear_after {
+                    self.clear_progress_message(message, worker_id).await;
+                }
+                true
+            }
+            Err(error) => {
+                tracing::debug!(%error, "failed to update discord progress message");
+                false
+            }
+        }
+    }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/discord.rs` around lines 431 - 481, Create a helper that centralizes the repeated stop-typing / upsert / log / optional-clear flow used by StatusUpdate::WorkerStarted, StatusUpdate::WorkerCheckpoint and StatusUpdate::WorkerCompleted: implement a method (e.g., handle_worker_progress) that accepts the message, worker_id, text, and a clear_on_success boolean, calls self.stop_typing(message).await, calls self.upsert_progress_message(message, worker_id, &text).await, logs the error with tracing::debug!(%error, ...) on Err and returns false, and on Ok optionally calls self.clear_progress_message(message, worker_id).await when clear_on_success is true and then returns true; then replace the three match arms to build the text using short_worker_id(...) and call this helper with clear_on_success = true only for WorkerCompleted and false for the others.src/conversation/history.rs (1)
1959-1973: `list_worker_events`: non-nullable fields use `unwrap_or_default()`, silently returning corrupt rows.

`id`, `worker_id`, and `event_type` are non-nullable primary/required fields. A decode error silently produces empty-string values, emitting malformed `WorkerEventRow`s to callers. The claim paths in the same file use `match row.try_get(...) { Err(_) => continue }` for the same class of error.

♻️ Proposed refactor

```diff
-        let mut events = rows
-            .into_iter()
-            .map(|row| WorkerEventRow {
-                id: row.try_get("id").unwrap_or_default(),
-                worker_id: row.try_get("worker_id").unwrap_or_default(),
-                channel_id: row.try_get("channel_id").ok(),
-                agent_id: row.try_get("agent_id").ok(),
-                event_type: row.try_get("event_type").unwrap_or_default(),
-                payload_json: row.try_get("payload_json").ok(),
-                created_at: row
-                    .try_get::<chrono::DateTime<chrono::Utc>, _>("created_at")
-                    .map(|t| t.to_rfc3339())
-                    .unwrap_or_default(),
-            })
-            .collect::<Vec<_>>();
+        let mut events = Vec::with_capacity(rows.len());
+        for row in rows {
+            let id: String = match row.try_get("id") {
+                Ok(value) => value,
+                Err(error) => {
+                    tracing::warn!(%error, worker_id = %worker_id, "skipping malformed worker_event row (id)");
+                    continue;
+                }
+            };
+            let event_worker_id: String = match row.try_get("worker_id") {
+                Ok(value) => value,
+                Err(error) => {
+                    tracing::warn!(%error, worker_id = %worker_id, "skipping malformed worker_event row (worker_id)");
+                    continue;
+                }
+            };
+            let event_type: String = match row.try_get("event_type") {
+                Ok(value) => value,
+                Err(error) => {
+                    tracing::warn!(%error, worker_id = %worker_id, "skipping malformed worker_event row (event_type)");
+                    continue;
+                }
+            };
+            events.push(WorkerEventRow {
+                id,
+                worker_id: event_worker_id,
+                channel_id: row.try_get("channel_id").ok(),
+                agent_id: row.try_get("agent_id").ok(),
+                event_type,
+                payload_json: row.try_get("payload_json").ok(),
+                created_at: row
+                    .try_get::<chrono::DateTime<chrono::Utc>, _>("created_at")
+                    .map(|t| t.to_rfc3339())
+                    .unwrap_or_default(),
+            });
+        }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/conversation/history.rs` around lines 1959 - 1973, In list_worker_events the mapping builds WorkerEventRow using unwrap_or_default for non-nullable fields (id, worker_id, event_type), which can silently produce corrupt rows; change the map to explicitly handle try_get errors for those fields (e.g., use match or filter_map) and skip any rows where extracting id/worker_id/event_type fails (mirror the claim-paths logic that does Err(_) => continue) so only fully-decoded WorkerEventRow values are collected; keep optional fields using .ok() and preserve created_at conversion behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/conversation/history.rs`:
- Around line 516-551: log_worker_completed currently logs a warning on the
failing UPDATE of worker_runs but still proceeds to insert a 'completed' row
into worker_events; modify log_worker_completed so that after the
sqlx::query(...) that attempts to UPDATE worker_runs (the one that currently
only triggers tracing::warn!(%error, worker_id = %id, "failed to persist worker
completion")), you return early on Err(error) (matching log_worker_started
behavior) instead of continuing to build payload_json/event_id and executing the
INSERT into worker_events; ensure you still log the error before returning and
reference the same variables (id, pool) so no further INSERTs (the sqlx::query
that binds event_id, id, payload_json and executes) run when the UPDATE fails.
In `@src/tools/browser.rs`:
- Around line 959-964: The code currently logs errors from
Self::with_action_timeout("browser close", async { browser.close().await }) but
still proceeds to return "Browser closed", which misreports failures; modify the
block that handles state.browser.take() so that if close_result is Err(error)
you return or propagate an Err (instead of continuing) — for example, after
logging the error from browser.close() return an Err with that error (or convert
it into the method's error type) so callers see the failure; update references
in this function that now rely on a successful close (the state.browser.take()
handling and the surrounding return value) to handle the early error return.
---
Duplicate comments:
In `@src/agent/channel.rs`:
- Around line 186-194: When awaiting the worker handle (handle.await) you
currently log when it returns Ok(()) (worker finished before cancellation) but
then still return Ok(()) to callers; change that code path so it returns a
non-success outcome instead (e.g., return an Err or a
CancelOutcome::NotCancelled) so upstream code can detect that cancellation did
not occur. Specifically, inside the cancellation routine where you check
handle.await (the branch that logs worker_id and self.channel_id), replace the
final Ok(()) return with an appropriate error/result variant indicating "cancel
failed/already finished" and keep the tracing::debug log.
- Around line 1485-1490: The code silences send errors by using "let _ =
self.response_tx.send(...).await" for status messages; replace those with proper
error handling for the OutboundResponse::Status sends (for Thinking and
StopTyping) by awaiting the send, matching the Result, and either logging the
error or returning/propagating it instead of discarding it. Locate the send
calls using
self.response_tx.send(OutboundEnvelope::from(OutboundResponse::Status(crate::StatusUpdate::Thinking)))
and the analogous StopTyping send (also the similar block around the 1730-1735
region) and change them to handle Err(e) (e.g., process_logger.error or
propagate the error) rather than using let _ =.
In `@src/conversation/history.rs`:
- Around line 1186-1278: After successfully setting status = 'sending' (the
`updated` check after the `sqlx::query(... UPDATE worker_delivery_receipts ...)`
that binds `receipt_id`), add a best-effort rollback before every `continue`
that follows a failed `row.try_get(...)`: execute an `UPDATE
worker_delivery_receipts SET status = 'pending' WHERE id = ? AND status =
'sending'` (using the same `&mut *tx` and `receipt_id`) and ignore/log any
errors, then continue; apply this same pattern in the analogous function
`claim_due_worker_terminal_receipts_any` and around all `row.try_get` branches
that currently `continue` after a successful claim so receipts aren't stranded
in `sending`.
---
Nitpick comments:
In `@docs/content/docs/`(configuration)/config.mdx:
- Around line 87-91: Add inline comments to the [defaults.worker_contract] TOML
block to match the style of other sections: annotate ack_secs, progress_secs,
and tick_secs with brief descriptions of their purpose (e.g., ack timeout,
progress heartbeat interval, and scheduling tick interval) and example
units/values consistent with nearby blocks so the new block follows the same
documentation format as the other [defaults.*] entries.
In `@docs/content/docs/`(features)/workers.mdx:
- Around line 153-163: Split the combined "tool_started / tool_completed" line
into two separate bullets for consistency with other event entries: list
`tool_started` and `tool_completed` as distinct items in the Worker Event
Journal section (which documents append-only `worker_events` and is surfaced via
the workers API / `worker_inspect`), keeping the surrounding bullets (`started`,
`status`, `permission`/`question`, `completed`) unchanged and preserving
backtick formatting for the event names.
In `@src/conversation/history.rs`:
- Around line 1959-1973: In list_worker_events the mapping builds WorkerEventRow
using unwrap_or_default for non-nullable fields (id, worker_id, event_type),
which can silently produce corrupt rows; change the map to explicitly handle
try_get errors for those fields (e.g., use match or filter_map) and skip any
rows where extracting id/worker_id/event_type fails (mirror the claim-paths
logic that does Err(_) => continue) so only fully-decoded WorkerEventRow values
are collected; keep optional fields using .ok() and preserve created_at
conversion behavior.
In `@src/messaging/discord.rs`:
- Around line 431-481: Create a helper that centralizes the repeated stop-typing
/ upsert / log / optional-clear flow used by StatusUpdate::WorkerStarted,
StatusUpdate::WorkerCheckpoint and StatusUpdate::WorkerCompleted: implement a
method (e.g., handle_worker_progress) that accepts the message, worker_id, text,
and a clear_on_success boolean, calls self.stop_typing(message).await, calls
self.upsert_progress_message(message, worker_id, &text).await, logs the error
with tracing::debug!(%error, ...) on Err and returns false, and on Ok optionally
calls self.clear_progress_message(message, worker_id).await when
clear_on_success is true and then returns true; then replace the three match
arms to build the text using short_worker_id(...) and call this helper with
clear_on_success = true only for WorkerCompleted and false for the others.
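One way to sketch that helper in isolation, using synchronous stand-ins for the adapter's async methods (all names besides those quoted from the prompt are hypothetical):

```rust
/// Minimal stand-in for the Discord adapter (the real methods are async).
struct Adapter;

impl Adapter {
    fn stop_typing(&self) {}

    fn upsert_progress_message(&self, _worker_id: &str, text: &str) -> Result<(), String> {
        // Pretend an empty message fails to surface, to exercise the error path.
        if text.is_empty() { Err("empty progress text".to_string()) } else { Ok(()) }
    }

    fn clear_progress_message(&self, _worker_id: &str) {}

    /// Shared flow for WorkerStarted / WorkerCheckpoint / WorkerCompleted:
    /// stop typing, upsert the progress message, bail out on error, and
    /// clear the message only when the update is terminal.
    fn handle_worker_progress(&self, worker_id: &str, text: &str, clear_on_success: bool) -> bool {
        self.stop_typing();
        match self.upsert_progress_message(worker_id, text) {
            Err(_error) => {
                // Real code would log: tracing::debug!(%error, ...)
                false
            }
            Ok(()) => {
                if clear_on_success {
                    self.clear_progress_message(worker_id);
                }
                true
            }
        }
    }
}
```

The three match arms would then just build the text with `short_worker_id(...)` and call the helper, passing `clear_on_success = true` only for `WorkerCompleted`.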
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- `docs/content/docs/(configuration)/config.mdx`
- `docs/content/docs/(features)/workers.mdx`
- `src/agent/channel.rs`
- `src/conversation/history.rs`
- `src/db.rs`
- `src/messaging/discord.rs`
- `src/tools/browser.rs`
- `src/tools/cancel.rs`
🚧 Files skipped from review as they are similar to previous changes (1)
- src/db.rs
@@ -1649,7 +2062,7 @@ impl Channel {
        self.state.worker_handles.write().await.remove(worker_id);
[🟡 Medium]
The WorkerComplete handler now gates both history injection and retrigger on !is_worker_terminal_failure(result). Previously, all completions (including failures) were added to the channel conversation history and triggered the LLM retrigger. Now, failed/timed_out/cancelled workers rely solely on the durable receipt system for user notification. This means the LLM never learns about the failure in its conversation context and cannot self-correct (e.g., "the worker failed, let me try a different approach"). If a user asks about the failure, the LLM has no context until it explicitly uses worker_inspect. The receipt delivers a terse message to the user, but the loss of retrigger means the channel LLM won't proactively acknowledge the failure.
```rust
// src/agent/channel.rs
if *notify && !is_worker_terminal_failure(result) {
    let mut history = self.state.history.write().await;
    let worker_message = format!("[Worker {worker_id} completed]: {result}");
    history.push(rig::message::Message::from(worker_message));
```

Consider at minimum injecting the failure into history (even without retriggering) so the LLM has context when the user follows up, or add a separate lightweight retrigger for failures.
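That suggestion can be sketched as a free-standing fragment (hypothetical signature; the real handler mutates `self.state.history` behind an async lock):

```rust
/// Record every completion (including failures) in history, but only ask the
/// caller to retrigger the channel LLM for successful completions.
fn on_worker_complete(
    history: &mut Vec<String>,
    worker_id: &str,
    result: &str,
    notify: bool,
    is_failure: bool,
) -> bool {
    if notify {
        // Failures still land in history so the LLM has context on follow-up.
        let label = if is_failure { "failed" } else { "completed" };
        history.push(format!("[Worker {worker_id} {label}]: {result}"));
    }
    // Whether to retrigger the channel LLM.
    notify && !is_failure
}
```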
I think this conflicts with the architecture we just merged in #192. In #192, we made
This PR adds a second durable history store (`worker_events`).
That creates overlap with #192 in a few places:
I am onboard with the delivery-contract / delivery-outcome hardening in this PR. Request: either
Good call. I started this PR before the other merged in; I'll step back and create a new one that complements the new implementation. |
Summary
This PR hardens long-running worker UX guarantees so users always get deterministic feedback, and adds durable worker event history for post-run debugging.
Why this matters
Spacebot can feel "hung" when worker status updates are accepted by an adapter but not actually surfaced to the user. This PR makes surfacing semantics explicit and ties them directly to worker task-contract acknowledgement and retry behavior.
It also adds a durable worker event timeline so we can debug worker runs after in-memory status is gone.
Changes
- `DeliveryOutcome` (`Surfaced`/`NotSurfaced`) for messaging status delivery.
- `worker_events` table + indexes.
- Journaled event types: `started`, `status`, `tool_started`/`tool_completed`, `permission`/`question`, `completed`.
- `ProcessRunLogger::list_worker_events(...)` feeding `worker_inspect` tool output.
- Migration renamed: `20260224000001_worker_tool_calls.sql` -> `20260224000003_worker_tool_calls.sql`.
- New `[defaults.worker_contract]` settings.
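A minimal sketch of the outcome type listed above (names taken from this PR's summary; the exact shape in the codebase may differ):

```rust
/// Whether a status update was actually shown to the user, not merely
/// accepted by the messaging adapter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeliveryOutcome {
    Surfaced,
    NotSurfaced,
}

/// Delivery contracts only acknowledge surfaced updates; everything else
/// falls back to the retry path instead of a false-positive ack.
fn should_retry(outcome: DeliveryOutcome) -> bool {
    outcome == DeliveryOutcome::NotSurfaced
}
```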
Build/lint
- `cargo fmt --all -- --check` ✅
- `cargo check --all-targets` ✅
- `cargo clippy --all-targets` ✅ (warnings only)
- `cargo test db::tests::migration_versions_are_unique --lib` ✅
- `cargo test conversation::history::tests --lib -- --test-threads=1` ✅
- `cargo test worker_ack_checkpoint_is_deterministic --lib` ✅
- `cargo test worker_progress_sla_nudge_is_deterministic --lib` ✅
- `cargo test worker_task_timeout_emits_terminal_events --lib` ✅
- `cargo test worker_timeout_resets_on_progress_events --lib` ✅
- `cargo test --lib -- --test-threads=1` — the only failures are in tests that need a local model file (`onnx/model.onnx`) in `memory::search::*` tests.
- Schema change is additive (`worker_events` table + indexes).
- Adapters that previously reported success now return `NotSurfaced` when an update isn't actually surfaced, enabling deterministic fallback/retry behavior instead of a false-positive ack.

Note
This PR introduces typed delivery outcomes for worker status messages and adds a durable event journal for debugging worker lifecycles. The changes make worker feedback deterministic by distinguishing between accepted and actually surfaced status updates. Schema additions are backward-compatible, and all messaging adapters are updated to support the new outcome semantics.
Written by Tembo for commit d2b612b.