fix: v0.6.0 correctness bugs (#84, #82, #95) #106
Two correctness bugs in crash recovery:

1. `recoverQueuedTasks()` blindly re-enqueued tasks without checking `DependencyRepository.isBlocked()`. Blocked pipeline tasks would execute out of order after restart. Now checks dependencies and skips blocked tasks (fail-safe: skips on DB error too).
2. `recoverRunningTasks()` and `cleanDeadWorkerRegistrations()` marked crashed tasks as FAILED but never emitted `TaskFailed` events, so `DependencyHandler` never resolved dependencies, leaving downstream pipeline tasks blocked forever. Now emits `TaskFailed` after each successful status update.
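A minimal sketch of the Phase 2 check described above. The shapes here (`Task`, `BoolResult`, the `isBlocked` and `enqueue` callbacks) are illustrative stand-ins, not the real repository signatures:

```typescript
type Task = { id: string };
type BoolResult = { ok: true; value: boolean } | { ok: false; error: string };

// Re-enqueue recovered QUEUED tasks, skipping any that are still blocked.
// Fail-safe per the original fix: a dependency-check error also counts
// as blocked (a later commit in this PR inverts that decision).
async function recoverQueuedTasks(
  tasks: readonly Task[],
  isBlocked: (taskId: string) => Promise<BoolResult>,
  enqueue: (task: Task) => void,
): Promise<{ queuedCount: number; blockedCount: number }> {
  let queuedCount = 0;
  let blockedCount = 0;
  for (const task of tasks) {
    const blocked = await isBlocked(task.id);
    if (!blocked.ok || blocked.value) {
      blockedCount++; // skipped; TaskUnblocked re-queues it later
      continue;
    }
    enqueue(task);
    queuedCount++;
  }
  return { queuedCount, blockedCount };
}
```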
`getExecutionHistory(scheduleId, 1)` only returned the latest execution. Older overlapping CRON runs with in-flight tasks were missed. Now fetches all executions, filters to `'triggered'` status (active runs), and cancels tasks across all of them. Completed/failed/missed/skipped executions are skipped to avoid unnecessary no-op cancellation events.
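A sketch of the widened cancellation scope. `ScheduleExecution` and its fields are assumed shapes for illustration, not the repository's actual types:

```typescript
type ScheduleExecution = {
  id: string;
  status: 'triggered' | 'completed' | 'failed' | 'missed' | 'skipped';
  taskIds: readonly string[];
};

// Collect task ids from every still-active ('triggered') execution;
// terminal executions are skipped to avoid no-op cancellation events.
function tasksToCancel(executions: readonly ScheduleExecution[]): string[] {
  return executions
    .filter((e) => e.status === 'triggered')
    .flatMap((e) => [...e.taskIds]);
}
```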
After `stdout.slice(-tail)`, `totalSize` still reflected the full pre-slice size. Affected two code paths:

1. `TaskManager.getLogs()` DB fallback — now recalculates from sliced arrays
2. `BufferedOutputCapture.getOutput()` — recalculates when tail was applied, preserves byte-based `buffer.totalSize` for the non-tail case
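A simplified sketch of the second path. The function shape is hypothetical, and `linesSize` uses character length as the original helper did (a later commit in this PR switches it to UTF-8 bytes):

```typescript
function linesSize(lines: readonly string[]): number {
  return lines.reduce((sum, line) => sum + line.length, 0);
}

function getOutput(
  stdout: readonly string[],
  bufferTotalSize: number, // byte-based running total kept by capture()
  tail?: number,
): { stdout: readonly string[]; totalSize: number } {
  const wasTailSliced = tail !== undefined && tail > 0;
  const lines = wasTailSliced ? stdout.slice(-(tail as number)) : stdout;
  // The bug: totalSize kept the pre-slice value. The fix: recalculate
  // from the sliced lines, preserving the buffer total when no tail.
  const totalSize = wasTailSliced ? linesSize(lines) : bufferTotalSize;
  return { stdout: lines, totalSize };
}
```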
- Check `TaskFailed` emit result in RecoveryManager (2 sites) and log on failure instead of silently swallowing errors
- Extract `linesSize` helper to avoid inline reduce duplication
- Use top-level `TaskId` import instead of inline `import()` in schedule-manager
- Use `flatMap` for cleaner active execution task collection
- Format multi-line constructor in integration test
Confidence Score: 4/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant B as Bootstrap
    participant RM as RecoveryManager
    participant DR as DependencyRepo
    participant TR as TaskRepository
    participant Q as TaskQueue
    participant EB as EventBus
    participant DH as DependencyHandler
    B->>RM: recover()
    note over RM: Phase 0 — Dead Worker Cleanup
    RM->>TR: findById(reg.taskId)
    RM->>TR: update(taskId, FAILED)
    RM->>EB: emit('TaskFailed') [NEW]
    EB-->>DH: handleTaskFailed → resolveDependency
    note over RM: Phase 2 — Recover QUEUED tasks
    RM->>TR: findByStatus(QUEUED)
    loop each queued task
        RM->>Q: contains(task.id)?
        RM->>DR: isBlocked(task.id) [NEW]
        alt blocked
            RM-->>RM: skip (blockedCount++)
        else not blocked / error
            RM->>Q: enqueue(task)
            RM->>EB: emit('TaskQueued')
        end
    end
    note over RM: Phase 3 — Recover RUNNING tasks
    RM->>TR: findByStatus(RUNNING)
    loop each running task
        RM->>TR: findById (TOCTOU guard)
        RM->>TR: update(taskId, FAILED)
        RM->>EB: emit('TaskFailed') [NEW]
        EB-->>DH: handleTaskFailed → resolves deps for QUEUED tasks skipped in Phase 2
    end
```
Last reviewed commit: "fix: invert isBlocke..."
```typescript
const historyResult = await this.scheduleRepository.getExecutionHistory(scheduleId);
if (historyResult.ok) {
  const activeExecutions = historyResult.value.filter((e) => e.status === 'triggered');
```
Active executions silently capped by `DEFAULT_LIMIT = 100`
getExecutionHistory(scheduleId) is called here without an explicit limit, so the underlying SQLite query falls back to DEFAULT_LIMIT = 100 (ordered scheduled_for DESC). For a high-frequency schedule (e.g., every minute) that has accumulated more than 100 historical records, any "triggered" executions beyond the 100 most recent will never be returned — and therefore their tasks will never be cancelled.
The prior code fetched only the single latest execution, so this PR widens the window but leaves the tail open. A safer approach would be a purpose-built query (e.g., getActiveExecutions) that filters by status = 'triggered' at the database level with no limit, or at minimum pass a sufficiently large explicit limit:
```typescript
const historyResult = await this.scheduleRepository.getExecutionHistory(scheduleId);
```

Consider filtering at the SQL layer instead:

```typescript
// e.g. new repository method:
// getActiveExecutions(scheduleId): Promise<Result<readonly ScheduleExecution[]>>
// SELECT * FROM schedule_executions WHERE schedule_id = ? AND status = 'triggered'
```

This avoids loading and JS-filtering up to 100 rows just to find the subset that are still active.
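To see the cap concretely, here is a small self-contained simulation. `DEFAULT_LIMIT` and the DESC ordering mirror the review's description of the SQLite query; everything else is illustrative:

```typescript
const DEFAULT_LIMIT = 100;

type Execution = { scheduledFor: number; status: 'triggered' | 'completed' };

// Mimics: ORDER BY scheduled_for DESC LIMIT ?
function getExecutionHistory(
  all: readonly Execution[],
  limit = DEFAULT_LIMIT,
): Execution[] {
  return [...all].sort((a, b) => b.scheduledFor - a.scheduledFor).slice(0, limit);
}

// 150 executions; only the oldest one is still 'triggered'.
const history: Execution[] = Array.from({ length: 150 }, (_, i): Execution => ({
  scheduledFor: i,
  status: i === 0 ? 'triggered' : 'completed',
}));

const active = getExecutionHistory(history).filter((e) => e.status === 'triggered');
console.log(active.length); // 0: the active run fell outside the newest 100 rows
```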
```typescript
import { err, ok, Result } from '../core/result.js';

/** Sum the character lengths of all lines in an array */
function linesSize(lines: readonly string[]): number {
```
BLOCKING: Duplicated linesSize utility function
The identical linesSize() helper is now defined in two separate files: here at line 13 AND in src/services/task-manager.ts:33. All 8 code reviews flagged this as a DRY violation and maintenance risk.
Impact: If the calculation logic ever needs to change (e.g., to account for newlines or switch to byte-length measurement), both sites must be updated in sync. It's easy to miss one and introduce divergence.
Fix: Extract to a shared utility file (src/utils/output.ts) and import from both:
```typescript
// src/utils/output.ts
/** Sum the character lengths of all lines in an array */
export function linesSize(lines: readonly string[]): number {
  return lines.reduce((sum, line) => sum + line.length, 0);
}
```

Then remove both duplicate definitions and import in their place.
8 reviewers flagged this issue across architecture, complexity, consistency, performance, regression, security, and TypeScript reviews
```typescript
const frozenStdout = Object.freeze([...stdout]);
const frozenStderr = Object.freeze([...stderr]);
const wasTailSliced = tail !== undefined && tail > 0;
const totalSize = wasTailSliced
```
BLOCKING: totalSize uses character length but capture() uses byte length
The recalculated totalSize here uses linesSize() which sums line.length (character count), but the capture() method at line 51 accumulates totalSize using Buffer.byteLength(data, 'utf8') (byte count).
The Problem: For ASCII-only output these are identical, but for multi-byte characters (emoji, CJK, accented chars), the values diverge. When tail-slicing is applied, totalSize reflects character length. When NOT applied (non-tail code path at line 120), it reflects byte length. This means the same field returns different units depending on code path — a latent correctness bug.
Impact (6 reviewers flagged this): Consumers relying on totalSize comparisons (e.g., process-connector.ts:143) may behave differently depending on which path was taken. For Claude Code output (mostly ASCII) this is unlikely to cause bugs today, but it's a semantic inconsistency introduced by this fix.
Fix: Choose ONE unit consistently. Either:

- Use `Buffer.byteLength` in `linesSize` to match `capture()`, OR
- Use `string.length` in `capture()` to match `linesSize()`
Given the maxBufferSize limit uses byte-length, byte-length is likely correct:
```typescript
function linesSize(lines: readonly string[]): number {
  return lines.reduce((sum, line) => sum + Buffer.byteLength(line, 'utf8'), 0);
}
```

Architecture, consistency, regression, security, and TypeScript reviews flagged this.
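The divergence the reviewers describe is easy to demonstrate in Node (`Buffer` is the Node global; the byte counts below follow from UTF-8 encoding, where each CJK character takes 3 bytes):

```typescript
// Character count equals UTF-8 byte count only for ASCII;
// multi-byte characters inflate the byte count.
const ascii = 'hello';
const cjk = '日本語'; // 3 characters, 3 bytes each in UTF-8

console.log(ascii.length, Buffer.byteLength(ascii, 'utf8')); // 5 5
console.log(cjk.length, Buffer.byteLength(cjk, 'utf8'));     // 3 9
```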
```typescript
error: expect.objectContaining({ message: 'Worker process died (dead PID detected)' }),
  exitCode: -1,
});
});
```
HIGH CONFIDENCE: Missing error path tests for TaskFailed emission failures
The tests above verify successful TaskFailed emission (happy paths), but the production code in recovery-manager.ts has error-handling branches (lines 129-132 and 271-274) that are NOT tested.
The Gap: When eventBus.emit('TaskFailed', ...) returns an error, the code logs it and continues recovery. But there are no tests verifying:
- The error message is logged correctly
- Recovery doesn't abort when event emission fails
Impact: If the emit-failure handler is accidentally removed or broken, no test would catch it. Given this is error-path logic (recovery during system failure), it deserves explicit coverage.
Missing Tests:
```typescript
// For crashed task path
it('should log error but continue when TaskFailed emit fails for crashed task', async () => {
  // Setup: taskFailed emit returns error
  eventBus.emit.mockResolvedValueOnce(err(emitError));
  const result = await manager.recover();
  expect(result.ok).toBe(true); // Recovery continues
  expect(logger.error).toHaveBeenCalledWith('Failed to emit TaskFailed event', ...);
});

// For dead worker path
it('should log error but continue when TaskFailed emit fails for dead worker', async () => {
  // Similar pattern
});
```

Tests review identified this gap: 2 error branches, 0 tests covering them.
```typescript
});

// Emit TaskFailed so DependencyHandler resolves deps for downstream tasks
const failedEmitResult = await this.eventBus.emit('TaskFailed', {
```
Summary: 3 Inline Comments Posted + Low-Confidence Findings
Comments Created
Inline comments (≥80% reviewer confidence, blocking):
- Duplicated `linesSize` function (line 13, output-capture.ts) — 8 reviewers flagged
- `totalSize` byte-vs-character inconsistency (line 118, output-capture.ts) — 6 reviewers flagged
- Missing TaskFailed error path tests (line 617, recovery-manager.test.ts) — Tests reviewer flagged
Lower-Confidence Findings (60-79% confidence)
These were flagged by 1-2 reviewers only; consolidated into summary rather than individual inline comments:
Should Fix (Medium confidence)
- `TestOutputCapture` uses a different `totalSize` calculation — should align with `BufferedOutputCapture` using the shared `linesSize` helper
- `cancelSchedule` unbounded execution fetch (schedule-manager.ts:183) — now defaults to 100 rows instead of 1 (the bug fix), but could add an SQL-level filter to avoid the fetch-then-filter pattern
- RecoveryManager constructor parameter count (6 params) — approaching threshold; consider a configuration object if another param is added
- Complexity: `cleanDeadWorkerRegistrations` nesting depth — extracting an `emitTaskFailed` helper would reduce this
- Performance: sequential N dependency checks in recovery loop — current scale acceptable (N typically single digits); can batch if scale grows
Pre-existing Issues (Not introduced by this PR)
- Output capture doesn't fail explicitly on error (empty catch at line 84)
- `worker-handler.ts:445` emits `TaskFailed` with `new Error()` instead of `BackbeatError`
- Tail tests lack explicit `expect(output.ok).toBe(true)` guards
- TOCTOU window in dependency check (mitigated by queue deduplication)
What the PR Does Well
✓ Fixes 4 real correctness bugs (dependency checking, multi-execution cancellation, totalSize recalc, output recovery)
✓ Proper error handling with Result types
✓ DependencyRepository injection via interface (DIP pattern)
✓ Good test coverage for fixed behaviors (unit + integration)
✓ All consumers of RecoveryManager constructor updated
Recommendation
CHANGES REQUESTED on the 3 blocking issues. Once addressed, this PR improves correctness significantly.
Generated by Claude Code review analysis — 8 comprehensive reviews spanning architecture, complexity, consistency, performance, regression, security, tests, and TypeScript
Add 2 tests verifying that TaskFailed emit failures in both recovery paths (dead worker cleanup and crashed running task) are logged but do not prevent recovery from completing. Add ARCHITECTURE EXCEPTION comments documenting the double-write pattern in both code paths.

Co-Authored-By: Claude <noreply@anthropic.com>
Validates that RecoveryManager skips re-queuing QUEUED tasks with unresolved dependencies using the real SQLiteDependencyRepository. The existing unit test covers this with mocks; this integration test exercises the full SQL path (addDependency -> isBlocked -> recovery).

Co-Authored-By: Claude <noreply@anthropic.com>
Extract linesSize from output-capture.ts and task-manager.ts into a shared linesByteSize utility in src/utils/output.ts. The new function uses Buffer.byteLength (UTF-8 bytes) instead of string.length (characters), fixing a unit inconsistency where:

- capture() tracked totalSize in bytes via Buffer.byteLength
- getOutput() tail-slicing recalculated totalSize in characters
- TestOutputCapture always used characters

Multi-byte content (emoji, CJK) now returns consistent byte values regardless of whether tail-slicing was applied.

Fixes: output-capture:51:correctness, output-capture:213:consistency
Resolves: output-capture:13:dry (deliberate duplication superseded by correctness fix requiring shared implementation)

Co-Authored-By: Claude <noreply@anthropic.com>
…tasks

When dependencyRepo.isBlocked() errors during recovery, fall through to enqueue instead of skipping. The previous skip behavior could permanently strand dependency-free tasks since TaskUnblocked would never fire for them.
```typescript
const { queuedCount, blockedCount } = await this.recoverQueuedTasks(queuedResult.value);
const failedCount = await this.recoverRunningTasks(runningResult.value);
```
QUEUED tasks blocked on RUNNING crashed tasks may be stranded
recoverQueuedTasks (Phase 2) runs before recoverRunningTasks (Phase 3). Consider a QUEUED task B that depends on a RUNNING (crashed) task A that has no dead-worker registration:
- Phase 2 — `isBlocked(B)` returns `true` → B is skipped. ✓
- Phase 3 — Task A is marked `FAILED` and `TaskFailed` is emitted.
- If `DependencyHandler` is live, it resolves the dependency and emits `TaskUnblocked`, re-queuing B via the normal event chain. ✓
- If `DependencyHandler` is not yet wired at recovery time, B is never re-queued.
Phase 0 (cleanDeadWorkerRegistrations) avoids this for tasks backed by a dead worker registration because it runs first, and the subsequent findByStatus(RUNNING) no longer includes those tasks. The gap is for RUNNING tasks whose worker row is simply missing (not detected in Phase 0). The architecture comment acknowledges handler-readiness uncertainty, but this specific interplay isn't called out.
A low-cost safety net would be a second pass over still-blocked tasks after Phase 3 completes, re-queuing any whose dependency is now resolved:
```typescript
// After recoverRunningTasks — re-check tasks that were skipped as blocked
for (const task of blockedTasks) {
  const stillBlocked = await this.dependencyRepo.isBlocked(task.id);
  if (stillBlocked.ok && !stillBlocked.value) {
    this.queue.enqueue(task);
  }
}
```

Not blocking merge, but worth tracking.
- Bump version 0.5.0 → 0.6.0
- Update release notes with all 8 PRs (was missing #85, #86, #91, #94, #100, #106, #107)
- Mark v0.6.0 as released in ROADMAP.md
- Update FEATURES.md architecture section for hybrid event model
- Expand "What's New in v0.6.0" with architectural simplification, bug fixes, tech debt
- Fix README roadmap: v0.6.1 → v0.7.0 for loops
- Update bug report template example version to 0.6.0
## Summary

- Bump version `0.5.0` → `0.6.0` (package.json + package-lock.json)
- Expand release notes with all 8 PRs (#78, #85, #86, #91, #94, #100, #106, #107) — was only covering #78
- Mark v0.6.0 as released in ROADMAP.md, update status and version timeline
- Update FEATURES.md architecture section for hybrid event model (was describing old fully event-driven architecture with removed services)
- Expand "What's New in v0.6.0" in FEATURES.md with architectural simplification, additional bug fixes, tech debt, breaking changes, migration 9
- Fix README roadmap version: `v0.6.1` → `v0.7.0` for task/pipeline loops
- Update bug report template example version `0.5.0` → `0.6.0`

### GitHub Issues

- Closed #82 (cancelTasks scope — PR #106)
- Closed #95 (totalSize tail-slicing — PR #106)
- Updated #105 release tracker checklist (all items checked)

## Test plan

- [x] `npm run build` — clean compilation
- [x] `npm run test:all` — full suite passes (822 tests, 0 failures)
- [x] `npx biome check src/ tests/` — no lint errors
- [x] `package.json` version is `0.6.0`
- [x] Release notes file exists and covers all PRs
- [ ] After merge: trigger Release workflow from GitHub Actions
- [ ] After release published: close #105

---------

Co-authored-by: Dean Sharon <deanshrn@gmain.com>
Summary
Fixes three correctness bugs discovered in v0.6.0 scheduled pipelines:
Changes
RecoveryManager Dependency Check (#84)
CancelSchedule Execution Scope (#82)
Output Tail-Slicing totalSize Fix (#95)
- Recalculate `totalSize` metadata after tail-slicing output

Testing
Related Issues
Closes #84, #82, #95