feat: SQLite worker coordination + output persistence (#89) (#94)
Conversation
Add cross-process worker coordination via a `workers` table and wire output persistence through the existing SQLiteOutputRepository.

Core changes:
- WorkerRegistration type + WorkerRepository interface (DIP pattern)
- Migration v9: workers table with FK to tasks, indexes on owner_pid/pid
- SQLiteWorkerRepository: plain INSERT (not REPLACE) for UNIQUE safety
- WorkerPool: register on spawn, unregister on kill/completion
- ResourceMonitor: DB-based global count for max-workers check (settling workers still used for resource projections only)
- RecoveryManager: PID-based crash detection replaces 30-min heuristic
- ProcessConnector: periodic 500ms flush + final flush on exit + clear
- TaskManager.getLogs(): in-memory → DB fallback for cross-process reads
- Bootstrap: wire WorkerRepository + OutputRepository as required deps

Edge cases addressed:
- Settling workers double-counting (split max-workers vs projections)
- Shutdown race (stopFlushing before kill prevents post-close DB writes)
- UNIQUE constraint on register detects cross-process task conflicts
- In-memory buffer freed after final flush (prevents memory leak)
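A rough sketch of what migration v9's DDL could look like, assuming the column names implied by the WorkerRowSchema quoted later in this review (worker_id, task_id, pid, owner_pid, agent, started_at); this is an illustration, not the project's actual migration:

```typescript
// Hypothetical shape of migration v9. The FK to tasks, the UNIQUE(task_id)
// constraint, and the owner_pid/pid indexes come from the PR description;
// exact names and types in the project may differ.
const MIGRATION_V9 = `
  CREATE TABLE IF NOT EXISTS workers (
    worker_id  TEXT PRIMARY KEY,
    task_id    TEXT NOT NULL UNIQUE REFERENCES tasks(task_id),
    pid        INTEGER NOT NULL,
    owner_pid  INTEGER NOT NULL,
    agent      TEXT NOT NULL,
    started_at INTEGER NOT NULL
  );
  CREATE INDEX IF NOT EXISTS idx_workers_owner_pid ON workers(owner_pid);
  CREATE INDEX IF NOT EXISTS idx_workers_pid ON workers(pid);
`;
```

The UNIQUE(task_id) constraint is what lets a plain INSERT in register() double as cross-process conflict detection.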
Confidence Score: 4/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant WH as WorkerHandler
    participant RM as ResourceMonitor
    participant WR as WorkerRepository (SQLite)
    participant PC as ProcessConnector
    participant OR as OutputRepository (SQLite)
    WH->>RM: canSpawnWorker()
    RM->>WR: getGlobalCount()
    WR-->>RM: count (cross-process truth)
    RM-->>WH: ok(true)
    WH->>WH: spawn process (async)
    WH->>WR: register(WorkerRegistration)
    Note over WH,WR: UNIQUE(task_id) — fails if another process owns task
    WH->>PC: connect(process, taskId, onExit)
    PC->>PC: setInterval(flushIntervalMs)
    loop every flushIntervalMs
        PC->>OR: save(taskId, buffer snapshot)
    end
    alt Process exits normally
        PC->>PC: stopFlushing(taskId)
        PC->>OR: flushOutput(taskId) [final flush]
        PC->>PC: outputCapture.clear(taskId) [.finally]
        PC->>WH: onExit(code)
        WH->>WR: unregister(workerId)
    else kill() called
        WH->>PC: prepareForKill(taskId)
        PC->>PC: stopFlushing(taskId)
        PC->>OR: flushOutput(taskId)
        WH->>WH: SIGTERM → process
        WH->>WR: unregister(workerId) [via cleanupWorkerState]
    end
    Note over WH,WR: On next startup — RecoveryManager
    WH->>WR: findAll()
    loop each WorkerRegistration
        WH->>WH: isProcessAlive(ownerPid)?
        alt dead PID
            WH->>WR: unregister(workerId)
            WH->>WH: repository.update(taskId, FAILED)
        end
    end
```
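The startup-recovery loop at the bottom of the diagram can be sketched as a small pure function; the callback names (unregister, markFailed) are stand-ins for the repository calls, not the project's exact API:

```typescript
// Sketch of Phase 0 recovery: drop registrations whose owning process is dead,
// and fail their tasks. Names beyond the diagram's are assumptions.
interface WorkerRegistration {
  workerId: string;
  taskId: string;
  ownerPid: number;
}

function recoverDeadWorkers(
  registrations: WorkerRegistration[],
  isAlive: (pid: number) => boolean,
  unregister: (workerId: string) => void,
  markFailed: (taskId: string) => void,
): string[] {
  const cleaned: string[] = [];
  for (const reg of registrations) {
    if (!isAlive(reg.ownerPid)) { // dead owner PID means the process crashed
      unregister(reg.workerId);   // remove the stale registration
      markFailed(reg.taskId);     // the task can never complete; mark FAILED
      cleaned.push(reg.workerId);
    }
  }
  return cleaned;
}
```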
Last reviewed commit: "style: fix Biome for..."
```ts
if (tail && tail > 0) {
  return ok({
    taskId: output.taskId,
    stdout: output.stdout.slice(-tail),
    stderr: output.stderr.slice(-tail),
    totalSize: output.totalSize,
  });
}
```
totalSize is not updated after tail-slicing the DB result
When the tail parameter is applied to the DB output, stdout and stderr are sliced to tail entries, but totalSize still reflects the full persisted output. Callers that use totalSize to understand the volume of the returned data (e.g. for display or deciding whether to request more) will see an inflated count relative to what was actually returned.
```ts
return ok({
  taskId: output.taskId,
  stdout: output.stdout.slice(-tail),
  stderr: output.stderr.slice(-tail),
  // totalSize should reflect the sliced content, not the full persisted output
  totalSize: output.stdout.slice(-tail).reduce((sum, l) => sum + l.length, 0)
    + output.stderr.slice(-tail).reduce((sum, l) => sum + l.length, 0),
});
```

(Or simply keep totalSize as a "full size" sentinel by documenting that behaviour explicitly — but the current code is silent about the discrepancy.)
- Remove unused err/ok imports from worker-repository.ts
- Add error checking for unregister/update in recovery Phase 0
- Extract cleanupWorkerState() to DRY kill/completion paths
- Extract pruneExpiredTimestamps() to DRY settling cleanup
```ts
/**
 * Database row type for workers table
 */
```
Pattern Violation: Missing Zod Row Validation Schema
Every other SQLite repository uses Zod to validate database rows at the system boundary (SQLiteTaskRepository has TaskRowSchema, SQLiteDependencyRepository has DependencyRowSchema, etc.). This repository uses unchecked `as` casts instead.
Impact: Data corruption or unexpected DB values will produce silent type mismatches rather than early parse errors. This breaks the "parse, don't validate" convention.
Fix: Add a WorkerRowSchema at the top of the file:
```ts
import { z } from "zod";

const WorkerRowSchema = z.object({
  worker_id: z.string().min(1),
  task_id: z.string().min(1),
  pid: z.number(),
  owner_pid: z.number(),
  agent: z.string(),
  started_at: z.number(),
});
```

Then validate in rowToRegistration():
```ts
private rowToRegistration(row: WorkerRow): WorkerRegistration {
  const data = WorkerRowSchema.parse(row);
  return {
    workerId: WorkerId(data.worker_id),
    taskId: TaskId(data.task_id),
    // ...
  };
}
```

Confidence: 95% (Critical pattern deviation from all 4 other repositories)
```ts
 * Uses plain INSERT (NOT INSERT OR REPLACE) — UNIQUE violation on task_id
 * means another process already owns this task, which is a real coordination error.
 */
register(registration: WorkerRegistration): Result<void> {
```
Pattern Violation: Missing operationErrorHandler
All other repositories use operationErrorHandler() from core/errors.js as the centralized error mapping function. This repository manually constructs BackbeatError inline in every method, which is more verbose and inconsistent.
Fix: Import and use operationErrorHandler:
```ts
import { operationErrorHandler } from '../core/errors.js';

// Example for register():
register(registration: WorkerRegistration): Result<void> {
  return tryCatch(
    () => { this.registerStmt.run({...}); },
    operationErrorHandler('register worker', { workerId: registration.workerId }),
  );
}
```

Note: The register() method has special UNIQUE constraint detection logic which may justify keeping a custom error mapper for that one method. But the other 6 methods should use operationErrorHandler.
Confidence: 90% (Critical pattern deviation from all 4 other repositories)
```ts
// Start periodic output flushing to DB (every 500ms)
const interval = setInterval(() => {
```
Hardcoded 500ms Flush Interval Not Configurable
The periodic output flush interval is hardcoded as 500 milliseconds with no Configuration override. With N concurrent workers each running for M seconds, this generates N × (M / 0.5) DB write operations. For 5 workers running 10 minutes each, that is 6,000 save operations.
Impact: Heavy SQLite write amplification under load. Each save() does a full snapshot write, not an incremental append, which is wasteful when output hasn't changed.
Fix: Make the interval configurable via Configuration:
```ts
constructor(
  outputCapture: OutputCapture,
  logger: Logger,
  outputRepository: OutputRepository,
  configuration?: Configuration, // Add this
) {
  // ... existing code ...
  const flushIntervalMs = configuration?.get('output.flushInterval') ?? 5000;
  // Then use in the interval callback
  const interval = setInterval(() => {
    this.flushOutput(taskId).catch(/* ... */);
  }, flushIntervalMs);
}
```

Consider increasing the default to 5-10 seconds (10x fewer writes) and adding a dirty flag to skip flushes when output hasn't changed.
Confidence: 95% (Blocking per performance and architecture reviews)
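The dirty-flag idea, combined with an in-flight guard against overlapping writes, could be sketched like this — a standalone illustration; in the PR the equivalent flushingInProgress Set lives inside ProcessConnector:

```typescript
// Sketch: skip a flush when nothing changed (dirty flag) or when a previous
// flush for the same task is still in flight (backpressure guard).
class FlushGuard {
  private dirty = new Set<string>();
  private inFlight = new Set<string>();

  markDirty(taskId: string): void {
    this.dirty.add(taskId);
  }

  // Returns true only if a save was actually performed.
  async maybeFlush(taskId: string, save: () => Promise<void>): Promise<boolean> {
    if (!this.dirty.has(taskId) || this.inFlight.has(taskId)) return false;
    this.inFlight.add(taskId);
    this.dirty.delete(taskId);
    try {
      await save();
      return true;
    } finally {
      this.inFlight.delete(taskId); // always release the guard
    }
  }
}
```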
src/services/recovery-manager.ts
```ts
try {
  process.kill(pid, 0);
  return true;
} catch {
```
recover() Method at 153 Lines with 4 Nesting Levels (3x Over Complexity Threshold)
This method handles three distinct recovery phases (dead worker cleanup, QUEUED task re-queue, RUNNING task PID check) in a single 153-line function with 4 levels of nesting. This exceeds the 50-line critical threshold by 3x and makes the method hard to test and understand.
Fix: Extract each phase into a named private method to preserve sequential ordering while making each phase independently readable and testable:
```ts
async recover(): Promise<Result<void>> {
  this.logger.info('Starting recovery process');
  this.cleanDeadWorkerRegistrations();                        // Phase 0
  await this.cleanupOldCompletedTasks();                      // Cleanup
  const queuedResult = await this.requeueQueuedTasks();       // Phase 1
  const failedResult = await this.failCrashedRunningTasks();  // Phase 2
  // ... summary log
  return ok(undefined);
}

private cleanDeadWorkerRegistrations(): void { /* Phase 0 logic */ }
private async cleanupOldCompletedTasks(): Promise<void> { /* cleanup */ }
private async requeueQueuedTasks(): Promise<{ count: number }> { /* Phase 1 */ }
private async failCrashedRunningTasks(): Promise<{ count: number }> { /* Phase 2 */ }
```

Confidence: 98% (Blocking per complexity review - metric violation)
```ts
  this.processConnector = new ProcessConnector(outputCapture, logger, outputRepository);
}
```

```ts
async spawn(task: Task): Promise<Result<Worker>> {
```
spawn() Method at 99 Lines with 8 Error Paths (Complexity Over Threshold)
The spawn() method now has 99 lines and handles agent resolution, resource checking, process spawning, DB registration with rollback, timeout setup, and output connection all in one method. Each cross-cutting concern adds another error path and cleanup responsibility.
Fix: Extract the DB registration + rollback into a helper, or group the post-spawn setup into a finalizeWorkerSetup method:
```ts
private finalizeWorkerSetup(
  worker: WorkerState,
  task: Task,
  childProcess: ChildProcess,
): Result<void> {
  const regResult = this.registerWorkerInDb(worker, task);
  if (!regResult.ok) {
    childProcess.kill('SIGTERM');
    this.workers.delete(worker.id);
    this.taskToWorker.delete(task.id);
    return err(regResult.error);
  }
  this.setupTimeoutForWorker(worker);
  this.processConnector.connect(childProcess, task.id, (exitCode) => {
    this.handleWorkerCompletion(task.id, exitCode ?? 0);
  });
  return ok(undefined);
}
```

Confidence: 95% (Blocking per complexity review)
package.json
```diff
- "test:repositories": "NODE_OPTIONS='--max-old-space-size=2048' vitest run tests/unit/implementations/dependency-repository.test.ts tests/unit/implementations/task-repository.test.ts tests/unit/implementations/database.test.ts tests/unit/implementations/checkpoint-repository.test.ts tests/unit/implementations/output-repository.test.ts --no-file-parallelism",
+ "test:repositories": "NODE_OPTIONS='--max-old-space-size=2048' vitest run tests/unit/implementations/dependency-repository.test.ts tests/unit/implementations/task-repository.test.ts tests/unit/implementations/database.test.ts tests/unit/implementations/checkpoint-repository.test.ts tests/unit/implementations/output-repository.test.ts tests/unit/implementations/worker-repository.test.ts --no-file-parallelism",
  "test:adapters": "NODE_OPTIONS='--max-old-space-size=2048' vitest run tests/unit/adapters --no-file-parallelism",
  "test:implementations": "NODE_OPTIONS='--max-old-space-size=2048' vitest run tests/unit/implementations --exclude='**/dependency-repository.test.ts' --exclude='**/task-repository.test.ts' --exclude='**/database.test.ts' --exclude='**/checkpoint-repository.test.ts' --exclude='**/output-repository.test.ts' --no-file-parallelism",
```
Missing Test Script Exclusion: worker-repository.test.ts
The new worker-repository.test.ts was added to the test:repositories script but NOT excluded from test:implementations. When running test:all, these tests execute twice, wasting time and memory.
Fix: Add the exclusion to the test:implementations script:
```json
"test:implementations": "NODE_OPTIONS='--max-old-space-size=2048' vitest run tests/unit/implementations --exclude='**/dependency-repository.test.ts' --exclude='**/task-repository.test.ts' --exclude='**/database.test.ts' --exclude='**/checkpoint-repository.test.ts' --exclude='**/output-repository.test.ts' --exclude='**/worker-repository.test.ts' --no-file-parallelism",
```

Confidence: 95% (Test setup regression - impacts CI/memory)
Code Review Summary: PR #94 - SQLite Worker Coordination

High-Confidence Blocking Issues (≥90%)

Inline comments have been created for 6 blocking issues:
Medium/Lower-Confidence Findings (60-79%)

These are flagged in the summary but not as blocking inline comments.

Consolidation Strategy (Duplicate Issues Across Reviewers):
Analysis by Review Discipline
Reviewer Consensus

All 11 reviewers flagged:
Recommended Merge Path

Do not merge without addressing:
Can defer to follow-up:
Inline comments: 6 created

Attribution: Claude Code review agent, compiled from architecture, complexity, consistency, database, dependencies, documentation, performance, regression, security, tests, and typescript reviews.
Inline the hasLiveWorker boolean check into the if-condition so TypeScript can narrow workerRegistration to non-null within the block, eliminating the need for the ! assertion operator.

Co-Authored-By: Claude <noreply@anthropic.com>
…r consistency

Align SQLiteWorkerRepository with the pattern used by all 4 other repositories (Task, Dependency, Schedule, Checkpoint):
- Add WorkerRowSchema (Zod) for system-boundary row validation in rowToRegistration(), replacing unchecked 'as' cast
- Replace 6 inline BackbeatError constructions with operationErrorHandler()
- Preserve register()'s custom error handler for UNIQUE constraint detection

Co-Authored-By: Claude <noreply@anthropic.com>
…te recovery docs

- Add --exclude='**/worker-repository.test.ts' to test:implementations script to prevent duplicate test execution in test:all
- Rewrite EVENT_FLOW.md Recovery Flow to document PID-based crash detection (replaces outdated 30-minute staleness heuristic)
- Update Stale Task Detection safeguard section to PID-Based Crash Detection with two-phase recovery description
- Remove stale detection from Future Improvements (already replaced)
- CLAUDE.md: add worker-repository.ts to File Locations table
- CLAUDE.md: document workers table (migration v9) in Database section

Co-Authored-By: Claude <noreply@anthropic.com>
…uard

- Add outputFlushIntervalMs to Configuration (default 5000ms, was hardcoded 500ms). Reduces DB write load 10x while maintaining cross-process output visibility. Configurable via env var OUTPUT_FLUSH_INTERVAL_MS or config file.
- Add backpressure guard to prevent overlapping flush writes. Uses a flushingInProgress Set to skip intervals when a previous flush is still in-flight. Cleaned up in stopFlushing() to prevent leaks.
- Wire config value through bootstrap -> EventDrivenWorkerPool -> ProcessConnector.

Co-Authored-By: Claude <noreply@anthropic.com>
Remove duplicated createMockWorkerRepository() and createMockOutputRepository() factory functions from individual test files. All tests now import from the shared tests/fixtures/mocks.ts, eliminating copy-paste duplication and standardizing naming (createMockOutputRepo -> createMockOutputRepository).

Co-Authored-By: Claude <noreply@anthropic.com>
Extract 4 private methods from the 153-line recover() orchestrator:
- cleanDeadWorkerRegistrations(): Phase 0 PID-based crash detection
- cleanupOldCompletedTasks(): Phase 1 old task cleanup
- recoverQueuedTasks(): Phase 2 re-queue with duplicate check
- recoverRunningTasks(): Phase 3 PID-based RUNNING task recovery

Pure extract-method refactor — zero behavior change, all 24 recovery tests pass unmodified. Closes tech debt item #5 from PR #94 review.
…l status overwrite

Two P1 data-correctness fixes in recovery:

1. EPERM handling: process.kill(pid, 0) throws EPERM when a process exists but the caller lacks signal permission. The catch-all previously returned false, treating live processes as dead. Now EPERM returns true.
2. Terminal state guard: Both Phase 0 (cleanDeadWorkerRegistrations) and Phase 3 (recoverRunningTasks) called repo.update(FAILED) without checking whether the task had already reached a terminal state. A TOCTOU race could overwrite COMPLETED/CANCELLED with FAILED. Now both phases call findById + isTerminalState before updating.
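Both fixes can be sketched in isolation. isProcessAlive mirrors the process.kill(pid, 0) probe described above; the status names follow the PR text, but the exact types in the project may differ:

```typescript
// Fix 1: PID liveness probe. Signal 0 checks existence without delivering a
// signal; EPERM means the process exists but we may not signal it — still alive.
function isProcessAlive(pid: number): boolean {
  try {
    process.kill(pid, 0);
    return true;
  } catch (e) {
    return (e as { code?: string }).code === "EPERM";
  }
}

// Fix 2: guard against the TOCTOU race — never overwrite a terminal status
// with FAILED. Status names are taken from the PR text.
type TaskStatus = "QUEUED" | "RUNNING" | "COMPLETED" | "FAILED" | "CANCELLED";
const TERMINAL: ReadonlySet<TaskStatus> = new Set(["COMPLETED", "FAILED", "CANCELLED"]);

function shouldMarkFailed(current: TaskStatus): boolean {
  return !TERMINAL.has(current);
}
```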
outputCapture.clear(taskId) was chained in .then() after flushOutput(). If flush rejected, .then was skipped and the buffer leaked. Moved clear() into .finally() so memory is freed regardless of flush outcome.
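A minimal sketch of that pattern, with flushOutput and clear standing in for the ProcessConnector/OutputCapture methods named above:

```typescript
// With .then(clear), a rejected flush skipped cleanup and leaked the buffer;
// .finally(clear) frees it regardless of the flush outcome.
async function finalFlush(
  flushOutput: () => Promise<void>,
  clear: () => void,
): Promise<void> {
  await flushOutput()
    .finally(clear) // cleanup runs on both fulfillment and rejection
    .catch(() => {
      // flush errors are logged elsewhere; cleanup has already happened
    });
}
```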
- Bump version 0.5.0 → 0.6.0
- Update release notes with all 8 PRs (was missing #85, #86, #91, #94, #100, #106, #107)
- Mark v0.6.0 as released in ROADMAP.md
- Update FEATURES.md architecture section for hybrid event model
- Expand "What's New in v0.6.0" with architectural simplification, bug fixes, tech debt
- Fix README roadmap: v0.6.1 → v0.7.0 for loops
- Update bug report template example version to 0.6.0
## Summary
- Bump version `0.5.0` → `0.6.0` (package.json + package-lock.json)
- Expand release notes with all 8 PRs (#78, #85, #86, #91, #94, #100, #106, #107) — was only covering #78
- Mark v0.6.0 as released in ROADMAP.md, update status and version timeline
- Update FEATURES.md architecture section for hybrid event model (was describing old fully event-driven architecture with removed services)
- Expand "What's New in v0.6.0" in FEATURES.md with architectural simplification, additional bug fixes, tech debt, breaking changes, migration 9
- Fix README roadmap version: `v0.6.1` → `v0.7.0` for task/pipeline loops
- Update bug report template example version `0.5.0` → `0.6.0`

### GitHub Issues
- Closed #82 (cancelTasks scope — PR #106)
- Closed #95 (totalSize tail-slicing — PR #106)
- Updated #105 release tracker checklist (all items checked)

## Test plan
- [x] `npm run build` — clean compilation
- [x] `npm run test:all` — full suite passes (822 tests, 0 failures)
- [x] `npx biome check src/ tests/` — no lint errors
- [x] `package.json` version is `0.6.0`
- [x] Release notes file exists and covers all PRs
- [ ] After merge: trigger Release workflow from GitHub Actions
- [ ] After release published: close #105

---------

Co-authored-by: Dean Sharon <deanshrn@gmain.com>
Summary
- `workers` table (migration v9) for cross-process worker coordination — tracks which workers exist across all processes sharing the same SQLite DB
- `SQLiteOutputRepository` wired into the live capture path via periodic 500ms flushes from `ProcessConnector`, enabling cross-process output visibility through `TaskManager.getLogs()`
- `RecoveryManager` with definitive PID-based crash detection (`process.kill(pid, 0)`)
- `ResourceMonitor.canSpawnWorker()` uses a DB-based global count to prevent cross-process over-spawning (settling workers still used for resource projections only)

Key design decisions
- Plain INSERT in register(): a UNIQUE violation on `task_id` is a real coordination error (another process owns this task), not something to silently overwrite
- Check capacity in-process (`spawnLock` mutex), then INSERT; can't hold sync transaction across async spawn
- `OutputRepository.append()` does O(n) read-modify-write per call; periodic `save()` with `INSERT OR REPLACE` is O(1)
- `WorkerRepository` and `OutputRepository` are required in all constructors

Edge cases addressed
Files changed (23 files, +1826 / -261)
- `domain.ts`, `interfaces.ts`
- `worker-repository.ts`
- `database.ts` (v9)
- `event-driven-worker-pool.ts`
- `resource-monitor.ts`
- `recovery-manager.ts`
- `process-connector.ts`, `task-manager.ts`
- `bootstrap.ts`
- `npm run build` — clean compilation
- `npx biome check src/ tests/` — no lint issues
- `npm run test:repositories` — WorkerRepository tests pass (12 tests)
- `npm run test:implementations` — WorkerPool tests pass
- `npm run test:services` — RecoveryManager, ProcessConnector, TaskManager tests pass
- `npm run test:integration` — Cross-process coordination tests pass
- `npm run test:core` — No regressions