Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
"type": "module",
"private": true,
"scripts": {
"build": "bun build --compile --outfile loop src/loop.ts",
"build": "bun build --compile --outfile loop src/cli.ts",
"install:global": "bun run build && bun run src/install.ts",
"release:patch": "npm version patch && git push --follow-tags",
"check": "ultracite check",
"fix": "ultracite fix"
},
"bin": {
"loop": "./src/loop.ts"
"loop": "./src/cli.ts"
},
"devDependencies": {
"@biomejs/biome": "^2.4.0",
Expand Down
File renamed without changes.
8 changes: 8 additions & 0 deletions src/loop/args.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ const applyValueFlag = (
opts.model = trimmed;
return;
}
if (flag === "session") {
const trimmed = value.trim();
if (!trimmed) {
throw new Error("Invalid --session value: cannot be empty");
}
opts.sessionId = trimmed;
return;
}
opts.format = parseFormat(value);
};

Expand Down
173 changes: 161 additions & 12 deletions src/loop/claude-sdk-server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import { type Server, type ServerWebSocket, serve, spawn } from "bun";
import {
type Server,
type ServerWebSocket,
serve,
spawn,
spawnSync,
} from "bun";
import { DEFAULT_CLAUDE_MODEL } from "./constants";
import { findFreePort } from "./ports";
import { DETACH_CHILD_PROCESS, killChildProcess } from "./process";
import type { Options, RunResult } from "./types";

type ExitSignal = "SIGINT" | "SIGTERM";
type Callback = (text: string) => void;
type ServeFn = (...args: Parameters<typeof serve>) => ReturnType<typeof serve>;
type SpawnFn = (...args: Parameters<typeof spawn>) => ReturnType<typeof spawn>;
type WSRole = "claude" | "frontend";
interface WSData {
Expand Down Expand Up @@ -40,7 +47,9 @@ interface NdjsonMessage {
}

interface TurnState {
backgroundTaskSeen: boolean;
combined: string;
drainingBackground: boolean;
hasStreamed: boolean;
onDelta: Callback;
onParsed: Callback;
Expand All @@ -52,8 +61,53 @@ interface TurnState {

const CLAUDE_SDK_BASE_PORT = 8765;
const CLAUDE_SDK_PORT_RANGE = 100;
const BACKGROUND_TASK_CONTINUATION =
"Background tasks are complete. Continue with the task.";
const DEFAULT_CHILD_POLL_INTERVAL_MS = 2000;
const START_TIMEOUT_MS = 60_000;
const WAIT_TIMEOUT_MS = 600_000;
const DEFAULT_WAIT_TIMEOUT_MS = 600_000;

let childPollIntervalMs = DEFAULT_CHILD_POLL_INTERVAL_MS;
let waitTimeoutMs = DEFAULT_WAIT_TIMEOUT_MS;

type CountChildProcessesFn = (pid: number) => number;

const asRecord = (value: unknown): Record<string, unknown> => {
if (typeof value === "object" && value !== null) {
return value as Record<string, unknown>;
}
return {};
};

const wait = async (ms: number): Promise<void> => {
await new Promise<void>((resolve) => setTimeout(resolve, ms));
};

const countChildProcesses = (pid: number): number => {
if (process.platform === "win32" || !Number.isInteger(pid) || pid <= 0) {
return 0;
}
try {
const proc = spawnSync({
cmd: ["pgrep", "-g", String(pid)],
stderr: "ignore",
stdout: "pipe",
});
const output = new TextDecoder().decode(proc.stdout).trim();
if (!output) {
return 0;
}
return output
.split("\n")
.map((rawPid) => Number.parseInt(rawPid.trim(), 10))
.filter((childPid) => Number.isInteger(childPid) && childPid > 0)
.filter((childPid) => childPid !== pid).length;
} catch {
return 0;
}
};

let countChildProcessesFn: CountChildProcessesFn = countChildProcesses;

const drainStream = (stream: ReadableStream<Uint8Array>): void => {
const reader = stream.getReader();
Expand Down Expand Up @@ -109,22 +163,51 @@ const isValidNdjson = (text: string): boolean => {
};

let spawnFn: SpawnFn = spawn;
let serveFn: ServeFn = serve;

export const claudeSdkInternals = {
BACKGROUND_TASK_CONTINUATION,
countChildProcesses,
restoreSpawnFn(): void {
spawnFn = spawn;
},
setSpawnFn(next: SpawnFn): void {
spawnFn = next;
},
restoreServeFn(): void {
serveFn = serve;
},
setServeFn(next: ServeFn): void {
serveFn = next;
},
restoreCountChildProcessesFn(): void {
countChildProcessesFn = countChildProcesses;
},
setCountChildProcessesFn(next: CountChildProcessesFn): void {
countChildProcessesFn = next;
},
restoreChildPollIntervalMs(): void {
childPollIntervalMs = DEFAULT_CHILD_POLL_INTERVAL_MS;
},
setChildPollIntervalMs(next: number): void {
childPollIntervalMs = next;
},
restoreWaitTimeoutMs(): void {
waitTimeoutMs = DEFAULT_WAIT_TIMEOUT_MS;
},
setWaitTimeoutMs(next: number): void {
waitTimeoutMs = next;
},
};

class ClaudeSdkClient {
private child: ReturnType<typeof spawn> | undefined;
private closed = false;
private lastSessionId = "";
private lock: Promise<void> = Promise.resolve();
private port = 0;
private ready = false;
private resumeId = "";
private server: Server | undefined;
private sessionId = "";
private started = false;
Expand All @@ -143,6 +226,14 @@ class ClaudeSdkClient {
return this.child !== undefined;
}

getLastSessionId(): string {
return this.lastSessionId;
}

setResumeId(id: string): void {
this.resumeId = id;
}

async start(): Promise<void> {
if (this.started) {
return;
Expand All @@ -163,6 +254,8 @@ class ClaudeSdkClient {
});

const url = `ws://localhost:${this.port}`;
const resumeArgs = this.resumeId ? ["--resume", this.resumeId] : [];
this.resumeId = "";

this.child = spawnFn(
[
Expand All @@ -179,6 +272,7 @@ class ClaudeSdkClient {
"--dangerously-skip-permissions",
"--sdk-url",
url,
...resumeArgs,
],
{
detached: DETACH_CHILD_PROCESS,
Expand Down Expand Up @@ -257,7 +351,7 @@ class ClaudeSdkClient {

private createServer(): void {
const self = this;
this.server = serve({
this.server = serveFn({
port: this.port,
fetch(req, server) {
const path = new URL(req.url).pathname;
Expand Down Expand Up @@ -350,6 +444,10 @@ class ClaudeSdkClient {
case "system":
if (msg.subtype === "init") {
this.sessionId = msg.session_id || "";
this.lastSessionId = this.sessionId || this.lastSessionId;
if (this.sessionId) {
console.error(`[loop] claude session: ${this.sessionId}`);
}
}
return;
case "control_response":
Expand Down Expand Up @@ -424,6 +522,16 @@ class ClaudeSdkClient {
return;
}
const state = this.turn;
if (state.backgroundTaskSeen) {
if (state.drainingBackground) {
return;
}
state.drainingBackground = true;
this.drainAndContinue(state).catch(() => {
// timeout and close handlers reject turn state; ignore drain errors
});
return;
}
this.turn = undefined;
state.resolve({
combined: state.combined,
Expand All @@ -436,6 +544,14 @@ class ClaudeSdkClient {
if (!(this.ws && msg.request_id)) {
return;
}
const input = asRecord(msg.request?.input);
if (
this.turn &&
msg.request?.tool_name === "Task" &&
input.run_in_background === true
) {
this.turn.backgroundTaskSeen = true;
}
if (msg.request?.subtype === "can_use_tool") {
this.sendJson({
type: "control_response",
Expand All @@ -451,10 +567,37 @@ class ClaudeSdkClient {
}
}

private async drainAndContinue(state: TurnState): Promise<void> {
while (this.turn === state) {
const pid = this.child?.pid;
const remaining =
typeof pid === "number" ? countChildProcessesFn(pid) : 0;
if (remaining <= 0) {
if (this.turn !== state) {
return;
}
state.backgroundTaskSeen = false;
state.drainingBackground = false;
this.sendUserMessage(BACKGROUND_TASK_CONTINUATION);
return;
}
await wait(childPollIntervalMs);
}
}

private sendJson(data: Record<string, unknown>): void {
this.ws?.send(`${JSON.stringify(data)}\n`);
}

private sendUserMessage(content: string): void {
this.sendJson({
type: "user",
message: { role: "user", content },
parent_tool_use_id: null,
session_id: this.sessionId,
});
}

private async runTurnExclusive(
prompt: string,
_opts: Options,
Expand All @@ -475,10 +618,12 @@ class ClaudeSdkClient {
this.turn = undefined;
reject(new Error("claude sdk turn timed out"));
}
}, WAIT_TIMEOUT_MS);
}, waitTimeoutMs);

this.turn = {
backgroundTaskSeen: false,
combined: "",
drainingBackground: false,
hasStreamed: false,
onDelta,
onParsed,
Expand All @@ -494,12 +639,7 @@ class ClaudeSdkClient {
},
};

this.sendJson({
type: "user",
message: { role: "user", content: prompt },
parent_tool_use_id: null,
session_id: this.sessionId,
});
this.sendUserMessage(prompt);
});

// Claude SDK session state is process-bound, so restart per turn to force a
Expand Down Expand Up @@ -566,8 +706,14 @@ const getClient = (): ClaudeSdkClient => {
return singleton;
};

export const startClaudeSdk = async (): Promise<void> => {
await getClient().start();
export const startClaudeSdk = async (
resumeSessionId?: string
): Promise<void> => {
const client = getClient();
if (resumeSessionId) {
client.setResumeId(resumeSessionId);
}
await client.start();
};

export const runClaudeTurn = (
Expand All @@ -590,6 +736,9 @@ export const interruptClaudeSdk = (signal: ExitSignal): void => {

export const hasClaudeSdkProcess = (): boolean => getClient().hasProcess();

export const getLastClaudeSessionId = (): string =>
singleton?.getLastSessionId() ?? "";

export const closeClaudeSdk = async (): Promise<void> => {
if (!singleton) {
return;
Expand Down
Loading