Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openrouter/spawn",
"version": "0.26.11",
"version": "0.27.0",
"type": "module",
"bin": {
"spawn": "cli.js"
Expand Down
210 changes: 210 additions & 0 deletions packages/cli/src/__tests__/pull-history.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import { afterEach, beforeEach, describe, expect, it } from "bun:test";
import { mkdirSync, rmSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import { parseAndMergeChildHistory } from "../commands/pull-history.js";
import { loadHistory } from "../history.js";

// ─── parseAndMergeChildHistory tests ─────────────────────────────────────────

describe("parseAndMergeChildHistory", () => {
  let origSpawnHome: string | undefined;
  // Per-test isolated SPAWN_HOME directory; created in beforeEach, removed in
  // afterEach so repeated test runs do not leak `.spawn-test-*` temp dirs.
  let spawnDir: string;

  beforeEach(() => {
    origSpawnHome = process.env.SPAWN_HOME;
    // Use isolated temp dir for history (preload sets HOME to a temp dir)
    const tmpHome = process.env.HOME ?? "/tmp";
    spawnDir = join(tmpHome, `.spawn-test-${Date.now()}-${Math.random()}`);
    mkdirSync(spawnDir, {
      recursive: true,
    });
    process.env.SPAWN_HOME = spawnDir;
    // Write empty history
    writeFileSync(
      join(spawnDir, "history.json"),
      JSON.stringify({
        version: 1,
        records: [],
      }),
    );
  });

  afterEach(() => {
    // Clean up the per-test directory (previously leaked on every run).
    rmSync(spawnDir, { recursive: true, force: true });
    if (origSpawnHome === undefined) {
      delete process.env.SPAWN_HOME;
    } else {
      process.env.SPAWN_HOME = origSpawnHome;
    }
  });

  it("returns 0 for empty string", () => {
    expect(parseAndMergeChildHistory("", "parent-123")).toBe(0);
  });

  it("returns 0 for empty object", () => {
    expect(parseAndMergeChildHistory("{}", "parent-123")).toBe(0);
  });

  it("returns 0 for invalid JSON", () => {
    expect(parseAndMergeChildHistory("not json", "parent-123")).toBe(0);
  });

  it("returns 0 for empty records array", () => {
    const json = JSON.stringify({
      version: 1,
      records: [],
    });
    expect(parseAndMergeChildHistory(json, "parent-123")).toBe(0);
  });

  it("parses and merges valid child records", () => {
    const json = JSON.stringify({
      version: 1,
      records: [
        {
          id: "child-1",
          agent: "claude",
          cloud: "hetzner",
          timestamp: "2026-03-26T00:00:00Z",
        },
        {
          id: "child-2",
          agent: "codex",
          cloud: "digitalocean",
          timestamp: "2026-03-26T00:01:00Z",
          name: "test-spawn",
        },
      ],
    });

    const count = parseAndMergeChildHistory(json, "parent-123");
    expect(count).toBe(2);

    // Verify records were merged into history
    const history = loadHistory();
    const child1 = history.find((r) => r.id === "child-1");
    const child2 = history.find((r) => r.id === "child-2");
    expect(child1).toBeDefined();
    expect(child1!.agent).toBe("claude");
    expect(child1!.parent_id).toBe("parent-123");
    expect(child2).toBeDefined();
    expect(child2!.name).toBe("test-spawn");
    expect(child2!.parent_id).toBe("parent-123");
  });

  it("preserves existing parent_id from child records", () => {
    const json = JSON.stringify({
      version: 1,
      records: [
        {
          id: "grandchild-1",
          agent: "claude",
          cloud: "aws",
          timestamp: "2026-03-26T00:00:00Z",
          parent_id: "child-abc",
          depth: 2,
        },
      ],
    });

    const count = parseAndMergeChildHistory(json, "parent-123");
    expect(count).toBe(1);

    const history = loadHistory();
    const gc = history.find((r) => r.id === "grandchild-1");
    expect(gc).toBeDefined();
    // parent_id should be preserved from the child record, not overwritten
    // (mergeChildHistory only sets parent_id if it's not already set)
    expect(gc!.parent_id).toBe("child-abc");
    expect(gc!.depth).toBe(2);
  });

  it("skips records without an id", () => {
    const json = JSON.stringify({
      version: 1,
      records: [
        {
          agent: "claude",
          cloud: "hetzner",
          timestamp: "2026-03-26T00:00:00Z",
        },
        {
          id: "valid-1",
          agent: "codex",
          cloud: "gcp",
          timestamp: "2026-03-26T00:01:00Z",
        },
      ],
    });

    const count = parseAndMergeChildHistory(json, "parent-123");
    expect(count).toBe(1);
  });

  it("preserves connection info from child records", () => {
    const json = JSON.stringify({
      version: 1,
      records: [
        {
          id: "child-conn",
          agent: "claude",
          cloud: "digitalocean",
          timestamp: "2026-03-26T00:00:00Z",
          connection: {
            ip: "10.0.0.1",
            user: "root",
            server_id: "12345",
          },
        },
      ],
    });

    const count = parseAndMergeChildHistory(json, "parent-123");
    expect(count).toBe(1);

    const history = loadHistory();
    const child = history.find((r) => r.id === "child-conn");
    expect(child!.connection?.ip).toBe("10.0.0.1");
    expect(child!.connection?.server_id).toBe("12345");
  });

  it("deduplicates — calling twice with same records only merges once", () => {
    const json = JSON.stringify({
      version: 1,
      records: [
        {
          id: "dedup-1",
          agent: "claude",
          cloud: "hetzner",
          timestamp: "2026-03-26T00:00:00Z",
        },
      ],
    });

    parseAndMergeChildHistory(json, "parent-123");
    parseAndMergeChildHistory(json, "parent-123");

    const history = loadHistory();
    const matches = history.filter((r) => r.id === "dedup-1");
    expect(matches.length).toBe(1);
  });

  it("handles whitespace-only input", () => {
    expect(parseAndMergeChildHistory("  \n  ", "parent-123")).toBe(0);
  });

  it("handles history without version field", () => {
    const json = JSON.stringify({
      records: [
        {
          id: "no-version",
          agent: "hermes",
          cloud: "sprite",
          timestamp: "2026-03-26T00:00:00Z",
        },
      ],
    });

    const count = parseAndMergeChildHistory(json, "parent-123");
    expect(count).toBe(1);
  });
});
2 changes: 2 additions & 0 deletions packages/cli/src/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export {
} from "./list.js";
// pick.ts — cmdPick
export { cmdPick } from "./pick.js";
// pull-history.ts — cmdPullHistory (recursive child history pull)
export { cmdPullHistory } from "./pull-history.js";
// run.ts — cmdRun, cmdRunHeadless, script failure guidance
export {
cmdRun,
Expand Down
168 changes: 168 additions & 0 deletions packages/cli/src/commands/pull-history.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// commands/pull-history.ts — `spawn pull-history`: recursively pull child spawn history
// Called automatically by the parent after a session ends, or manually.
// SSHes into each active child, tells it to pull from ITS children first,
// then downloads its history.json and merges into local history.

import type { SpawnRecord } from "../history.js";

import * as v from "valibot";
import { getActiveServers, mergeChildHistory, SpawnRecordSchema } from "../history.js";
import { parseJsonWith } from "../shared/parse.js";
import { asyncTryCatch } from "../shared/result.js";
import { ensureSshKeys, getSshKeyOpts } from "../shared/ssh-keys.js";
import { logDebug, logInfo } from "../shared/ui.js";

// Valibot schema for a child VM's history.json payload: an optional numeric
// schema version plus an array of records validated with SpawnRecordSchema.
// `version` is optional so older children without the field still parse
// (see the "handles history without version field" test).
const ChildHistorySchema = v.object({
version: v.optional(v.number()),
records: v.array(SpawnRecordSchema),
});

/**
 * Parse a child's history.json content and merge valid records into local history.
 * Exported for testing — the SSH transport is in cmdPullHistory/pullFromChild.
 *
 * @param json - Raw contents of the child's history.json; may be empty or the
 *   literal "{}" fallback produced by the remote `echo '{}'`.
 * @param parentSpawnId - ID of the spawn that owns the child; passed through to
 *   mergeChildHistory to link records back to this parent.
 * @returns Number of records merged (0 for blank, invalid, or empty input).
 */
export function parseAndMergeChildHistory(json: string, parentSpawnId: string): number {
  // Trim once (the original recomputed json.trim() twice) and short-circuit
  // on a blank payload or the "{}" remote fallback.
  const trimmed = json.trim();
  if (!trimmed || trimmed === "{}") {
    return 0;
  }

  // parseJsonWith yields a falsy result on malformed JSON or schema mismatch.
  const parsed = parseJsonWith(trimmed, ChildHistorySchema);
  if (!parsed || parsed.records.length === 0) {
    return 0;
  }

  const validRecords: SpawnRecord[] = [];
  for (const r of parsed.records) {
    // Records without an id cannot be deduplicated downstream — skip them.
    if (!r.id) {
      continue;
    }
    // Copy only the fields we persist; optional fields are added only when
    // present so absent keys stay absent (simpler than the conditional-spread
    // pattern, same resulting object shape).
    const record: SpawnRecord = {
      id: r.id,
      agent: r.agent,
      cloud: r.cloud,
      timestamp: r.timestamp,
    };
    if (r.name) {
      record.name = r.name;
    }
    if (r.parent_id) {
      record.parent_id = r.parent_id;
    }
    if (r.depth !== undefined) {
      record.depth = r.depth;
    }
    if (r.connection) {
      record.connection = r.connection;
    }
    validRecords.push(record);
  }

  if (validRecords.length > 0) {
    // mergeChildHistory dedupes by id and only assigns parent_id when the
    // record doesn't already carry one (per the grandchild test above) —
    // TODO confirm against history.ts, which is outside this view.
    mergeChildHistory(parentSpawnId, validRecords);
  }
  return validRecords.length;
}

/**
* Pull history from all active child VMs recursively.
* For each active child:
* 1. SSH in, run `spawn pull-history` (recurse into grandchildren)
* 2. Download the child's history.json
* 3. Merge into local history with parent_id links
*/
export async function cmdPullHistory(): Promise<void> {
const active = getActiveServers();

if (active.length === 0) {
return;
}

const keysResult = await asyncTryCatch(() => ensureSshKeys());
if (!keysResult.ok) {
logDebug("Could not load SSH keys for history pull");
return;
}
const sshKeyOpts = getSshKeyOpts(keysResult.data);

for (const record of active) {
if (!record.connection?.ip || !record.connection?.user) {
continue;
}

const { ip, user } = record.connection;
const spawnId = record.id;

await pullFromChild(ip, user, spawnId, sshKeyOpts);
}
}

/**
 * Pull history from one child VM: first ask it to recurse into its own
 * children, then download and merge its history.json. Any SSH failure is
 * logged at debug level and otherwise ignored (best-effort).
 */
async function pullFromChild(ip: string, user: string, parentSpawnId: string, sshKeyOpts: string[]): Promise<void> {
  const outcome = await asyncTryCatch(async () => {
    const target = `${user}@${ip}`;
    const sshArgs = [
      "ssh",
      "-o",
      "StrictHostKeyChecking=no",
      "-o",
      "ConnectTimeout=10",
      "-o",
      "BatchMode=yes",
      ...sshKeyOpts,
      target,
    ];

    // Step 1: Tell the child to recursively pull from its own children
    const recurse = Bun.spawnSync(
      [...sshArgs, 'export PATH="$HOME/.local/bin:$HOME/.bun/bin:$PATH"; spawn pull-history 2>/dev/null || true'],
      {
        stdio: ["ignore", "ignore", "ignore"],
        timeout: 60_000,
      },
    );
    // `|| true` makes the remote command exit 0, so a nonzero code here
    // indicates an SSH-level failure rather than a child-side error.
    if (recurse.exitCode !== 0) {
      logDebug(`Recursive pull on ${ip} returned ${recurse.exitCode} (may not support pull-history)`);
    }

    // Step 2: Download the child's history.json via SSH + cat
    const cat = Bun.spawnSync(
      [...sshArgs, "cat ~/.spawn/history.json 2>/dev/null || cat ~/.config/spawn/history.json 2>/dev/null || echo '{}'"],
      {
        stdio: ["ignore", "pipe", "ignore"],
        timeout: 30_000,
      },
    );
    if (cat.exitCode !== 0) {
      return;
    }

    const payload = new TextDecoder().decode(cat.stdout);
    const mergedCount = parseAndMergeChildHistory(payload, parentSpawnId);
    if (mergedCount > 0) {
      logInfo(`Pulled ${mergedCount} record(s) from ${ip}`);
    }
  });

  if (!outcome.ok) {
    logDebug(`Could not pull history from ${ip}`);
  }
}
Loading
Loading