diff --git a/docs/plans/2026-03-06-agentty-reliability-implementation.md b/docs/plans/2026-03-06-agentty-reliability-implementation.md new file mode 100644 index 0000000..860ce71 --- /dev/null +++ b/docs/plans/2026-03-06-agentty-reliability-implementation.md @@ -0,0 +1,67 @@ +# Agentty Reliability Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Fix the highest-priority reliability and CLI contract issues in `agentty` without broad refactoring. + +**Architecture:** Tighten the session startup contract so `start` preserves argv boundaries and only reports success once the PTY is ready. Add cross-process state serialization so concurrent CLI invocations do not lose session records. Validate `attach` targets and make `kill` fail when the session has not actually exited. + +**Tech Stack:** TypeScript, Vitest, node-pty, execa + +--- + +### Task 1: Preserve argv boundaries and delay `start` success until PTY readiness + +**Files:** +- Modify: `src/index.ts` +- Modify: `src/sessionRuntime.ts` +- Modify: `src/worker.ts` +- Modify: `src/ipc.ts` +- Test: `tests/sessionRuntime.start.test.ts` +- Test: `tests/e2e.start-argv.test.ts` + +**Steps:** +1. Write a failing test that proves `start` preserves quoted and empty argv entries. +2. Run the targeted test and confirm it fails for the expected reason. +3. Write a failing test that proves `startSession()` returns the PTY pid rather than the worker pid. +4. Run the targeted test and confirm it fails. +5. Implement the minimal `file + args[]` start contract and a worker readiness handshake. +6. Re-run the targeted tests until green. + +### Task 2: Prevent session state loss across concurrent CLI invocations + +**Files:** +- Modify: `src/state.ts` +- Test: `tests/e2e.concurrent-start.test.ts` + +**Steps:** +1. Write a failing concurrency test that starts multiple sessions in parallel and asserts all session records remain present. +2. Run the targeted test and confirm it fails. +3. Add minimal cross-process state serialization around shared state mutations. +4. Re-run the targeted test until green. + +### Task 3: Tighten `attach` validation and `kill` semantics + +**Files:** +- Modify: `src/resolveSession.ts` +- Modify: `src/sessionRuntime.ts` +- Modify: `tests/attach.test.ts` +- Modify: `tests/kill.test.ts` + +**Steps:** +1. Write failing tests for attaching nonexistent or exited sessions. +2. Run the targeted test and confirm it fails. +3. Write a failing unit test that proves `killSession()` should reject when exit confirmation never arrives. +4. Run the targeted test and confirm it fails. +5. Implement minimal validation for `attach` and make `kill` timeout explicit. +6. Re-run the targeted tests until green. + +### Task 4: Verify the full suite + +**Files:** +- Test: `tests/*.test.ts` + +**Steps:** +1. Run the full test suite in the same environment used for real `agentty` socket access. +2. Confirm exit code `0` and zero failing tests. +3. Review diffs for unintended changes before reporting completion. diff --git a/src/index.ts b/src/index.ts index ee5a400..f68bc85 100644 --- a/src/index.ts +++ b/src/index.ts @@ -16,8 +16,10 @@ interface GetOptionResult extends SessionOptionResult { interface StartOptionResult { command: string; + args?: string[]; cwd: string; name?: string; + displayCommand: string; } export interface CliIo { @@ -125,6 +127,18 @@ function parseGetOptions(args: string[]): GetOptionResult { }; } +function formatCommandForDisplay(commandParts: string[]): string { + return commandParts + .map((part) => { + if (part.length === 0 || /[\s"'\\]/.test(part)) { + return JSON.stringify(part); + } + + return part; + }) + .join(' '); +} + function parseStartOptions(args: string[]): StartOptionResult { let cwd = process.cwd(); let name: string | undefined; @@ -177,9 +191,11 @@ function parseStartOptions(args: string[]): StartOptionResult { } return { - command: commandParts.join(' '), + command: commandParts[0], + args: commandParts.length > 1 ? commandParts.slice(1) : undefined, cwd, name, + displayCommand: formatCommandForDisplay(commandParts), }; } @@ -262,8 +278,8 @@ export async function runCli(argv: string[] = process.argv.slice(2), io: CliIo = } if (command === 'start') { - const { command: startCommand, cwd, name } = parseStartOptions(argv.slice(1)); - const session = await startSession({ command: startCommand, cwd, name }); + const { command: startCommand, args, cwd, name, displayCommand } = parseStartOptions(argv.slice(1)); + const session = await startSession({ command: startCommand, args, cwd, name, displayCommand }); io.stdout(session.id); return; diff --git a/src/resolveSession.ts b/src/resolveSession.ts index 016a478..b4c0190 100644 --- a/src/resolveSession.ts +++ b/src/resolveSession.ts @@ -1,6 +1,15 @@ -import { readActiveSessionId, writeActiveSessionId } from './state'; +import { readActiveSessionId, readSessionById, writeActiveSessionId } from './state'; + +async function ensureRunningSession(sessionId: string): Promise { + const session = await readSessionById(sessionId); + + if (!session || session.status !== 'running') { + throw new Error(`session is not running: ${sessionId}`); + } +} export async function attachSession(id: string): Promise { + await ensureRunningSession(id); await writeActiveSessionId(id); } diff --git a/src/sessionRuntime.ts b/src/sessionRuntime.ts index 952e247..27513ae 100644 --- a/src/sessionRuntime.ts +++ b/src/sessionRuntime.ts @@ -15,8 +15,10 @@ import { export interface StartSessionInput { command: string; + args?: string[]; cwd: string; name?: string; + displayCommand?: string; } export interface SessionMetadata { @@ -37,8 +39,8 @@ const WORKER_ENTRY_PATH = path.resolve(__dirname, '../dist/worker.js'); const LOGS_DIR = 'logs'; const KILL_WAIT_TIMEOUT_MS = 3_000; const KILL_WAIT_INTERVAL_MS = 50; -const SOCKET_READY_TIMEOUT_MS = 1_000; -const SOCKET_READY_POLL_INTERVAL_MS = 50; +const START_READY_TIMEOUT_MS = 10_000; +const START_READY_POLL_INTERVAL_MS = 50; function isUnavailableIpcError(error: unknown): boolean { const code = (error as NodeJS.ErrnoException)?.code; @@ -85,18 +87,20 @@ async function markSessionExited(sessionId: string, exitCode: number | null): Pr } } -async function waitForExited(sessionId: string): Promise { +async function waitForExited(sessionId: string): Promise { const deadline = Date.now() + KILL_WAIT_TIMEOUT_MS; while (Date.now() < deadline) { const session = await readSessionById(sessionId); if (!session || session.status === 'exited') { - return; + return true; } await new Promise((resolve) => setTimeout(resolve, KILL_WAIT_INTERVAL_MS)); } + + return false; } function getWorkerLogPath(sessionId: string): string { @@ -104,7 +108,7 @@ function getWorkerLogPath(sessionId: string): string { } async function waitForSocketReady(socketPath: string, didWorkerExit: () => boolean): Promise { - const deadline = Date.now() + SOCKET_READY_TIMEOUT_MS; + const deadline = Date.now() + START_READY_TIMEOUT_MS; while (Date.now() < deadline) { try { @@ -118,7 +122,7 @@ async function waitForSocketReady(socketPath: string, didWorkerExit: () => boole break; } - await new Promise((resolve) => setTimeout(resolve, SOCKET_READY_POLL_INTERVAL_MS)); + await new Promise((resolve) => setTimeout(resolve, START_READY_POLL_INTERVAL_MS)); } try { @@ -129,7 +133,37 @@ async function waitForSocketReady(socketPath: string, didWorkerExit: () => boole } } -export async function startSession({ command, cwd, name }: StartSessionInput): Promise { +async function waitForSessionReady( + sessionId: string, + workerPid: number, + didWorkerExit: () => boolean, +): Promise { + const deadline = Date.now() + START_READY_TIMEOUT_MS; + + while (Date.now() < deadline) { + const session = await readSessionById(sessionId); + + if ( + session && + session.status === 'running' && + typeof session.pid === 'number' && + typeof session.workerPid === 'number' && + session.pid !== workerPid + ) { + return session as SessionMetadata; + } + + if (didWorkerExit()) { + break; + } + + await new Promise((resolve) => setTimeout(resolve, START_READY_POLL_INTERVAL_MS)); + } + + return null; +} + +export async function startSession({ command, args, cwd, name, displayCommand }: StartSessionInput): Promise { const trimmedCommand = command.trim(); if (!trimmedCommand) { @@ -149,9 +183,11 @@ export async function startSession({ command, cwd, name }: StartSessionInput): P const workerSpec = { id: sessionId, command: trimmedCommand, + ...(args ? { args } : {}), cwd, socketPath, startedAt: now, + ...(displayCommand ? { displayCommand } : {}), ...(name ? { name } : {}), }; @@ -195,7 +231,7 @@ export async function startSession({ command, cwd, name }: StartSessionInput): P id: sessionId, pid: child.pid, workerPid: child.pid, - command: trimmedCommand, + command: displayCommand ?? trimmedCommand, cwd, startedAt: now, lastActiveAt: now, @@ -229,13 +265,29 @@ export async function startSession({ command, cwd, name }: StartSessionInput): P await markSessionExited(sessionId, workerExitCode); throw new Error( - `session worker failed to start (socket was not created within ${SOCKET_READY_TIMEOUT_MS}ms): ${socketPath}. Check worker log: ${logFilePath}`, + `session worker failed to start (socket was not created within ${START_READY_TIMEOUT_MS}ms): ${socketPath}. Check worker log: ${logFilePath}`, + ); + } + + const readySession = await waitForSessionReady(sessionId, child.pid, () => workerExited); + + if (!readySession) { + try { + process.kill(child.pid, 'SIGTERM'); + } catch { + // ignore cleanup errors + } + + await markSessionExited(sessionId, workerExitCode); + + throw new Error( + `session worker failed to become ready within ${START_READY_TIMEOUT_MS}ms: ${socketPath}. Check worker log: ${logFilePath}`, ); } child.unref(); - return session; + return readySession; } export async function sendText(sessionId: string, payload: string): Promise { @@ -301,7 +353,11 @@ export async function killSession(sessionId: string): Promise { await requestIpc(session.socketPath, { method: 'kill', }); - await waitForExited(sessionId); + const exited = await waitForExited(sessionId); + + if (!exited) { + throw new Error(`session did not exit within ${KILL_WAIT_TIMEOUT_MS}ms: ${sessionId}`); + } } catch (error) { if (isUnavailableIpcError(error)) { await markSessionExited(sessionId, null); diff --git a/src/state.ts b/src/state.ts index fc46787..b5c88a6 100644 --- a/src/state.ts +++ b/src/state.ts @@ -1,4 +1,4 @@ -import { mkdir, readFile, rename, rm, writeFile } from 'node:fs/promises'; +import { mkdir, readFile, rename, rm, stat, writeFile } from 'node:fs/promises'; import { createHash } from 'node:crypto'; import os from 'node:os'; import path from 'node:path'; @@ -8,6 +8,10 @@ import type { ActiveSessionId, SessionRecord, SessionRecordList } from './types' const SESSIONS_FILE = 'sessions.json'; const ACTIVE_SESSION_FILE = 'active-session-id'; const SOCKETS_DIR = 'sockets'; +const STATE_LOCK_DIR = '.state-lock'; +const STATE_LOCK_TIMEOUT_MS = 5_000; +const STATE_LOCK_RETRY_MS = 25; +const STATE_LOCK_STALE_MS = 10_000; export function getStateRoot(): string { const overridden = process.env.AGENTTY_HOME?.trim(); @@ -27,6 +31,10 @@ function getActiveSessionPath(): string { return path.join(getStateRoot(), ACTIVE_SESSION_FILE); } +function getStateLockPath(): string { + return path.join(getStateRoot(), STATE_LOCK_DIR); +} + export function getSocketsRoot(): string { if (process.platform === 'darwin') { const uid = typeof process.getuid === 'function' ? String(process.getuid()) : 'unknown'; @@ -68,6 +76,65 @@ function validateSessions(value: unknown): SessionRecordList { return value as SessionRecordList; } +async function acquireStateLock(): Promise { + await ensureStateRoot(); + const lockPath = getStateLockPath(); + const deadline = Date.now() + STATE_LOCK_TIMEOUT_MS; + + while (Date.now() < deadline) { + try { + await mkdir(lockPath); + return; + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + + if (code !== 'EEXIST') { + throw error; + } + + try { + const lockStat = await stat(lockPath); + + if (Date.now() - lockStat.mtimeMs > STATE_LOCK_STALE_MS) { + await rm(lockPath, { recursive: true, force: true }); + continue; + } + } catch { + // ignore stale check failures and retry + } + + await new Promise((resolve) => setTimeout(resolve, STATE_LOCK_RETRY_MS)); + } + } + + throw new Error(`Timed out waiting for state lock: ${lockPath}`); +} + +async function releaseStateLock(): Promise { + await rm(getStateLockPath(), { recursive: true, force: true }); +} + +export async function withStateLock(operation: () => Promise): Promise { + await acquireStateLock(); + + try { + return await operation(); + } finally { + await releaseStateLock(); + } +} + +async function writeSessionsUnlocked(sessions: SessionRecordList): Promise { + await ensureStateRoot(); + + const sessionsPath = getSessionsPath(); + const tempPath = `${sessionsPath}.${process.pid}.${Date.now()}.tmp`; + const serialized = JSON.stringify(sessions, null, 2); + + await writeFile(tempPath, serialized, 'utf8'); + await rename(tempPath, sessionsPath); +} + export async function readSessions(): Promise { try { const raw = await readFile(getSessionsPath(), 'utf8'); @@ -94,14 +161,9 @@ export async function readSessions(): Promise { } export async function writeSessions(sessions: SessionRecordList): Promise { - await ensureStateRoot(); - - const sessionsPath = getSessionsPath(); - const tempPath = `${sessionsPath}.${process.pid}.${Date.now()}.tmp`; - const serialized = JSON.stringify(sessions, null, 2); - - await writeFile(tempPath, serialized, 'utf8'); - await rename(tempPath, sessionsPath); + await withStateLock(async () => { + await writeSessionsUnlocked(sessions); + }); } export async function readSessionById(sessionId: string): Promise { @@ -110,19 +172,21 @@ export async function readSessionById(sessionId: string): Promise { - const sessions = await readSessions(); - const index = sessions.findIndex((session) => session.id === sessionRecord.id); - - if (index === -1) { - sessions.push(sessionRecord); - } else { - sessions[index] = { - ...sessions[index], - ...sessionRecord, - }; - } + await withStateLock(async () => { + const sessions = await readSessions(); + const index = sessions.findIndex((session) => session.id === sessionRecord.id); + + if (index === -1) { + sessions.push(sessionRecord); + } else { + sessions[index] = { + ...sessions[index], + ...sessionRecord, + }; + } - await writeSessions(sessions); + await writeSessionsUnlocked(sessions); + }); } export async function readActiveSessionId(): Promise { @@ -141,12 +205,14 @@ export async function readActiveSessionId(): Promise { } export async function writeActiveSessionId(sessionId: ActiveSessionId): Promise { - await ensureStateRoot(); + await withStateLock(async () => { + await ensureStateRoot(); - if (!sessionId) { - await rm(getActiveSessionPath(), { force: true }); - return; - } + if (!sessionId) { + await rm(getActiveSessionPath(), { force: true }); + return; + } - await writeFile(getActiveSessionPath(), sessionId, 'utf8'); + await writeFile(getActiveSessionPath(), sessionId, 'utf8'); + }); } diff --git a/src/worker.ts b/src/worker.ts index 746c1ee..379692f 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -12,10 +12,12 @@ import type { SessionRecord } from './types'; interface WorkerSpec { id: string; command: string; + args?: string[]; cwd: string; name?: string; socketPath: string; startedAt: string; + displayCommand?: string; } interface IpcResponse { @@ -62,9 +64,11 @@ function parseWorkerSpec(): WorkerSpec { return { id: spec.id, command: spec.command, + ...(Array.isArray(spec.args) ? { args: spec.args } : {}), cwd: spec.cwd, socketPath: spec.socketPath, startedAt: spec.startedAt, + ...(spec.displayCommand ? { displayCommand: spec.displayCommand } : {}), ...(spec.name ? { name: spec.name } : {}), }; } @@ -72,7 +76,7 @@ function parseWorkerSpec(): WorkerSpec { function toSessionRecord(spec: WorkerSpec, patch: Partial): SessionRecord { return { id: spec.id, - command: spec.command, + command: spec.displayCommand ?? spec.command, cwd: spec.cwd, socketPath: spec.socketPath, workerPid: process.pid, @@ -342,13 +346,21 @@ async function main(): Promise { await ensureNodePtySpawnHelperExecutable(); - ptyProcess = spawn(shell, ['-lc', spec.command], { - cwd: spec.cwd, - env: process.env, - name: 'xterm-256color', - cols: 80, - rows: 24, - }); + ptyProcess = Array.isArray(spec.args) + ? spawn(spec.command, spec.args, { + cwd: spec.cwd, + env: process.env, + name: 'xterm-256color', + cols: 80, + rows: 24, + }) + : spawn(shell, ['-lc', spec.command], { + cwd: spec.cwd, + env: process.env, + name: 'xterm-256color', + cols: 80, + rows: 24, + }); ptyProcess.onData?.((chunk) => { appendOutput(chunk); diff --git a/tests/attach.test.ts b/tests/attach.test.ts index 750d263..f15fa71 100644 --- a/tests/attach.test.ts +++ b/tests/attach.test.ts @@ -5,7 +5,7 @@ import path from 'node:path'; import { execa } from 'execa'; import { afterEach, beforeEach, describe, expect, it } from 'vitest'; -import { readActiveSessionId } from '../src/state'; +import { readActiveSessionId, upsertSession, writeActiveSessionId } from '../src/state'; import { attachSession, resolveTargetSessionId } from '../src/resolveSession'; describe('attach and session resolution', () => { @@ -29,19 +29,25 @@ describe('attach and session resolution', () => { }); it('attachSession sets active session pointer', async () => { + await upsertSession({ + id: 'session-1', + status: 'running', + socketPath: '/tmp/session-1.sock', + }); + await attachSession('session-1'); expect(await readActiveSessionId()).toBe('session-1'); }); it('resolveTargetSessionId returns explicit id when provided', async () => { - await attachSession('active-session'); + await writeActiveSessionId('active-session'); await expect(resolveTargetSessionId('explicit-session')).resolves.toBe('explicit-session'); }); it('resolveTargetSessionId falls back to active session id', async () => { - await attachSession('active-session'); + await writeActiveSessionId('active-session'); await expect(resolveTargetSessionId()).resolves.toBe('active-session'); }); @@ -51,6 +57,12 @@ describe('attach and session resolution', () => { }); it('agentty attach writes pointer and prints attached id', async () => { + await upsertSession({ + id: 'session-cli', + status: 'running', + socketPath: '/tmp/session-cli.sock', + }); + const { stdout } = await execa('node', ['dist/index.js', 'attach', 'session-cli'], { env: { ...process.env, @@ -61,4 +73,31 @@ describe('attach and session resolution', () => { expect(stdout).toBe('session-cli'); expect(await readActiveSessionId()).toBe('session-cli'); }); + + it('attachSession rejects nonexistent sessions', async () => { + await expect(attachSession('missing-session')).rejects.toThrow( + 'session is not running: missing-session', + ); + await expect(readActiveSessionId()).resolves.toBeNull(); + }); + + it('agentty attach rejects exited sessions', async () => { + await upsertSession({ + id: 'exited-session', + status: 'exited', + socketPath: '/tmp/exited-session.sock', + }); + + const result = await execa('node', ['dist/index.js', 'attach', 'exited-session'], { + env: { + ...process.env, + AGENTTY_HOME: tempHome, + }, + reject: false, + }); + + expect(result.exitCode).toBe(1); + expect(result.stderr).toContain('session is not running: exited-session'); + await expect(readActiveSessionId()).resolves.toBeNull(); + }); }); diff --git a/tests/e2e.start-argv.test.ts b/tests/e2e.start-argv.test.ts new file mode 100644 index 0000000..6b1edb4 --- /dev/null +++ b/tests/e2e.start-argv.test.ts @@ -0,0 +1,108 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import os from 'node:os'; +import path from 'node:path'; + +import { execa } from 'execa'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; + +interface CommandResult { + exitCode: number; + stdout: string; + stderr: string; +} + +async function sleep(ms: number): Promise { + await new Promise((resolve) => setTimeout(resolve, ms)); +} + +describe('e2e: start argv preservation', () => { + let tempHome: string; + let originalEnv: string | undefined; + + beforeEach(async () => { + originalEnv = process.env.AGENTTY_HOME; + tempHome = await mkdtemp(path.join(os.tmpdir(), 'agentty-e2e-start-argv-')); + process.env.AGENTTY_HOME = tempHome; + }); + + afterEach(async () => { + if (originalEnv === undefined) { + delete process.env.AGENTTY_HOME; + } else { + process.env.AGENTTY_HOME = originalEnv; + } + + await rm(tempHome, { recursive: true, force: true }); + }); + + async function runCommand(args: string[]): Promise { + const result = await execa('node', ['dist/index.js', ...args], { + env: { + ...process.env, + AGENTTY_HOME: tempHome, + }, + reject: false, + timeout: 10_000, + }); + + return { + exitCode: result.exitCode, + stdout: result.stdout.trim(), + stderr: result.stderr.trim(), + }; + } + + async function waitForSnapshotContains(sessionId: string, needle: string): Promise { + const timeoutMs = 8_000; + const intervalMs = 50; + const startedAt = Date.now(); + + while (Date.now() - startedAt < timeoutMs) { + const result = await runCommand(['get', '--session', sessionId, '--lines', '200']); + + if (result.exitCode === 0 && result.stdout.includes(needle)) { + return result.stdout; + } + + await sleep(intervalMs); + } + + throw new Error(`Timed out waiting for snapshot to contain ${needle}`); + } + + it('preserves quoted and empty argv entries passed to start', async () => { + const printedArgs = JSON.stringify(['hello world', '', 'literal"quote']); + const script = [ + 'console.log(JSON.stringify(process.argv.slice(1)));', + 'setTimeout(() => {}, 30000);', + ].join(' '); + + let sessionId: string | undefined; + + try { + const startResult = await runCommand([ + 'start', + '--name', + 'argv-test', + '--', + 'node', + '-e', + script, + 'hello world', + '', + 'literal"quote', + ]); + + expect(startResult.exitCode).toBe(0); + sessionId = startResult.stdout; + expect(sessionId).toBeTruthy(); + + const snapshot = await waitForSnapshotContains(sessionId, printedArgs); + expect(snapshot).toContain(printedArgs); + } finally { + if (sessionId) { + await runCommand(['kill', '--session', sessionId]); + } + } + }); +}); diff --git a/tests/e2e.vim.test.ts b/tests/e2e.vim.test.ts index 6000a51..8dcfec7 100644 --- a/tests/e2e.vim.test.ts +++ b/tests/e2e.vim.test.ts @@ -100,17 +100,9 @@ describe('e2e: vim (cli process invocations)', () => { try { const startResult = await runCommand([ 'start', - 'command', - '-v', - 'vim', - '>/dev/null', - '2>&1', - '&&', - 'exec', - 'vim', - '||', - 'exec', - 'vi', + 'sh', + '-lc', + 'command -v vim >/dev/null 2>&1 && exec vim || exec vi', ]); expect(startResult.exitCode).toBe(0); diff --git a/tests/sessionRuntime.kill-timeout.test.ts b/tests/sessionRuntime.kill-timeout.test.ts new file mode 100644 index 0000000..3ca0bb2 --- /dev/null +++ b/tests/sessionRuntime.kill-timeout.test.ts @@ -0,0 +1,43 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +describe('sessionRuntime.killSession timeout handling', () => { + beforeEach(() => { + vi.resetModules(); + vi.clearAllMocks(); + vi.useRealTimers(); + }); + + it('rejects when the session does not exit before the timeout', async () => { + vi.useFakeTimers(); + + const writeActiveSessionIdMock = vi.fn(); + + vi.doMock('../src/ipc', () => ({ + requestIpc: vi.fn(async () => true), + })); + + vi.doMock('../src/state', () => ({ + readSessionById: vi.fn(async (sessionId: string) => ({ + id: sessionId, + status: 'running', + socketPath: '/tmp/mock.sock', + })), + readActiveSessionId: vi.fn(async () => 'session-timeout'), + writeActiveSessionId: writeActiveSessionIdMock, + upsertSession: vi.fn(), + getSessionSocketPath: vi.fn(), + getStateRoot: vi.fn(), + })); + + const { killSession } = await import('../src/sessionRuntime'); + + const killPromise = killSession('session-timeout'); + const capturedError = killPromise.catch((error) => error); + await vi.advanceTimersByTimeAsync(3_500); + + await expect(capturedError).resolves.toMatchObject({ + message: 'session did not exit within 3000ms: session-timeout', + }); + expect(writeActiveSessionIdMock).not.toHaveBeenCalled(); + }); +}); diff --git a/tests/sessionRuntime.start.test.ts b/tests/sessionRuntime.start.test.ts index 4eb80ff..85c834a 100644 --- a/tests/sessionRuntime.start.test.ts +++ b/tests/sessionRuntime.start.test.ts @@ -40,6 +40,8 @@ describe('sessionRuntime.startSession', () => { expect(session.id).toEqual(expect.any(String)); expect(session.pid).toEqual(expect.any(Number)); + expect(session.workerPid).toEqual(expect.any(Number)); + expect(session.pid).not.toBe(session.workerPid); expect(session.command).toBe('sleep 30'); expect(session.cwd).toBe(process.cwd()); expect(session.status).toBe('running'); @@ -55,6 +57,8 @@ describe('sessionRuntime.startSession', () => { cwd: process.cwd(), status: 'running', exitCode: null, + pid: session.pid, + workerPid: session.workerPid, }); expect(typeof (sessions[0] as { pid?: unknown }).pid).toBe('number'); } finally { diff --git a/tests/state.concurrent.test.ts b/tests/state.concurrent.test.ts new file mode 100644 index 0000000..79e46fa --- /dev/null +++ b/tests/state.concurrent.test.ts @@ -0,0 +1,52 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import os from 'node:os'; +import path from 'node:path'; + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; + +import { readSessions, upsertSession } from '../src/state'; + +describe('state concurrency', () => { + let tempHome: string; + let originalEnv: string | undefined; + + beforeEach(async () => { + originalEnv = process.env.AGENTTY_HOME; + tempHome = await mkdtemp(path.join(os.tmpdir(), 'agentty-state-concurrent-')); + process.env.AGENTTY_HOME = tempHome; + }); + + afterEach(async () => { + if (originalEnv === undefined) { + delete process.env.AGENTTY_HOME; + } else { + process.env.AGENTTY_HOME = originalEnv; + } + + await rm(tempHome, { recursive: true, force: true }); + }); + + it('preserves every session record across concurrent upserts', async () => { + const sessionCount = 24; + + await Promise.all( + Array.from({ length: sessionCount }, (_, index) => + upsertSession({ + id: `session-${index}`, + status: 'running', + socketPath: `/tmp/session-${index}.sock`, + }), + ), + ); + + const sessions = await readSessions(); + const sessionIds = sessions + .map((session) => session.id) + .filter((id): id is string => typeof id === 'string') + .sort(); + + expect(sessionIds).toEqual( + Array.from({ length: sessionCount }, (_, index) => `session-${index}`).sort(), + ); + }); +});