diff --git a/hermes-sensor-bridge/.gitignore b/hermes-sensor-bridge/.gitignore new file mode 100644 index 0000000..06e6038 --- /dev/null +++ b/hermes-sensor-bridge/.gitignore @@ -0,0 +1,3 @@ +node_modules +dist +*.tsbuildinfo diff --git a/hermes-sensor-bridge/README.md b/hermes-sensor-bridge/README.md new file mode 100644 index 0000000..dde580e --- /dev/null +++ b/hermes-sensor-bridge/README.md @@ -0,0 +1,55 @@ +# @world2agent/hermes-sensor-bridge + +World2Agent bridge for [Hermes Agent](https://hermes-agent.nousresearch.com/). + +Runs W2A sensors as supervised Node subprocesses and delivers their signals into Hermes via the gateway's native webhook subscriptions. Each signal triggers a fresh `AIAgent.run_conversation()` with the corresponding handler skill auto-loaded by Hermes. + +> Status: in development. See [`docs/channel-hermes-agent-design.md`](../docs/channel-hermes-agent-design.md) for the design. + +## Layout + +``` +src/ + runner/ Node sensor-runner subprocess (one per enabled sensor) + supervisor/ Independent local daemon — spawns/monitors runners, + exposes 127.0.0.1 control HTTP for reload/list/health + cli/ `world2agent-hermes` CLI (start/stop/status/add/remove/list) +skills/ + world2agent-manage/ Agent-facing skill that wraps the CLI for + natural-language sensor management +``` + +## Bins + +- `world2agent-hermes` — user-facing CLI +- `world2agent-hermes-supervisor` — daemon (started by `world2agent-hermes start`) +- `world2agent-sensor-runner` — per-sensor subprocess (spawned by the supervisor) + +## Current CLI Flow + +`world2agent-hermes add` currently expects a hand-written config JSON file: + +```bash +world2agent-hermes add @world2agent/sensor-hackernews \ + --config-file ./hackernews.json +``` + +Supported add-time overrides: + +- `--config-file ` — bypasses interactive setup and writes the manifest directly +- `--webhook-url ` — provide the target webhook URL yourself +- `--hmac-secret ` — override the shared bridge HMAC secret +- `--no-hermes-subscribe` — skip the `hermes webhook subscribe` shellout entirely + +The last three flags are intended mainly for local development and testing. In +the normal path, the bridge calls `hermes webhook subscribe`, stores the +returned webhook URL in the manifest, and reloads the local supervisor. + +When a sensor package does not ship a machine-runnable setup helper, the bridge +generates a generic Hermes skill for that sensor instead of a fully customized +handler. The package's `SETUP.md` remains the source of truth for richer, +sensor-specific behavior. + +## Relation to `claude-code-channel` + +Sibling package. `claude-code-channel` is an in-process MCP channel for Claude Code; this package is an out-of-process bridge for Hermes. Both load the same `@world2agent/sensor-*` packages without modification. diff --git a/hermes-sensor-bridge/e2e/hackernews.config.json b/hermes-sensor-bridge/e2e/hackernews.config.json new file mode 100644 index 0000000..79a12cf --- /dev/null +++ b/hermes-sensor-bridge/e2e/hackernews.config.json @@ -0,0 +1,5 @@ +{ + "top_n": 5, + "min_score": 1, + "interval_seconds": 30 +} diff --git a/hermes-sensor-bridge/e2e/mock-hermes-receiver.mjs b/hermes-sensor-bridge/e2e/mock-hermes-receiver.mjs new file mode 100644 index 0000000..def2977 --- /dev/null +++ b/hermes-sensor-bridge/e2e/mock-hermes-receiver.mjs @@ -0,0 +1,86 @@ +#!/usr/bin/env node + +import { createHmac } from "node:crypto"; +import { createServer } from "node:http"; + +const port = Number(process.env.MOCK_HERMES_PORT ?? "8786"); +const secret = process.env.MOCK_HERMES_SECRET ?? "test-secret"; + +const server = createServer((req, res) => { + const chunks = []; + + req.on("data", (chunk) => { + chunks.push(chunk); + }); + + req.on("end", () => { + const body = Buffer.concat(chunks).toString("utf8"); + const signature = req.headers["x-webhook-signature"]; + const requestId = req.headers["x-request-id"]; + + if (typeof signature !== "string") { + res.statusCode = 400; + res.end("missing X-Webhook-Signature"); + return; + } + + if (signature.startsWith("sha256=")) { + res.statusCode = 400; + res.end("signature must be raw hex"); + return; + } + + const expected = createHmac("sha256", secret).update(body).digest("hex"); + if (signature !== expected) { + res.statusCode = 401; + res.end("invalid signature"); + return; + } + + let payload; + try { + payload = JSON.parse(body); + } catch { + res.statusCode = 400; + res.end("invalid json"); + return; + } + + const signalId = payload?.signal?.signal_id; + if (typeof signalId !== "string") { + res.statusCode = 400; + res.end("missing signal.signal_id"); + return; + } + + if (requestId !== signalId) { + res.statusCode = 400; + res.end("X-Request-ID mismatch"); + return; + } + + process.stdout.write( + JSON.stringify( + { + ok: true, + signature_prefix: signature.slice(0, 16), + request_id: requestId, + signal_id: signalId, + event_type: payload?.signal?.event?.type ?? null, + body: payload, + }, + null, + 2, + ) + "\n", + ); + + res.statusCode = 200; + res.end("ok"); + }); +}); + +server.listen(port, "127.0.0.1", () => { + process.stdout.write( + JSON.stringify({ ok: true, listening: `http://127.0.0.1:${port}`, secret }, null, 2) + "\n", + ); +}); diff --git a/hermes-sensor-bridge/e2e/test-delivery.mjs b/hermes-sensor-bridge/e2e/test-delivery.mjs new file mode 100644 index 0000000..b014e4f --- /dev/null +++ b/hermes-sensor-bridge/e2e/test-delivery.mjs @@ -0,0 +1,208 @@ +#!/usr/bin/env node +/** + * Smoke test for the supervisor's delivery worker. Stands up a tiny + * http.createServer that mimics the contract Hermes's webhook adapter + * imposes (HMAC raw-hex match + body shape + X-Request-ID), then drives + * `httpPost` and `renderPrompt` directly to verify: + * + * 1. Body has shape `{ prompt, signal }` with prompt ending in a JSON + * code fence containing the original signal. + * 2. X-Request-ID equals signal.signal_id. + * 3. X-Webhook-Signature is the HMAC-SHA256 of the body, raw hex (no + * `sha256=` prefix). + * 4. 5xx triggers retry; 4xx fails immediately. + * + * Usage: + * node e2e/test-delivery.mjs + */ + +import { createServer } from "node:http"; +import { createHmac } from "node:crypto"; +import { httpPost, renderPrompt } from "../dist/supervisor/spawn.js"; + +let failures = 0; +function check(label, cond, detail) { + const ok = !!cond; + process.stdout.write(`${ok ? "PASS" : "FAIL"} ${label}\n`); + if (!ok) { + failures++; + if (detail !== undefined) process.stdout.write(` ${detail}\n`); + } +} + +const SECRET = "test-secret-deadbeef"; + +function startServer(handler) { + return new Promise((resolve) => { + const srv = createServer(async (req, res) => { + let buf = ""; + for await (const chunk of req) buf += chunk; + handler(req, buf, res); + }); + srv.listen(0, "127.0.0.1", () => { + const addr = srv.address(); + resolve({ srv, url: `http://127.0.0.1:${addr.port}` }); + }); + }); +} + +const fakeSignal = { + signal_id: "test-sig-123", + schema_version: "0.1.0", + source: { sensor_id: "test-sensor" }, + event: { + type: "news.story.trending", + summary: "Test story summary", + occurred_at: "2026-04-27T12:00:00Z", + }, + attachments: [{ media_type: "text/markdown", title: "body" }], +}; + +// case 1: happy path — verify body, headers, prompt shape +{ + let captured; + const { srv, url } = await startServer((req, body, res) => { + captured = { headers: req.headers, body }; + res.statusCode = 202; + res.end("ok"); + }); + try { + const body = JSON.stringify({ + prompt: renderPrompt(fakeSignal), + signal: fakeSignal, + }); + const sig = createHmac("sha256", SECRET).update(body).digest("hex"); + await httpPost( + url, + body, + { + "content-type": "application/json", + "x-request-id": fakeSignal.signal_id, + "x-webhook-signature": sig, + }, + { timeoutMs: 5_000, maxAttempts: 1, baseDelayMs: 100 }, + ); + + check("happy: server received POST", !!captured); + check( + "happy: x-request-id == signal.signal_id", + captured.headers["x-request-id"] === fakeSignal.signal_id, + ); + check( + "happy: x-webhook-signature is raw hex (no sha256= prefix)", + typeof captured.headers["x-webhook-signature"] === "string" && + /^[0-9a-f]{64}$/.test(captured.headers["x-webhook-signature"]), + `got: ${captured.headers["x-webhook-signature"]}`, + ); + check( + "happy: signature matches recomputed HMAC", + captured.headers["x-webhook-signature"] === sig, + ); + + const parsed = JSON.parse(captured.body); + check("happy: body has prompt + signal", typeof parsed.prompt === "string" && !!parsed.signal); + check( + "happy: signal in body matches input", + parsed.signal.signal_id === fakeSignal.signal_id, + ); + check( + "happy: prompt body has type + summary", + parsed.prompt.includes("news.story.trending") && parsed.prompt.includes("Test story summary"), + ); + check( + "happy: prompt body ends with JSON code fence containing signal", + /```json[\s\S]*"signal_id": "test-sig-123"[\s\S]*```/.test(parsed.prompt), + ); + } finally { + srv.close(); + } +} + +// case 2: 4xx — fail fast, no retry +{ + let calls = 0; + const { srv, url } = await startServer((_req, _body, res) => { + calls++; + res.statusCode = 401; + res.end("unauthorized"); + }); + try { + let threw = false; + try { + await httpPost( + url, + "{}", + {}, + { timeoutMs: 2_000, maxAttempts: 3, baseDelayMs: 10 }, + ); + } catch (error) { + threw = true; + check("4xx: error mentions 401", String(error).includes("401")); + } + check("4xx: throws", threw); + check("4xx: only one call (no retry)", calls === 1); + } finally { + srv.close(); + } +} + +// case 3: 5xx — retry up to maxAttempts, eventually throws +{ + let calls = 0; + const { srv, url } = await startServer((_req, _body, res) => { + calls++; + res.statusCode = 503; + res.end("flaky"); + }); + try { + let threw = false; + try { + await httpPost( + url, + "{}", + {}, + { timeoutMs: 2_000, maxAttempts: 3, baseDelayMs: 10 }, + ); + } catch (error) { + threw = true; + check("5xx: error mentions 503", String(error).includes("503")); + } + check("5xx: throws after retries", threw); + check("5xx: called maxAttempts times", calls === 3, `calls=${calls}`); + } finally { + srv.close(); + } +} + +// case 4: 5xx then 200 — retry succeeds +{ + let calls = 0; + const { srv, url } = await startServer((_req, _body, res) => { + calls++; + if (calls < 2) { + res.statusCode = 503; + res.end("flaky"); + } else { + res.statusCode = 200; + res.end("ok"); + } + }); + try { + await httpPost( + url, + "{}", + {}, + { timeoutMs: 2_000, maxAttempts: 3, baseDelayMs: 10 }, + ); + check("5xx-then-200: succeeded after retry", true); + check("5xx-then-200: exactly 2 calls", calls === 2, `calls=${calls}`); + } finally { + srv.close(); + } +} + +if (failures > 0) { + process.stderr.write(`\n${failures} check(s) failed.\n`); + process.exit(1); +} +process.stdout.write("\nAll checks passed.\n"); diff --git a/hermes-sensor-bridge/e2e/test-ensure-hermes-webhook.mjs b/hermes-sensor-bridge/e2e/test-ensure-hermes-webhook.mjs new file mode 100644 index 0000000..a9b93db --- /dev/null +++ b/hermes-sensor-bridge/e2e/test-ensure-hermes-webhook.mjs @@ -0,0 +1,229 @@ +#!/usr/bin/env node +/** + * Smoke test for ensureHermesWebhookEnabled — exercises the four states: + * 1. Empty HERMES_HOME → block written to both config.yaml and .env. + * 2. Re-run on the same HERMES_HOME → idempotent no-op. + * 3. Hand-written `platforms.webhook.enabled: true` already → detected, no write. + * 4. Hand-written *unmanaged* top-level `platforms:` block → throws with guidance. + * + * Usage: + * node e2e/test-ensure-hermes-webhook.mjs + */ + +import { mkdtempSync, rmSync, mkdirSync, writeFileSync, readFileSync, existsSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { ensureHermesWebhookEnabled } from "../dist/cli/common.js"; +import { + getBridgePaths, + normalizeSensorEntry, + upsertSensorEntry, + readManifest, + writeManifest, + ensureBridgeDirs, +} from "../dist/supervisor/manifest.js"; + +let failures = 0; + +function check(label, condition, detail) { + const ok = !!condition; + process.stdout.write(`${ok ? "PASS" : "FAIL"} ${label}\n`); + if (!ok) { + failures++; + if (detail) process.stdout.write(` ${detail}\n`); + } +} + +function makeHome() { + const home = mkdtempSync(join(tmpdir(), "w2a-hermes-home-")); + // emulate Hermes's standard layout + mkdirSync(home, { recursive: true }); + return home; +} + +function pathsFor(home) { + return getBridgePaths({ ...process.env, HERMES_HOME: home }); +} + +async function caseFreshHome() { + const home = makeHome(); + try { + const paths = pathsFor(home); + const result = await ensureHermesWebhookEnabled(paths); + const yaml = readFileSync(paths.hermesConfigYamlFile, "utf8"); + const env = readFileSync(paths.hermesEnvFile, "utf8"); + + check("fresh: alreadyEnabled false", result.alreadyEnabled === false); + check("fresh: configYamlModified", result.configYamlModified === true); + check("fresh: envModified", result.envModified === true); + check("fresh: yaml has platforms.webhook.enabled", /platforms:\s*\n\s*webhook:\s*\n\s*enabled:\s*true/.test(yaml)); + check("fresh: env has WEBHOOK_ENABLED=true", /^WEBHOOK_ENABLED=true$/m.test(env)); + check("fresh: yaml has managed marker", yaml.includes("world2agent-hermes-bridge (managed)")); + } finally { + rmSync(home, { recursive: true, force: true }); + } +} + +async function caseIdempotent() { + const home = makeHome(); + try { + const paths = pathsFor(home); + await ensureHermesWebhookEnabled(paths); + const result = await ensureHermesWebhookEnabled(paths); + + check("idempotent: alreadyEnabled true", result.alreadyEnabled === true); + check("idempotent: detectedVia is config-yaml", result.detectedVia === "config-yaml"); + check("idempotent: configYamlModified false", result.configYamlModified === false); + check("idempotent: envModified false", result.envModified === false); + } finally { + rmSync(home, { recursive: true, force: true }); + } +} + +async function caseUserPreEnabled() { + const home = makeHome(); + try { + const paths = pathsFor(home); + writeFileSync( + paths.hermesConfigYamlFile, + 'platforms:\n webhook:\n enabled: true\n extra:\n port: 9999\n', + "utf8", + ); + const result = await ensureHermesWebhookEnabled(paths); + const yaml = readFileSync(paths.hermesConfigYamlFile, "utf8"); + + check("user-enabled: alreadyEnabled true", result.alreadyEnabled === true); + check("user-enabled: detectedVia is config-yaml", result.detectedVia === "config-yaml"); + check("user-enabled: yaml unchanged", !yaml.includes("world2agent-hermes-bridge (managed)")); + } finally { + rmSync(home, { recursive: true, force: true }); + } +} + +async function caseUserUnmanagedPlatformsRefuses() { + const home = makeHome(); + try { + const paths = pathsFor(home); + // user has top-level platforms: with telegram (no webhook), expect refusal + writeFileSync( + paths.hermesConfigYamlFile, + 'platforms:\n telegram:\n enabled: true\n', + "utf8", + ); + let threw = false; + let message = ""; + try { + await ensureHermesWebhookEnabled(paths); + } catch (error) { + threw = true; + message = error?.message ?? String(error); + } + check("unmanaged: throws", threw); + check("unmanaged: error mentions platforms", /platforms:/.test(message)); + check("unmanaged: error mentions hermes gateway setup", /hermes gateway setup/.test(message)); + } finally { + rmSync(home, { recursive: true, force: true }); + } +} + +async function casePartialStateHealed() { + const home = makeHome(); + try { + const paths = pathsFor(home); + // simulate user that hand-enabled webhook in config.yaml but never wrote .env + writeFileSync( + paths.hermesConfigYamlFile, + 'platforms:\n webhook:\n enabled: true\n extra:\n port: 9999\n', + "utf8", + ); + const result = await ensureHermesWebhookEnabled(paths); + const yaml = readFileSync(paths.hermesConfigYamlFile, "utf8"); + const env = readFileSync(paths.hermesEnvFile, "utf8"); + + check("partial: alreadyEnabled true (yaml had it)", result.alreadyEnabled === true); + check("partial: detectedVia config-yaml", result.detectedVia === "config-yaml"); + check("partial: yaml unchanged", !yaml.includes("world2agent-hermes-bridge (managed)")); + check("partial: env now patched", result.envModified === true); + check("partial: env has WEBHOOK_ENABLED=true", /^WEBHOOK_ENABLED=true$/m.test(env)); + check("partial: env has marker", env.includes("world2agent-hermes-bridge (managed)")); + } finally { + rmSync(home, { recursive: true, force: true }); + } +} + +async function caseNestedAgentPlatformsIgnored() { + const home = makeHome(); + try { + const paths = pathsFor(home); + // Mimics the real Hermes config shape where `agent.platforms: {}` is at indent 2. + // Our top-level scanner must NOT treat that as an unmanaged top-level `platforms:`. + writeFileSync( + paths.hermesConfigYamlFile, + "agent:\n platforms: {}\n some_other: value\n", + "utf8", + ); + const result = await ensureHermesWebhookEnabled(paths); + const yaml = readFileSync(paths.hermesConfigYamlFile, "utf8"); + + check("nested: alreadyEnabled false", result.alreadyEnabled === false); + check("nested: configYamlModified true (top-level platforms was missing)", result.configYamlModified === true); + check("nested: managed block appended", yaml.includes("world2agent-hermes-bridge (managed)")); + check("nested: original agent.platforms still present", yaml.includes("agent:\n platforms: {}")); + } finally { + rmSync(home, { recursive: true, force: true }); + } +} + +async function caseSkillIdRoundTrip() { + const home = makeHome(); + try { + const paths = pathsFor(home); + await ensureBridgeDirs(paths); + + const customSkillId = "my-custom-handler"; + const entry = { + sensor_id: "hn-custom", + pkg: "@world2agent/sensor-hackernews", + skill_id: customSkillId, + subscription_name: "world2agent-hn-custom", + webhook_url: "http://127.0.0.1:8644/webhooks/world2agent-hn-custom", + enabled: true, + config: { top_n: 3 }, + }; + + const normalized = normalizeSensorEntry(entry); + check("skill_id: normalize preserves custom skill_id", normalized.skill_id === customSkillId); + + const initial = await readManifest(paths); + const next = upsertSensorEntry(initial, entry); + await writeManifest(paths, next); + + const reloaded = await readManifest(paths); + const found = reloaded.sensors.find((s) => s.sensor_id === "hn-custom"); + check("skill_id: reload returns single entry", !!found); + check("skill_id: parse preserves custom skill_id", found?.skill_id === customSkillId); + + // Default fallback (no skill_id set) still derives from pkg. + const fallback = normalizeSensorEntry({ ...entry, sensor_id: "hn-default", skill_id: "" }); + check( + "skill_id: empty falls back to packageToSkillId", + fallback.skill_id === "world2agent-sensor-hackernews", + ); + } finally { + rmSync(home, { recursive: true, force: true }); + } +} + +await caseFreshHome(); +await caseIdempotent(); +await caseUserPreEnabled(); +await caseUserUnmanagedPlatformsRefuses(); +await casePartialStateHealed(); +await caseNestedAgentPlatformsIgnored(); +await caseSkillIdRoundTrip(); + +if (failures > 0) { + process.stderr.write(`\n${failures} check(s) failed.\n`); + process.exit(1); +} +process.stdout.write("\nAll checks passed.\n"); diff --git a/hermes-sensor-bridge/package.json b/hermes-sensor-bridge/package.json new file mode 100644 index 0000000..7501aa3 --- /dev/null +++ b/hermes-sensor-bridge/package.json @@ -0,0 +1,54 @@ +{ + "name": "@world2agent/hermes-sensor-bridge", + "version": "0.0.0-dev", + "description": "World2Agent bridge for Hermes Agent — runs sensors as supervised subprocesses and delivers their signals into Hermes via webhook subscriptions", + "license": "Apache-2.0", + "author": "MachinePulse Pte. Ltd.", + "homepage": "https://github.com/machinepulse-ai/world2agent", + "repository": { + "type": "git", + "url": "git+https://github.com/machinepulse-ai/world2agent-plugins.git", + "directory": "hermes-sensor-bridge" + }, + "bugs": { + "url": "https://github.com/machinepulse-ai/world2agent-plugins/issues" + }, + "keywords": [ + "world2agent", + "w2a", + "hermes", + "hermes-agent", + "webhook", + "bridge", + "sensor" + ], + "engines": { + "node": ">=20" + }, + "type": "module", + "bin": { + "world2agent-sensor-runner": "./dist/runner/bin.js", + "world2agent-hermes-supervisor": "./dist/supervisor/bin.js", + "world2agent-hermes": "./dist/cli/bin.js" + }, + "scripts": { + "build": "tsc --build", + "clean": "rm -rf dist *.tsbuildinfo", + "prepublishOnly": "pnpm run clean && pnpm run build" + }, + "dependencies": { + "@world2agent/sdk": "0.1.0-alpha.1" + }, + "devDependencies": { + "@types/node": "^25.5.0", + "typescript": "^5.8.3" + }, + "files": [ + "dist", + "skills", + "README.md" + ], + "publishConfig": { + "access": "public" + } +} diff --git a/hermes-sensor-bridge/pnpm-lock.yaml b/hermes-sensor-bridge/pnpm-lock.yaml new file mode 100644 index 0000000..0c79676 --- /dev/null +++ b/hermes-sensor-bridge/pnpm-lock.yaml @@ -0,0 +1,58 @@ +lockfileVersion: '9.0' + +settings: + autoInstallPeers: true + excludeLinksFromLockfile: false + +importers: + + .: + dependencies: + '@world2agent/sdk': + specifier: 0.1.0-alpha.1 + version: 0.1.0-alpha.1(zod@3.25.76) + devDependencies: + '@types/node': + specifier: ^25.5.0 + version: 25.6.0 + typescript: + specifier: ^5.8.3 + version: 5.9.3 + +packages: + + '@types/node@25.6.0': + resolution: {integrity: sha512-+qIYRKdNYJwY3vRCZMdJbPLJAtGjQBudzZzdzwQYkEPQd+PJGixUL5QfvCLDaULoLv+RhT3LDkwEfKaAkgSmNQ==} + + '@world2agent/sdk@0.1.0-alpha.1': + resolution: {integrity: sha512-YfCdXPyX9Zm811fsT0kiTfCRW7iOZ4ByYZCwlqeKZbXRy8/RxJrse6KGzexfZWAXv0L8Gl8ZvOJTs4WesfIiaQ==} + engines: {node: '>=20'} + peerDependencies: + zod: ^3.25.0 + + typescript@5.9.3: + resolution: {integrity: sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==} + engines: {node: '>=14.17'} + hasBin: true + + undici-types@7.19.2: + resolution: {integrity: sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==} + + zod@3.25.76: + resolution: {integrity: sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==} + +snapshots: + + '@types/node@25.6.0': + dependencies: + undici-types: 7.19.2 + + '@world2agent/sdk@0.1.0-alpha.1(zod@3.25.76)': + dependencies: + zod: 3.25.76 + + typescript@5.9.3: {} + + undici-types@7.19.2: {} + + zod@3.25.76: {} diff --git a/hermes-sensor-bridge/skills/world2agent-manage/SKILL.md b/hermes-sensor-bridge/skills/world2agent-manage/SKILL.md new file mode 100644 index 0000000..00ad1ea --- /dev/null +++ b/hermes-sensor-bridge/skills/world2agent-manage/SKILL.md @@ -0,0 +1,64 @@ +--- +name: world2agent-manage +description: Manage World2Agent sensors for Hermes. Use when the user asks to install, list, remove, or inspect W2A sensors, or wants to subscribe to an outside-world source such as Hacker News, GitHub, RSS, calendars, or market feeds. +user-invocable: false +--- + +# World2Agent Sensor Management + +You manage the user's World2Agent sensors on this Hermes machine. + +All mutations go through the `world2agent-hermes` CLI. The shell scripts in +`scripts/` are thin wrappers that exec the CLI directly. + +## List sensors + +Run: + +```bash +bash "$W2A_PLUGIN_HOME/skills/world2agent-manage/scripts/list.sh" +``` + +The CLI prints JSON with the manifest state and any live runtime status reported +by the local supervisor. + +## Install a sensor + +1. Confirm the npm package name with the user. +2. Inspect the sensor package's `SETUP.md` to determine the config fields it + needs. The current bridge implementation does **not** run an interactive + setup helper automatically. +3. Write a temporary JSON file containing the sensor config object only. +4. Run: + +```bash +bash "$W2A_PLUGIN_HOME/skills/world2agent-manage/scripts/add.sh" --config-file +``` + +Optional flags: + +- `--sensor-id ` if the user wants a non-default instance id. +- `--webhook-url --hmac-secret --no-hermes-subscribe` for local + dev/test runs that bypass `hermes webhook subscribe`. + +Never invent credentials or secrets. Ask the user explicitly when the config +requires them. + +## Remove a sensor + +Run: + +```bash +bash "$W2A_PLUGIN_HOME/skills/world2agent-manage/scripts/remove.sh" +``` + +Pass `--purge` only if the user explicitly wants the generated Hermes skill +directory removed too. + +## Output style + +After each action, summarize: + +- which sensor ids were affected +- whether the supervisor reload succeeded +- any warnings or errors returned by the CLI diff --git a/hermes-sensor-bridge/skills/world2agent-manage/scripts/add.sh b/hermes-sensor-bridge/skills/world2agent-manage/scripts/add.sh new file mode 100755 index 0000000..ec4be34 --- /dev/null +++ b/hermes-sensor-bridge/skills/world2agent-manage/scripts/add.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash +set -euo pipefail +exec world2agent-hermes add "$@" diff --git a/hermes-sensor-bridge/skills/world2agent-manage/scripts/list.sh b/hermes-sensor-bridge/skills/world2agent-manage/scripts/list.sh new file mode 100755 index 0000000..cc91815 --- /dev/null +++ b/hermes-sensor-bridge/skills/world2agent-manage/scripts/list.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash +set -euo pipefail +exec world2agent-hermes list "$@" diff --git a/hermes-sensor-bridge/skills/world2agent-manage/scripts/remove.sh b/hermes-sensor-bridge/skills/world2agent-manage/scripts/remove.sh new file mode 100755 index 0000000..601e454 --- /dev/null +++ b/hermes-sensor-bridge/skills/world2agent-manage/scripts/remove.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash +set -euo pipefail +exec world2agent-hermes remove "$@" diff --git a/hermes-sensor-bridge/src/cli/bin.ts b/hermes-sensor-bridge/src/cli/bin.ts new file mode 100644 index 0000000..8e03a4f --- /dev/null +++ b/hermes-sensor-bridge/src/cli/bin.ts @@ -0,0 +1,52 @@ +#!/usr/bin/env node + +import { parseArgs } from "./common.js"; +import { runAddCommand } from "./commands/add.js"; +import { runHermesInitCommand } from "./commands/hermes-init.js"; +import { runListCommand } from "./commands/list.js"; +import { runLogsCommand } from "./commands/logs.js"; +import { runRemoveCommand } from "./commands/remove.js"; +import { runStartCommand } from "./commands/start.js"; +import { runStatusCommand } from "./commands/status.js"; +import { runStopCommand } from "./commands/stop.js"; + +async function main(): Promise { + const [command, ...rest] = process.argv.slice(2); + const args = parseArgs(rest); + + switch (command) { + case "start": + await runStartCommand(args); + return; + case "stop": + await runStopCommand(); + return; + case "status": + await runStatusCommand(args); + return; + case "list": + await runListCommand(args); + return; + case "add": + await runAddCommand(args); + return; + case "remove": + await runRemoveCommand(args); + return; + case "logs": + await runLogsCommand(args); + return; + case "hermes-init": + await runHermesInitCommand(args); + return; + default: + throw new Error( + "Usage: world2agent-hermes [...]", + ); + } +} + +main().catch((error) => { + console.error(error instanceof Error ? error.message : String(error)); + process.exit(1); +}); diff --git a/hermes-sensor-bridge/src/cli/commands/add.ts b/hermes-sensor-bridge/src/cli/commands/add.ts new file mode 100644 index 0000000..9ebb0b5 --- /dev/null +++ b/hermes-sensor-bridge/src/cli/commands/add.ts @@ -0,0 +1,234 @@ +import { packageToSkillId } from "@world2agent/sdk"; +import { mkdir, readFile, writeFile } from "node:fs/promises"; +import { join } from "node:path"; +import { + defaultSensorId, + ensureBridgeDirs, + getBridgePaths, + loadOrCreateHmacSecret, + readManifest, + upsertSensorEntry, + writeManifest, +} from "../../supervisor/manifest.js"; +import { + ensureHermesWebhookEnabled, + ensurePackageInstalled, + getPort, + getStringFlag, + hasFlag, + maybeReloadSupervisor, + printJson, + runCommand, + type InstalledPackageInfo, + type ParsedArgs, +} from "../common.js"; + +export async function runAddCommand(args: ParsedArgs): Promise { + const pkg = args._[0]; + if (!pkg) { + throw new Error("Usage: world2agent-hermes add --config-file "); + } + + const paths = getBridgePaths(); + await ensureBridgeDirs(paths); + + const installed = await ensurePackageInstalled(pkg); + const config = await loadConfig(getStringFlag(args, "config-file"), installed); + const skillId = getStringFlag(args, "skill-id") ?? packageToSkillId(pkg); + const sensorId = getStringFlag(args, "sensor-id") ?? defaultSensorId(pkg); + const port = getPort(args); + const noHermesSubscribe = hasFlag(args, "no-hermes-subscribe"); + const webhookUrlFlag = getStringFlag(args, "webhook-url"); + const hmacSecret = await loadOrCreateHmacSecret( + paths, + getStringFlag(args, "hmac-secret"), + ); + + const hermesWebhook = noHermesSubscribe + ? null + : await ensureHermesWebhookEnabled(paths, { secret: hmacSecret }); + + const { webhookUrl, subscriptionName, subscribeResult } = + noHermesSubscribe + ? { + webhookUrl: requireString( + webhookUrlFlag, + "--webhook-url is required with --no-hermes-subscribe", + ), + subscriptionName: undefined, + subscribeResult: null, + } + : await subscribeWithHermes(sensorId, skillId, hmacSecret); + + await writeGenericSkill(paths.hermesSkillsDir, skillId, pkg, installed); + + const manifest = await readManifest(paths); + const nextManifest = upsertSensorEntry(manifest, { + sensor_id: sensorId, + pkg, + skill_id: skillId, + subscription_name: subscriptionName, + webhook_url: webhookUrl, + enabled: true, + config, + }); + await writeManifest(paths, nextManifest); + + const reload = await maybeReloadSupervisor(port, paths); + printJson({ + ok: true, + sensor_id: sensorId, + skill_id: skillId, + webhook_url: webhookUrl, + hmac_secret_source: getStringFlag(args, "hmac-secret") ? "override" : "stored", + subscription_name: subscriptionName ?? null, + subscribe: subscribeResult, + hermes_webhook: hermesWebhook, + reload, + }); +} + +async function loadConfig( + configFile: string | undefined, + installed: InstalledPackageInfo, +): Promise> { + if (!configFile) { + const setupPath = String( + (installed.packageJson.w2a as Record | undefined)?.setup ?? "SETUP.md", + ); + throw new Error( + `Interactive setup is not implemented; use --config-file . Sensor guidance: ${join( + installed.packageRoot, + setupPath, + )}`, + ); + } + + const raw = JSON.parse(await readFile(configFile, "utf8")) as unknown; + if (!raw || typeof raw !== "object" || Array.isArray(raw)) { + throw new Error(`Config file must contain a JSON object: ${configFile}`); + } + return raw as Record; +} + +async function subscribeWithHermes( + sensorId: string, + skillId: string, + hmacSecret: string, +): Promise<{ + webhookUrl: string; + subscriptionName: string; + subscribeResult: unknown; +}> { + const subscriptionName = `world2agent-${sensorId}`; + const { stdout } = await runCommand("hermes", [ + "webhook", + "subscribe", + subscriptionName, + "--description", + `World2Agent: ${skillId}`, + "--skills", + skillId, + "--prompt", + "{prompt}", + "--secret", + hmacSecret, + ]); + + const parsed = parseSubscribeOutput(stdout); + return { + webhookUrl: parsed.url, + subscriptionName: parsed.name ?? subscriptionName, + subscribeResult: parsed.raw, + }; +} + +function parseSubscribeOutput(stdout: string): { + url: string; + name: string | undefined; + raw: unknown; +} { + const trimmed = stdout.trim(); + + try { + const json = JSON.parse(trimmed) as Record; + const url = firstString(json, ["url", "webhook_url", "deliver_url"]); + if (url) { + const name = firstString(json, ["name", "subscription_name", "id"]); + return { url, name, raw: json }; + } + } catch { + // fall through + } + + const url = trimmed.match(/https?:\/\/\S+/)?.[0]; + if (!url) { + throw new Error(`Could not parse webhook URL from hermes subscribe output: ${trimmed}`); + } + // We do not synthesize a default name — the caller already has the name it + // passed to `hermes webhook subscribe` and is the source of truth for it. + return { url, name: undefined, raw: trimmed }; +} + +async function writeGenericSkill( + hermesSkillsDir: string, + skillId: string, + pkg: string, + installed: InstalledPackageInfo, +): Promise { + const sourceType = String( + (installed.packageJson.w2a as Record | undefined)?.source_type ?? pkg, + ); + const signals = ( + (installed.packageJson.w2a as Record | undefined)?.signals as + | string[] + | undefined + )?.join(", "); + + const skillDir = join(hermesSkillsDir, skillId); + await mkdir(skillDir, { recursive: true }); + const skillMd = [ + "---", + `name: ${skillId}`, + `description: Handle World2Agent signals from ${pkg}.`, + "user-invocable: false", + "---", + "", + `# ${skillId}`, + "", + `Handle W2A signals from \`${pkg}\` (source type: \`${sourceType}\`).`, + "", + "## Inputs", + "- The prompt body contains markdown context plus a fenced JSON copy of the full `signal` object.", + signals ? `- Common signal types: ${signals}` : "- Inspect `signal.event.type` for the exact event kind.", + "", + "## Behavior", + "- Parse the JSON when you need structured fields.", + "- If the signal is irrelevant or obviously low-value, skip silently.", + "- If it is actionable, reply briefly with the key fact, why it matters, and any obvious next step.", + "", + "## Notes", + "- This skill was generated from the bridge CLI because the sensor package does not ship a machine-runnable setup script yet.", + "- Replace it with a richer sensor-specific handler if you need more nuanced behavior.", + "", + ].join("\n"); + await writeFile(join(skillDir, "SKILL.md"), skillMd, "utf8"); +} + +function firstString( + value: Record, + keys: string[], +): string | undefined { + for (const key of keys) { + const candidate = value[key]; + if (typeof candidate === "string" && candidate) { + return candidate; + } + } + return undefined; +} + +function requireString(value: string | undefined, errorMessage: string): string { + if (!value) throw new Error(errorMessage); + return value; +} diff --git a/hermes-sensor-bridge/src/cli/commands/hermes-init.ts b/hermes-sensor-bridge/src/cli/commands/hermes-init.ts new file mode 100644 index 0000000..c29d80e --- /dev/null +++ b/hermes-sensor-bridge/src/cli/commands/hermes-init.ts @@ -0,0 +1,42 @@ +import { getBridgePaths } from "../../supervisor/manifest.js"; +import { + ensureHermesWebhookEnabled, + printJson, + type ParsedArgs, +} from "../common.js"; + +export async function runHermesInitCommand(args: ParsedArgs): Promise { + const portRaw = args.flags.get("hermes-port"); + const port = typeof portRaw === "string" ? Number(portRaw) : undefined; + if (port !== undefined && (!Number.isInteger(port) || port <= 0 || port > 65535)) { + throw new Error(`Invalid --hermes-port value: ${portRaw}`); + } + + const paths = getBridgePaths(); + const result = await ensureHermesWebhookEnabled(paths, { port }); + + const nextSteps: string[] = []; + if (result.alreadyEnabled && !result.configYamlModified && !result.envModified) { + nextSteps.push( + "Hermes webhook platform was already enabled — no changes were made.", + ); + } else { + if (result.configYamlModified) { + nextSteps.push( + `Wrote a managed 'platforms.webhook' block to ${result.configYamlFile}.`, + ); + } + if (result.envModified) { + nextSteps.push( + `Wrote managed WEBHOOK_* env vars to ${result.hermesEnvFile}.`, + ); + } + nextSteps.push( + result.gatewayRestartRequired + ? "Restart the Hermes gateway so the new config is picked up." + : "Start the Hermes gateway: 'hermes gateway run'.", + ); + } + + printJson({ ok: true, ...result, next_steps: nextSteps }); +} diff --git a/hermes-sensor-bridge/src/cli/commands/list.ts b/hermes-sensor-bridge/src/cli/commands/list.ts new file mode 100644 index 0000000..53bc91b --- /dev/null +++ b/hermes-sensor-bridge/src/cli/commands/list.ts @@ -0,0 +1,28 @@ +import { getBridgePaths, readManifest } from "../../supervisor/manifest.js"; +import { + getPort, + printJson, + readRuntimeState, + type ParsedArgs, +} from "../common.js"; + +export async function runListCommand(args: ParsedArgs): Promise { + const paths = getBridgePaths(); + const port = getPort(args); + const manifest = await readManifest(paths); + const runtime = await readRuntimeState(port, paths); + + const handles = new Map(); + const runtimeHandles = ((runtime?.list as { handles?: any[] } | undefined)?.handles ?? []); + for (const handle of runtimeHandles) { + handles.set(handle.sensor_id, handle); + } + + printJson({ + ok: true, + sensors: manifest.sensors.map((entry) => ({ + ...entry, + runtime: handles.get(entry.sensor_id) ?? null, + })), + }); +} diff --git a/hermes-sensor-bridge/src/cli/commands/logs.ts b/hermes-sensor-bridge/src/cli/commands/logs.ts new file mode 100644 index 0000000..b50549f --- /dev/null +++ b/hermes-sensor-bridge/src/cli/commands/logs.ts @@ -0,0 +1,16 @@ +import { readFile } from "node:fs/promises"; +import { getBridgePaths } from "../../supervisor/manifest.js"; +import { getStringFlag, type ParsedArgs } from "../common.js"; + +export async function runLogsCommand(args: ParsedArgs): Promise { + const sensorId = args._[0]; + const lineLimit = Number(getStringFlag(args, "lines") ?? "100"); + const paths = getBridgePaths(); + const raw = await readFile(paths.supervisorLogFile, "utf8"); + const lines = raw + .trimEnd() + .split("\n") + .filter((line) => !sensorId || line.includes(`[w2a/${sensorId}]`)); + const sliced = lines.slice(-Math.max(1, lineLimit)); + process.stdout.write(sliced.join("\n") + (sliced.length > 0 ? "\n" : "")); +} diff --git a/hermes-sensor-bridge/src/cli/commands/remove.ts b/hermes-sensor-bridge/src/cli/commands/remove.ts new file mode 100644 index 0000000..66af238 --- /dev/null +++ b/hermes-sensor-bridge/src/cli/commands/remove.ts @@ -0,0 +1,140 @@ +import { readFile } from "node:fs/promises"; +import { join } from "node:path"; +import { + getBridgePaths, + readManifest, + removeSensorEntry, + writeManifest, +} from "../../supervisor/manifest.js"; +import { + getPort, + maybeReloadSupervisor, + printJson, + bridgePackageRoot, + removePath, + runCommand, + type ParsedArgs, +} from "../common.js"; + +export async function runRemoveCommand(args: ParsedArgs): Promise { + const sensorId = args._[0]; + if (!sensorId) { + throw new Error("Usage: world2agent-hermes remove [--purge]"); + } + + const paths = getBridgePaths(); + const manifest = await readManifest(paths); + const { manifest: nextManifest, removed } = removeSensorEntry(manifest, sensorId); + if (!removed) { + throw new Error(`Sensor not found: ${sensorId}`); + } + + if (removed.subscription_name) { + try { + await runCommand("hermes", ["webhook", "unsubscribe", removed.subscription_name]); + } catch (error) { + await removeSubscriptionFromFile(paths.webhookSubscriptionsFile, removed.subscription_name); + if (!(await subscriptionStillPresent(paths.webhookSubscriptionsFile, removed.subscription_name))) { + // fallback succeeded + } else { + throw error; + } + } + } + + await writeManifest(paths, nextManifest); + + const purge = args.flags.get("purge") === true; + if (purge) { + await removePath(join(paths.hermesSkillsDir, removed.skill_id)); + + const stillUsesPackage = nextManifest.sensors.some((entry) => entry.pkg === removed.pkg); + if (!stillUsesPackage) { + try { + await runCommand("npm", ["uninstall", "--no-save", removed.pkg], { + cwd: bridgePackageRoot(), + }); + } catch { + // best effort + } + } + } + + const reload = await maybeReloadSupervisor(getPort(args), paths); + printJson({ + ok: true, + removed, + purge, + reload, + }); +} + +async function removeSubscriptionFromFile( + file: string, + name: string, +): Promise { + try { + const raw = JSON.parse(await readFile(file, "utf8")) as unknown; + const next = stripSubscription(raw, name); + if (next === raw) return; + await import("../../supervisor/manifest.js").then(({ writeTextAtomic }) => + writeTextAtomic(file, JSON.stringify(next, null, 2) + "\n"), + ); + } catch { + // best effort + } +} + +async function subscriptionStillPresent( + file: string, + name: string, +): Promise { + try { + const raw = JSON.parse(await readFile(file, "utf8")) as unknown; + return containsSubscription(raw, name); + } catch { + return false; + } +} + +function stripSubscription(value: unknown, name: string): unknown { + if (Array.isArray(value)) { + return value.filter((item) => !matchesSubscription(item, name)); + } + if (!value || typeof value !== "object") { + return value; + } + + const obj = { ...(value as Record) }; + if (Array.isArray(obj.subscriptions)) { + obj.subscriptions = obj.subscriptions.filter((item) => !matchesSubscription(item, name)); + } + if (name in obj) { + delete obj[name]; + } + return obj; +} + +function containsSubscription(value: unknown, name: string): boolean { + if (Array.isArray(value)) { + return value.some((item) => matchesSubscription(item, name)); + } + if (!value || typeof value !== "object") { + return false; + } + + const obj = value as Record; + if (Array.isArray(obj.subscriptions)) { + return obj.subscriptions.some((item) => matchesSubscription(item, name)); + } + return name in obj; +} + +function matchesSubscription(value: unknown, name: string): boolean { + return ( + !!value && + typeof value === "object" && + ((value as Record).name === name || + (value as Record).subscription_name === name) + ); +} diff --git a/hermes-sensor-bridge/src/cli/commands/start.ts b/hermes-sensor-bridge/src/cli/commands/start.ts new file mode 100644 index 0000000..515f44e --- /dev/null +++ b/hermes-sensor-bridge/src/cli/commands/start.ts @@ -0,0 +1,50 @@ +import { spawn } from "node:child_process"; +import { getBridgePaths } from "../../supervisor/manifest.js"; +import { + getPort, + hasFlag, + isSupervisorRunning, + printJson, + resolveSupervisorBin, + type ParsedArgs, +} from "../common.js"; + +export async function runStartCommand(args: ParsedArgs): Promise { + const port = getPort(args); + const detach = hasFlag(args, "detach"); + const paths = getBridgePaths(); + const existing = await isSupervisorRunning(paths); + if (existing.running) { + printJson({ + ok: true, + already_running: true, + pid: existing.pid, + }); + return; + } + + const child = spawn(process.execPath, [resolveSupervisorBin(), "--port", String(port)], { + cwd: process.cwd(), + detached: detach, + stdio: detach ? "ignore" : "inherit", + }); + + if (detach) { + child.unref(); + printJson({ + ok: true, + detached: true, + pid: child.pid, + port, + }); + return; + } + + await new Promise((resolve, reject) => { + child.on("error", reject); + child.on("close", (code) => { + if (code === 0) resolve(); + else reject(new Error(`Supervisor exited with code ${code}`)); + }); + }); +} diff --git a/hermes-sensor-bridge/src/cli/commands/status.ts b/hermes-sensor-bridge/src/cli/commands/status.ts new file mode 100644 index 0000000..84764a6 --- /dev/null +++ b/hermes-sensor-bridge/src/cli/commands/status.ts @@ -0,0 +1,21 @@ +import { getBridgePaths } from "../../supervisor/manifest.js"; +import { + getPort, + isSupervisorRunning, + printJson, + readRuntimeState, + type ParsedArgs, +} from "../common.js"; + +export async function runStatusCommand(args: ParsedArgs): Promise { + const paths = getBridgePaths(); + const port = getPort(args); + const processState = await isSupervisorRunning(paths); + const runtime = await readRuntimeState(port, paths); + + printJson({ + ok: true, + process: processState, + runtime, + }); +} diff --git a/hermes-sensor-bridge/src/cli/commands/stop.ts b/hermes-sensor-bridge/src/cli/commands/stop.ts new file mode 100644 index 0000000..91ebf51 --- /dev/null +++ b/hermes-sensor-bridge/src/cli/commands/stop.ts @@ -0,0 +1,42 @@ +import { + getBridgePaths, + readPidFile, +} from "../../supervisor/manifest.js"; +import { printJson, waitForProcessExit } from "../common.js"; + +export async function runStopCommand(): Promise { + const paths = getBridgePaths(); + const pid = await readPidFile(paths); + if (!pid) { + printJson({ ok: true, stopped: false, reason: "not running" }); + return; + } + + try { + process.kill(pid, "SIGTERM"); + } catch (error) { + printJson({ + ok: true, + stopped: false, + pid, + reason: error instanceof Error ? error.message : String(error), + }); + return; + } + + const exited = await waitForProcessExit(pid, 5_000); + if (!exited) { + try { + process.kill(pid, "SIGKILL"); + } catch { + // no-op + } + } + + printJson({ + ok: true, + stopped: true, + pid, + forced: !exited, + }); +} diff --git a/hermes-sensor-bridge/src/cli/common.ts b/hermes-sensor-bridge/src/cli/common.ts new file mode 100644 index 0000000..2b5f67d --- /dev/null +++ b/hermes-sensor-bridge/src/cli/common.ts @@ -0,0 +1,581 @@ +import { spawn } from "node:child_process"; +import { randomBytes } from "node:crypto"; +import { createRequire } from "node:module"; +import { + appendFile, + mkdir, + readFile, + rm, + symlink, + writeFile, +} from "node:fs/promises"; +import { dirname, join, resolve } from "node:path"; +import { fileURLToPath } from "node:url"; +import { + getBridgePaths, + pathExists, + readPidFile, + readTrimmedText, + type BridgePaths, +} from "../supervisor/manifest.js"; + +export interface ParsedArgs { + _: string[]; + flags: Map; +} + +export interface InstalledPackageInfo { + packageJsonPath: string; + packageRoot: string; + packageJson: Record; +} + +export function parseArgs(argv: string[]): ParsedArgs { + const positionals: string[] = []; + const flags = new Map(); + + for (let index = 0; index < argv.length; index++) { + const arg = argv[index]!; + if (!arg.startsWith("--")) { + positionals.push(arg); + continue; + } + + const [name, inlineValue] = arg.slice(2).split("=", 2); + if (inlineValue !== undefined) { + flags.set(name, inlineValue); + continue; + } + + const next = argv[index + 1]; + if (next && !next.startsWith("--")) { + flags.set(name, next); + index += 1; + continue; + } + + flags.set(name, true); + } + + return { _: positionals, flags }; +} + +export function getStringFlag( + args: ParsedArgs, + name: string, +): string | undefined { + const value = args.flags.get(name); + return typeof value === "string" ? value : undefined; +} + +export function hasFlag(args: ParsedArgs, name: string): boolean { + return args.flags.get(name) === true; +} + +export function getPort(args: ParsedArgs): number { + const raw = getStringFlag(args, "port"); + if (!raw) return 8645; + + const port = Number(raw); + if (!Number.isInteger(port) || port <= 0 || port > 65535) { + throw new Error(`Invalid --port value: ${raw}`); + } + return port; +} + +export function printJson(value: unknown): void { + process.stdout.write(JSON.stringify(value, null, 2) + "\n"); +} + +export function bridgePackageRoot(): string { + return fileURLToPath(new URL("../../", import.meta.url)); +} + +export function resolveSupervisorBin(): string { + return fileURLToPath(new URL("../supervisor/bin.js", import.meta.url)); +} + +export async function resolveInstalledPackage( + pkg: string, +): Promise { + const require = createRequire(import.meta.url); + try { + const entryPath = require.resolve(pkg, { + paths: [bridgePackageRoot()], + }); + const packageJsonPath = await findNearestPackageJson(dirname(entryPath)); + const raw = JSON.parse( + await readFile(packageJsonPath, "utf8"), + ) as Record; + return { + packageJsonPath, + packageRoot: dirname(packageJsonPath), + packageJson: raw, + }; + } catch { + return null; + } +} + +export async function ensurePackageInstalled( + pkg: string, +): Promise { + const existing = await resolveInstalledPackage(pkg); + if (existing) return existing; + + const localRepo = await findLocalSensorRepo(pkg); + if (localRepo) { + await linkLocalPackage(pkg, localRepo); + const linked = await resolveInstalledPackage(pkg); + if (linked) return linked; + } + + await runCommand("npm", ["install", "--no-save", pkg], { + cwd: bridgePackageRoot(), + }); + const installed = await resolveInstalledPackage(pkg); + if (!installed) { + throw new Error(`Failed to resolve installed package ${pkg}`); + } + return installed; +} + +export async function callControl( + pathname: string, + options: { + method?: string; + port?: number; + paths?: BridgePaths; + } = {}, +): Promise { + const paths = options.paths ?? getBridgePaths(); + const token = await readTrimmedText(paths.controlTokenFile); + if (!token) { + throw new Error("Control token not found"); + } + + return fetch(`http://127.0.0.1:${options.port ?? 8645}${pathname}`, { + method: options.method ?? "GET", + headers: { + "X-W2A-Token": token, + }, + signal: AbortSignal.timeout(2_000), + }); +} + +export async function readRuntimeState( + port: number, + paths: BridgePaths, +): Promise<{ + health: unknown; + list: unknown; +} | null> { + try { + const [healthRes, listRes] = await Promise.all([ + callControl("/_w2a/health", { port, paths }), + callControl("/_w2a/list", { port, paths }), + ]); + if (!healthRes.ok || !listRes.ok) return null; + return { + health: await healthRes.json(), + list: await listRes.json(), + }; + } catch { + return null; + } +} + +export async function maybeReloadSupervisor( + port: number, + paths: BridgePaths, +): Promise { + try { + const response = await callControl("/_w2a/reload", { + method: "POST", + port, + paths, + }); + const payload = await response.json(); + if (!response.ok) { + throw new Error( + typeof payload?.error === "string" + ? payload.error + : `Reload failed with HTTP ${response.status}`, + ); + } + return payload; + } catch (error) { + return { + ok: false, + error: error instanceof Error ? error.message : String(error), + }; + } +} + +export async function isSupervisorRunning(paths: BridgePaths): Promise<{ + pid: number | null; + running: boolean; +}> { + const pid = await readPidFile(paths); + if (!pid) return { pid: null, running: false }; + + try { + process.kill(pid, 0); + return { pid, running: true }; + } catch (error) { + if (error instanceof Error && "code" in error && error.code === "EPERM") { + return { pid, running: true }; + } + return { pid, running: false }; + } +} + +export async function waitForProcessExit( + pid: number, + timeoutMs: number, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + try { + process.kill(pid, 0); + } catch (error) { + if (!(error instanceof Error && "code" in error && error.code === "EPERM")) { + return true; + } + } + await delay(100); + } + return false; +} + +export async function runCommand( + command: string, + args: string[], + options: { + cwd?: string; + env?: NodeJS.ProcessEnv; + } = {}, +): Promise<{ stdout: string; stderr: string }> { + return new Promise((resolvePromise, reject) => { + const child = spawn(command, args, { + cwd: options.cwd, + env: options.env ?? process.env, + stdio: ["ignore", "pipe", "pipe"], + }); + + let stdout = ""; + let stderr = ""; + child.stdout.setEncoding("utf8"); + child.stderr.setEncoding("utf8"); + child.stdout.on("data", (chunk) => { + stdout += chunk; + }); + child.stderr.on("data", (chunk) => { + stderr += chunk; + }); + child.on("error", reject); + child.on("close", (code) => { + if (code === 0) { + resolvePromise({ stdout, stderr }); + return; + } + reject( + new Error( + `${command} ${args.join(" ")} failed with code ${code}: ${ + stderr.trim() || stdout.trim() + }`, + ), + ); + }); + }); +} + +export async function removePath(path: string): Promise { + await rm(path, { force: true, recursive: true }); +} + +async function findLocalSensorRepo(pkg: string): Promise { + if (!pkg.startsWith("@world2agent/sensor-")) return null; + + const slug = pkg.split("/").pop()?.replace(/^sensor-/, ""); + if (!slug) return null; + + const candidate = resolve(bridgePackageRoot(), "..", "..", "world2agent-sensors", slug); + return (await pathExists(join(candidate, "package.json"))) ? candidate : null; +} + +async function linkLocalPackage(pkg: string, sourceDir: string): Promise { + const scope = pkg.split("/")[0]; + const name = pkg.split("/")[1]; + if (!scope || !name) { + throw new Error(`Invalid package name: ${pkg}`); + } + + const target = join(bridgePackageRoot(), "node_modules", scope, name); + await mkdir(dirname(target), { recursive: true }); + await removePath(target); + await symlink(sourceDir, target, "dir"); +} + +async function findNearestPackageJson(startDir: string): Promise { + let current = startDir; + for (;;) { + const candidate = join(current, "package.json"); + if (await pathExists(candidate)) { + return candidate; + } + const parent = dirname(current); + if (parent === current) { + throw new Error(`Could not find package.json above ${startDir}`); + } + current = parent; + } +} + +function delay(ms: number): Promise { + return new Promise((resolvePromise) => setTimeout(resolvePromise, ms)); +} + +const MANAGED_BLOCK_BEGIN = "# >>> world2agent-hermes-bridge (managed) >>>"; +const MANAGED_BLOCK_END = "# <<< world2agent-hermes-bridge (managed) <<<"; + +export interface EnsureHermesWebhookResult { + /** Webhook platform was already enabled before this call. */ + alreadyEnabled: boolean; + /** Where enablement was detected (or null when we just enabled it). */ + detectedVia: "config-yaml" | "managed-block" | null; + configYamlModified: boolean; + envModified: boolean; + configYamlFile: string; + hermesEnvFile: string; + webhookPort: number; + /** True if a Hermes gateway is running and needs a restart for new config. */ + gatewayRestartRequired: boolean; + /** True when this call wrote the WEBHOOK_SECRET / extra.secret. */ + secretWritten: boolean; +} + +/** + * Make sure Hermes's webhook platform is enabled and a top-level + * `platforms.webhook.*` config exists. + * + * Hermes's CLI (e.g. `hermes webhook subscribe`) reads `~/.hermes/config.yaml` + * to decide whether the webhook platform is configured; the gateway runtime + * additionally honours `WEBHOOK_*` env vars. We write both, fenced by marker + * comments so the change is idempotent and easy to revert by hand. + */ +export async function ensureHermesWebhookEnabled( + paths: BridgePaths = getBridgePaths(), + opts: { port?: number; secret?: string } = {}, +): Promise { + const port = opts.port ?? 8644; + + const yamlAlreadyEnabled = await detectWebhookEnabledInConfigYaml( + paths.hermesConfigYamlFile, + ); + const secret = opts.secret ?? randomBytes(32).toString("hex"); + + // YAML is the canonical source for the CLI: only patch it if not already + // declared. We never touch a hand-managed top-level `platforms:` block — + // ensureManagedBlockInConfigYaml throws in that case. + const configYamlModified = yamlAlreadyEnabled + ? false + : await ensureManagedBlockInConfigYaml(paths.hermesConfigYamlFile, port, secret); + + // Env is patched independently so we self-heal partial state (e.g. someone + // hand-enabled webhook in config.yaml but the gateway runtime still expects + // WEBHOOK_*). The block is marker-fenced and idempotent. + const envModified = await ensureManagedBlockInEnv(paths.hermesEnvFile, port, secret); + + const alreadyEnabled = yamlAlreadyEnabled; + const detectedVia: "config-yaml" | "managed-block" | null = alreadyEnabled + ? "config-yaml" + : !configYamlModified && !envModified + ? "managed-block" + : null; + const gatewayRestartRequired = + (configYamlModified || envModified) && (await isHermesGatewayRunning()); + + return { + alreadyEnabled, + detectedVia, + configYamlModified, + envModified, + configYamlFile: paths.hermesConfigYamlFile, + hermesEnvFile: paths.hermesEnvFile, + webhookPort: port, + gatewayRestartRequired, + secretWritten: configYamlModified || envModified, + }; +} + +async function ensureManagedBlockInConfigYaml( + configFile: string, + port: number, + secret: string, +): Promise { + let current = ""; + try { + current = await readFile(configFile, "utf8"); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") throw error; + } + + if (current.includes(MANAGED_BLOCK_BEGIN)) return false; + + if (hasUnmanagedTopLevelPlatforms(current)) { + throw new Error( + `~/.hermes/config.yaml already declares a top-level 'platforms:' block. ` + + `Add 'webhook: { enabled: true, extra: { host: "127.0.0.1", port: ${port}, secret: "" } }' under it manually, ` + + `or run 'hermes gateway setup' to use the wizard. ` + + `world2agent-hermes will not modify a hand-managed platforms section.`, + ); + } + + const block = [ + MANAGED_BLOCK_BEGIN, + "# Enables Hermes's webhook platform so world2agent-hermes can subscribe routes.", + "platforms:", + " webhook:", + " enabled: true", + " extra:", + ' host: "127.0.0.1"', + ` port: ${port}`, + ` secret: "${secret}"`, + MANAGED_BLOCK_END, + "", + ].join("\n"); + + await mkdir(dirname(configFile), { recursive: true }); + if (current.length === 0) { + await writeFile(configFile, block, "utf8"); + } else { + const prefix = current.endsWith("\n") ? "\n" : "\n\n"; + await appendFile(configFile, prefix + block, "utf8"); + } + return true; +} + +async function ensureManagedBlockInEnv( + envFile: string, + port: number, + secret: string, +): Promise { + let current = ""; + try { + current = await readFile(envFile, "utf8"); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") throw error; + } + if (current.includes(MANAGED_BLOCK_BEGIN)) return false; + + const block = [ + MANAGED_BLOCK_BEGIN, + "# Enables Hermes's webhook platform at the gateway runtime layer.", + "WEBHOOK_ENABLED=true", + `WEBHOOK_PORT=${port}`, + `WEBHOOK_SECRET=${secret}`, + MANAGED_BLOCK_END, + "", + ].join("\n"); + + await mkdir(dirname(envFile), { recursive: true }); + if (current.length === 0) { + await writeFile(envFile, block, "utf8"); + } else { + const prefix = current.endsWith("\n") ? "\n" : "\n\n"; + await appendFile(envFile, prefix + block, "utf8"); + } + return true; +} + +/** + * Returns true when a top-level `platforms:` key exists in the YAML and is NOT + * managed by us (so we should not touch it). A literal empty mapping + * (`platforms: {}`) is treated as unmanaged too — refuse to mutate it. + */ +function hasUnmanagedTopLevelPlatforms(yamlText: string): boolean { + if (!yamlText) return false; + const lines = yamlText.split(/\r?\n/); + let insideManaged = false; + for (const rawLine of lines) { + if (rawLine.includes(MANAGED_BLOCK_BEGIN)) { + insideManaged = true; + continue; + } + if (rawLine.includes(MANAGED_BLOCK_END)) { + insideManaged = false; + continue; + } + if (insideManaged) continue; + if (/^platforms\s*:/.test(rawLine)) return true; + } + return false; +} + +async function detectWebhookEnabledInConfigYaml(configFile: string): Promise { + let text: string; + try { + text = await readFile(configFile, "utf8"); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") return false; + throw error; + } + + const lines = text.split(/\r?\n/); + let topLevelPlatformsIndent = -1; + let webhookIndent = -1; + let inWebhookBlock = false; + + for (const rawLine of lines) { + const line = rawLine.replace(/\t/g, " "); + const trimmed = line.trim(); + if (!trimmed || trimmed.startsWith("#")) continue; + const indent = line.length - line.trimStart().length; + + if (topLevelPlatformsIndent === -1) { + if (indent === 0 && /^platforms\s*:/.test(trimmed)) { + topLevelPlatformsIndent = 0; + } + continue; + } + + if (indent <= topLevelPlatformsIndent && !/^platforms\s*:/.test(trimmed)) { + // exited the platforms block before finding webhook.enabled + topLevelPlatformsIndent = -1; + inWebhookBlock = false; + continue; + } + + if (!inWebhookBlock) { + const match = /^webhook\s*:\s*$/.exec(trimmed); + if (match) { + inWebhookBlock = true; + webhookIndent = indent; + } + continue; + } + + if (indent <= webhookIndent) { + inWebhookBlock = false; + continue; + } + + const enabledMatch = /^enabled\s*:\s*(\S+)/.exec(trimmed); + if (enabledMatch) { + const value = enabledMatch[1]!.replace(/[",]/g, "").toLowerCase(); + return value === "true" || value === "yes" || value === "1"; + } + } + return false; +} + +async function isHermesGatewayRunning(): Promise { + return new Promise((resolvePromise) => { + const child = spawn("pgrep", ["-fl", "hermes gateway run"], { + stdio: ["ignore", "ignore", "ignore"], + }); + child.on("error", () => resolvePromise(false)); + child.on("close", (code) => resolvePromise(code === 0)); + }); +} diff --git a/hermes-sensor-bridge/src/index.ts b/hermes-sensor-bridge/src/index.ts new file mode 100644 index 0000000..cb0ff5c --- /dev/null +++ b/hermes-sensor-bridge/src/index.ts @@ -0,0 +1 @@ +export {}; diff --git a/hermes-sensor-bridge/src/runner/bin.ts b/hermes-sensor-bridge/src/runner/bin.ts new file mode 100644 index 0000000..ea82a58 --- /dev/null +++ b/hermes-sensor-bridge/src/runner/bin.ts @@ -0,0 +1,136 @@ +#!/usr/bin/env node + +import { FileSensorStore, startSensor, type SensorSpec } from "@world2agent/sdk"; +import { stdoutTransport } from "@world2agent/sdk/transports"; +import { pathToFileURL } from "node:url"; +import { isAbsolute, resolve } from "node:path"; +import { readJsonFromStdin } from "./config-stream.js"; + +const EXIT_CONFIG_ERROR = 10; +const EXIT_IMPORT_ERROR = 11; +const EXIT_START_ERROR = 12; + +/** + * Sensor subprocess. The runner is intentionally channel-agnostic: + * + * - signals → one JSON line per signal on **stdout** (via SDK stdoutTransport) + * - diagnostics / sensor logs → **stderr** (via stderrLogger below) + * + * The supervisor parent reads stdout line-by-line as W2A signals and POSTs + * them to Hermes; stderr is appended to supervisor.log with a `[w2a/]` + * prefix. Mixing log text into stdout would break the parser, so every log + * path here goes through stderrLogger — even `console.log` / `console.info` + * are NOT used in this file. + */ +const stderrLogger = { + log: (...args: unknown[]) => console.error(...args), + info: (...args: unknown[]) => console.error(...args), + warn: (...args: unknown[]) => console.error(...args), + error: (...args: unknown[]) => console.error(...args), + debug: (...args: unknown[]) => console.error(...args), +}; + +async function main(): Promise { + const env = requireEnv(["W2A_PACKAGE", "W2A_SENSOR_ID", "W2A_STATE_PATH"]); + + let config: Record; + try { + config = await readJsonFromStdin(); + } catch (error) { + console.error(error); + process.exit(EXIT_CONFIG_ERROR); + } + + let spec: SensorSpec>; + try { + spec = await loadSensorSpec(env.W2A_PACKAGE); + } catch (error) { + console.error(error); + process.exit(EXIT_IMPORT_ERROR); + } + + const store = new FileSensorStore({ path: env.W2A_STATE_PATH }); + + let cleanup: (() => Promise | void) | undefined; + try { + cleanup = await startSensor(spec, { + config, + onSignal: stdoutTransport(), + store, + logger: stderrLogger, + logEmits: true, + }); + } catch (error) { + console.error(error); + await store.flush().catch(() => {}); + process.exit(EXIT_START_ERROR); + } + + let shuttingDown = false; + const shutdown = async () => { + if (shuttingDown) return; + shuttingDown = true; + + try { + await cleanup?.(); + await store.flush(); + } catch (error) { + console.error(error); + process.exit(1); + } + + process.exit(0); + }; + + process.on("SIGTERM", () => { + void shutdown(); + }); + process.on("SIGINT", () => { + void shutdown(); + }); + + const watchdog = setInterval(() => { + if (process.ppid === 1) { + console.error("[w2a-runner] parent died; shutting down"); + void shutdown(); + } + }, 5_000); + watchdog.unref(); + + await new Promise(() => {}); +} + +async function loadSensorSpec(pkg: string): Promise>> { + const module = await import(resolveImportTarget(pkg)); + const spec = module.default as SensorSpec> | undefined; + + if (!spec || typeof spec.start !== "function") { + throw new Error(`${pkg} does not export a valid default SensorSpec`); + } + + return spec; +} + +function resolveImportTarget(pkg: string): string { + if (pkg.startsWith(".") || pkg.startsWith("/") || isAbsolute(pkg)) { + return pathToFileURL(resolve(pkg)).href; + } + return pkg; +} + +function requireEnv(keys: string[]): Record { + const values: Record = {}; + for (const key of keys) { + const value = process.env[key]; + if (!value) { + throw new Error(`Missing required env var: ${key}`); + } + values[key] = value; + } + return values; +} + +main().catch((error) => { + console.error(error); + process.exit(99); +}); diff --git a/hermes-sensor-bridge/src/runner/config-stream.ts b/hermes-sensor-bridge/src/runner/config-stream.ts new file mode 100644 index 0000000..e845bf9 --- /dev/null +++ b/hermes-sensor-bridge/src/runner/config-stream.ts @@ -0,0 +1,26 @@ +export async function readJsonFromStdin(): Promise> { + process.stdin.setEncoding("utf8"); + + let raw = ""; + for await (const chunk of process.stdin) { + raw += chunk; + } + + const text = raw.trim(); + if (!text) return {}; + + let parsed: unknown; + try { + parsed = JSON.parse(text); + } catch (error) { + throw new Error( + `Invalid sensor config JSON on stdin: ${error instanceof Error ? error.message : String(error)}`, + ); + } + + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + throw new Error("Sensor config JSON must be an object"); + } + + return parsed as Record; +} diff --git a/hermes-sensor-bridge/src/supervisor/bin.ts b/hermes-sensor-bridge/src/supervisor/bin.ts new file mode 100644 index 0000000..a8d8c13 --- /dev/null +++ b/hermes-sensor-bridge/src/supervisor/bin.ts @@ -0,0 +1,118 @@ +#!/usr/bin/env node + +import { createWriteStream, type WriteStream } from "node:fs"; +import { + ensureBridgeDirs, + getBridgePaths, + isProcessAlive, + loadOrCreateControlToken, + loadOrCreateHmacSecret, + readManifest, + readPidFile, + removePidFile, + writePidFile, +} from "./manifest.js"; +import { SensorSupervisor } from "./spawn.js"; +import { startControlServer } from "./control-server.js"; +import { startGatewayWatch } from "./gateway-watch.js"; + +async function main(): Promise { + const port = parsePort(process.argv.slice(2)); + const paths = getBridgePaths(); + await ensureBridgeDirs(paths); + + const existingPid = await readPidFile(paths); + if (existingPid && existingPid !== process.pid && (await isProcessAlive(existingPid))) { + throw new Error(`Supervisor already running with pid ${existingPid}`); + } + + const logStream = createWriteStream(paths.supervisorLogFile, { flags: "a" }); + const log = createLogger(logStream); + + try { + await writePidFile(paths, process.pid); + + const hmacSecret = await loadOrCreateHmacSecret(paths); + const controlToken = await loadOrCreateControlToken(paths); + const supervisor = new SensorSupervisor({ paths, hmacSecret, log }); + const startedAt = Date.now(); + + const manifest = await readManifest(paths); + const controlServer = await startControlServer({ + paths, + supervisor, + token: controlToken, + port, + startedAt, + log, + }); + + let shuttingDown = false; + const shutdown = async (reason: string) => { + if (shuttingDown) return; + shuttingDown = true; + log(`[w2a/supervisor] shutting down (${reason})`); + + stopGatewayWatch(); + await controlServer.close().catch(() => {}); + await supervisor.terminateAll().catch((error) => { + log( + `[w2a/supervisor] terminateAll failed: ${ + error instanceof Error ? error.message : String(error) + }`, + ); + }); + await removePidFile(paths).catch(() => {}); + await new Promise((resolve) => logStream.end(resolve)); + process.exit(0); + }; + + const stopGatewayWatch = await startGatewayWatch({ + gatewayPidFile: paths.gatewayPidFile, + log, + onGatewayExit: () => shutdown("gateway exited"), + }); + + process.on("SIGTERM", () => { + void shutdown("SIGTERM"); + }); + process.on("SIGINT", () => { + void shutdown("SIGINT"); + }); + + const applied = await supervisor.applyConfig(manifest.sensors); + log(`[w2a/supervisor] initial apply: ${JSON.stringify(applied)}`); + + await new Promise(() => {}); + } catch (error) { + log(`[w2a/supervisor] fatal: ${error instanceof Error ? error.stack ?? error.message : String(error)}`); + await removePidFile(paths).catch(() => {}); + await new Promise((resolve) => logStream.end(resolve)); + throw error; + } +} + +function createLogger(stream: WriteStream): (line: string) => void { + return (line: string) => { + const formatted = `[${new Date().toISOString()}] ${line}\n`; + process.stdout.write(formatted); + stream.write(formatted); + }; +} + +function parsePort(args: string[]): number { + const index = args.indexOf("--port"); + if (index === -1) return 8645; + + const raw = args[index + 1]; + const port = Number(raw); + if (!Number.isInteger(port) || port <= 0 || port > 65535) { + throw new Error(`Invalid --port value: ${String(raw)}`); + } + return port; +} + +main().catch((error) => { + console.error(error); + process.exit(1); +}); diff --git a/hermes-sensor-bridge/src/supervisor/control-server.ts b/hermes-sensor-bridge/src/supervisor/control-server.ts new file mode 100644 index 0000000..622a4e0 --- /dev/null +++ b/hermes-sensor-bridge/src/supervisor/control-server.ts @@ -0,0 +1,104 @@ +import { createServer, type IncomingMessage, type ServerResponse } from "node:http"; +import type { SensorSupervisor } from "./spawn.js"; +import type { BridgePaths } from "./manifest.js"; +import { readManifest } from "./manifest.js"; + +interface ControlServerOptions { + paths: BridgePaths; + supervisor: SensorSupervisor; + token: string; + port: number; + startedAt: number; + log: (line: string) => void; +} + +export interface RunningControlServer { + close(): Promise; +} + +export async function startControlServer( + options: ControlServerOptions, +): Promise { + const server = createServer((req, res) => { + void handleRequest(req, res, options); + }); + + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(options.port, "127.0.0.1", () => { + server.off("error", reject); + resolve(); + }); + }); + + options.log(`[w2a/control] listening on http://127.0.0.1:${options.port}`); + + return { + close: () => + new Promise((resolve, reject) => { + server.close((error) => { + if (error) reject(error); + else resolve(); + }); + }), + }; +} + +async function handleRequest( + req: IncomingMessage, + res: ServerResponse, + options: ControlServerOptions, +): Promise { + if (!authorize(req, options.token)) { + writeJson(res, 401, { ok: false, error: "unauthorized" }); + return; + } + + const url = new URL(req.url ?? "/", "http://127.0.0.1"); + if (req.method === "GET" && url.pathname === "/_w2a/health") { + writeJson(res, 200, { + ok: true, + uptime_ms: Date.now() - options.startedAt, + child_count: options.supervisor.snapshot().length, + }); + return; + } + + if (req.method === "GET" && url.pathname === "/_w2a/list") { + writeJson(res, 200, { + ok: true, + handles: options.supervisor.snapshot(), + }); + return; + } + + if (req.method === "POST" && url.pathname === "/_w2a/reload") { + try { + const manifest = await readManifest(options.paths); + const applied = await options.supervisor.applyConfig(manifest.sensors); + writeJson(res, 200, { + ok: true, + applied, + }); + } catch (error) { + writeJson(res, 422, { + ok: false, + error: error instanceof Error ? error.message : String(error), + }); + } + return; + } + + writeJson(res, 404, { ok: false, error: "not found" }); +} + +function authorize(req: IncomingMessage, token: string): boolean { + return req.headers["x-w2a-token"] === token; +} + +function writeJson(res: ServerResponse, status: number, body: unknown): void { + const payload = JSON.stringify(body, null, 2); + res.statusCode = status; + res.setHeader("content-type", "application/json; charset=utf-8"); + res.end(payload); +} diff --git a/hermes-sensor-bridge/src/supervisor/gateway-watch.ts b/hermes-sensor-bridge/src/supervisor/gateway-watch.ts new file mode 100644 index 0000000..ba69442 --- /dev/null +++ b/hermes-sensor-bridge/src/supervisor/gateway-watch.ts @@ -0,0 +1,51 @@ +import { pathExists, readTrimmedText } from "./manifest.js"; + +interface GatewayWatchOptions { + gatewayPidFile: string; + log: (line: string) => void; + onGatewayExit: () => Promise | void; +} + +export async function startGatewayWatch( + options: GatewayWatchOptions, +): Promise<() => void> { + if (!(await pathExists(options.gatewayPidFile))) { + return () => {}; + } + + let stopping = false; + const timer = setInterval(() => { + void checkGatewayPid(options).catch((error) => { + options.log( + `[w2a/gateway-watch] error: ${ + error instanceof Error ? error.message : String(error) + }`, + ); + }); + }, 10_000); + timer.unref(); + + return () => { + if (stopping) return; + stopping = true; + clearInterval(timer); + }; +} + +async function checkGatewayPid(options: GatewayWatchOptions): Promise { + const raw = await readTrimmedText(options.gatewayPidFile); + if (!raw) return; + + const pid = Number(raw); + if (!Number.isInteger(pid) || pid <= 0) return; + + try { + process.kill(pid, 0); + } catch (error) { + if (error instanceof Error && "code" in error && error.code === "EPERM") { + return; + } + options.log(`[w2a/gateway-watch] gateway pid ${pid} is gone; shutting down`); + await options.onGatewayExit(); + } +} diff --git a/hermes-sensor-bridge/src/supervisor/manifest.ts b/hermes-sensor-bridge/src/supervisor/manifest.ts new file mode 100644 index 0000000..aa11380 --- /dev/null +++ b/hermes-sensor-bridge/src/supervisor/manifest.ts @@ -0,0 +1,310 @@ +import { createHash, randomBytes } from "node:crypto"; +import { access, mkdir, readFile, rename, rm, writeFile } from "node:fs/promises"; +import { homedir } from "node:os"; +import { dirname, join } from "node:path"; +import { packageToSkillId } from "@world2agent/sdk"; + +export interface SensorEntry { + sensor_id: string; + pkg: string; + skill_id: string; + subscription_name?: string; + webhook_url: string; + enabled: boolean; + config: Record; +} + +export interface SensorManifest { + version: 1; + sensors: SensorEntry[]; +} + +export interface BridgePaths { + baseDir: string; + manifestFile: string; + hmacSecretFile: string; + controlTokenFile: string; + supervisorPidFile: string; + supervisorLogFile: string; + stateDir: string; + hermesHome: string; + hermesSkillsDir: string; + gatewayPidFile: string; + webhookSubscriptionsFile: string; + hermesEnvFile: string; + hermesConfigYamlFile: string; +} + +const DEFAULT_MANIFEST: SensorManifest = { + version: 1, + sensors: [], +}; + +export function getBridgePaths(env: NodeJS.ProcessEnv = process.env): BridgePaths { + const hermesHome = env.HERMES_HOME ?? join(homedir(), ".hermes"); + const baseDir = env.HERMES_HOME + ? join(hermesHome, "world2agent") + : join(homedir(), ".world2agent"); + + return { + baseDir, + manifestFile: join(baseDir, "sensors.json"), + hmacSecretFile: join(baseDir, ".hmac_secret"), + controlTokenFile: join(baseDir, ".control_token"), + supervisorPidFile: join(baseDir, "supervisor.pid"), + supervisorLogFile: join(baseDir, "supervisor.log"), + stateDir: join(baseDir, "state"), + hermesHome, + hermesSkillsDir: join(hermesHome, "skills"), + gatewayPidFile: join(hermesHome, "gateway.pid"), + webhookSubscriptionsFile: join(hermesHome, "webhook_subscriptions.json"), + hermesEnvFile: join(hermesHome, ".env"), + hermesConfigYamlFile: join(hermesHome, "config.yaml"), + }; +} + +export async function ensureBridgeDirs(paths: BridgePaths): Promise { + await mkdir(paths.baseDir, { recursive: true }); + await mkdir(paths.stateDir, { recursive: true }); + await mkdir(paths.hermesSkillsDir, { recursive: true }); +} + +export async function readManifest(paths: BridgePaths): Promise { + try { + const raw = await readFile(paths.manifestFile, "utf8"); + return parseManifest(JSON.parse(raw) as unknown); + } catch (error) { + if (isMissingFile(error)) { + return structuredClone(DEFAULT_MANIFEST); + } + throw error; + } +} + +export async function writeManifest( + paths: BridgePaths, + manifest: SensorManifest, +): Promise { + await ensureBridgeDirs(paths); + const normalized: SensorManifest = { + version: 1, + sensors: manifest.sensors.map(normalizeSensorEntry), + }; + await writeTextAtomic(paths.manifestFile, JSON.stringify(normalized, null, 2) + "\n"); +} + +export function upsertSensorEntry( + manifest: SensorManifest, + entry: SensorEntry, +): SensorManifest { + const normalized = normalizeSensorEntry(entry); + const sensors = manifest.sensors.filter((item) => item.sensor_id !== normalized.sensor_id); + sensors.push(normalized); + sensors.sort((a, b) => a.sensor_id.localeCompare(b.sensor_id)); + return { + version: 1, + sensors, + }; +} + +export function removeSensorEntry( + manifest: SensorManifest, + sensorId: string, +): { + manifest: SensorManifest; + removed: SensorEntry | null; +} { + const removed = manifest.sensors.find((entry) => entry.sensor_id === sensorId) ?? null; + return { + manifest: { + version: 1, + sensors: manifest.sensors.filter((entry) => entry.sensor_id !== sensorId), + }, + removed, + }; +} + +export function normalizeSensorEntry(entry: SensorEntry): SensorEntry { + return { + sensor_id: entry.sensor_id, + pkg: entry.pkg, + skill_id: entry.skill_id?.trim() ? entry.skill_id : packageToSkillId(entry.pkg), + subscription_name: entry.subscription_name, + webhook_url: entry.webhook_url, + enabled: entry.enabled !== false, + config: entry.config ?? {}, + }; +} + +export function defaultSensorId(pkg: string): string { + const suffix = pkg.split("/").pop() ?? pkg; + return suffix.replace(/^sensor-/, ""); +} + +export function stableStringify(value: unknown): string { + if (value === null || typeof value !== "object") { + return JSON.stringify(value); + } + if (Array.isArray(value)) { + return `[${value.map((item) => stableStringify(item)).join(",")}]`; + } + const obj = value as Record; + return `{${Object.keys(obj) + .sort() + .map((key) => `${JSON.stringify(key)}:${stableStringify(obj[key])}`) + .join(",")}}`; +} + +export function hashConfig(config: unknown): string { + return createHash("sha1").update(stableStringify(config)).digest("hex"); +} + +export async function loadOrCreateHmacSecret( + paths: BridgePaths, + override?: string, +): Promise { + if (override) { + await writeTextAtomic(paths.hmacSecretFile, `${override}\n`); + return override; + } + + const existing = await readTrimmedText(paths.hmacSecretFile); + if (existing) return existing; + + const secret = randomBytes(32).toString("hex"); + await writeTextAtomic(paths.hmacSecretFile, `${secret}\n`); + return secret; +} + +export async function loadOrCreateControlToken(paths: BridgePaths): Promise { + const existing = await readTrimmedText(paths.controlTokenFile); + if (existing) return existing; + + const token = randomBytes(32).toString("hex"); + await writeTextAtomic(paths.controlTokenFile, `${token}\n`); + return token; +} + +export async function readTrimmedText(path: string): Promise { + try { + return (await readFile(path, "utf8")).trim() || null; + } catch (error) { + if (isMissingFile(error)) return null; + throw error; + } +} + +export async function writeTextAtomic(path: string, content: string): Promise { + await mkdir(dirname(path), { recursive: true }); + const tmp = `${path}.${process.pid}.${Date.now()}.tmp`; + await writeFile(tmp, content, "utf8"); + await rename(tmp, path); +} + +export async function writePidFile(paths: BridgePaths, pid: number): Promise { + await writeTextAtomic(paths.supervisorPidFile, `${pid}\n`); +} + +export async function readPidFile(paths: BridgePaths): Promise { + const raw = await readTrimmedText(paths.supervisorPidFile); + if (!raw) return null; + + const pid = Number(raw); + return Number.isInteger(pid) && pid > 0 ? pid : null; +} + +export async function removePidFile(paths: BridgePaths): Promise { + await rm(paths.supervisorPidFile, { force: true }); +} + +export async function isProcessAlive(pid: number): Promise { + try { + process.kill(pid, 0); + return true; + } catch (error) { + if (isNodeError(error) && error.code === "EPERM") return true; + return false; + } +} + +export async function pathExists(path: string): Promise { + try { + await access(path); + return true; + } catch { + return false; + } +} + +function parseManifest(raw: unknown): SensorManifest { + if (!raw || typeof raw !== "object" || Array.isArray(raw)) { + throw new Error("Manifest must be a JSON object"); + } + + const version = (raw as Record).version; + const sensors = (raw as Record).sensors; + if (version !== 1) { + throw new Error(`Unsupported manifest version: ${String(version)}`); + } + if (!Array.isArray(sensors)) { + throw new Error("Manifest field `sensors` must be an array"); + } + + return { + version: 1, + sensors: sensors.map((entry, index) => parseSensorEntry(entry, index)), + }; +} + +function parseSensorEntry(raw: unknown, index: number): SensorEntry { + if (!raw || typeof raw !== "object" || Array.isArray(raw)) { + throw new Error(`Manifest sensor[${index}] must be an object`); + } + + const entry = raw as Record; + const sensorId = expectString(entry.sensor_id, `sensor[${index}].sensor_id`); + const pkg = expectString(entry.pkg, `sensor[${index}].pkg`); + const webhookUrl = expectString(entry.webhook_url, `sensor[${index}].webhook_url`); + const enabled = entry.enabled === undefined ? true : Boolean(entry.enabled); + const config = entry.config; + + if (!config || typeof config !== "object" || Array.isArray(config)) { + throw new Error(`sensor[${index}].config must be an object`); + } + + const subscriptionName = + entry.subscription_name === undefined + ? undefined + : expectString(entry.subscription_name, `sensor[${index}].subscription_name`); + + const skillIdRaw = entry.skill_id; + const skillId = + typeof skillIdRaw === "string" && skillIdRaw.trim() !== "" + ? skillIdRaw + : packageToSkillId(pkg); + + return { + sensor_id: sensorId, + pkg, + skill_id: skillId, + subscription_name: subscriptionName, + webhook_url: webhookUrl, + enabled, + config: config as Record, + }; +} + +function expectString(value: unknown, label: string): string { + if (typeof value !== "string" || value.trim() === "") { + throw new Error(`${label} must be a non-empty string`); + } + return value; +} + +function isMissingFile(error: unknown): boolean { + return isNodeError(error) && error.code === "ENOENT"; +} + +function isNodeError(error: unknown): error is NodeJS.ErrnoException { + return error instanceof Error && "code" in error; +} diff --git a/hermes-sensor-bridge/src/supervisor/spawn.ts b/hermes-sensor-bridge/src/supervisor/spawn.ts new file mode 100644 index 0000000..ccb608a --- /dev/null +++ b/hermes-sensor-bridge/src/supervisor/spawn.ts @@ -0,0 +1,476 @@ +import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process"; +import { createHmac } from "node:crypto"; +import { once } from "node:events"; +import { fileURLToPath } from "node:url"; +import type { BridgePaths, SensorEntry } from "./manifest.js"; +import { hashConfig } from "./manifest.js"; + +export interface ChildHandle { + sensorId: string; + pkg: string; + skillId: string; + configHash: string; + webhookUrl: string; + process: ChildProcessWithoutNullStreams; + startedAt: number; + restartCount: number; + lastExitCode: number | null; + stopping: boolean; +} + +export interface ApplyResult { + started: string[]; + restarted: string[]; + stopped: string[]; + failed: Array<{ sensor_id: string; error: string }>; +} + +export interface HandleSnapshot { + sensor_id: string; + pkg: string; + skill_id: string; + webhook_url: string; + config_hash: string; + pid: number | undefined; + started_at: number; + restart_count: number; + last_exit_code: number | null; +} + +interface SensorSupervisorOptions { + paths: BridgePaths; + hmacSecret: string; + log: (line: string) => void; +} + +// Exit codes the runner produces deliberately and which should NOT trigger +// a backoff restart loop: +// 0 = clean shutdown (SIGTERM after cleanup) +// 10 = config parse failure +// 11 = sensor package import / SensorSpec validation failure +const NO_RESTART_EXIT_CODES = new Set([0, 10, 11]); + +export class SensorSupervisor { + private readonly paths: BridgePaths; + private readonly hmacSecret: string; + private readonly log: (line: string) => void; + private readonly handles = new Map(); + private readonly desiredEntries = new Map(); + private readonly restartTimers = new Map(); + private readonly runnerBin = fileURLToPath(new URL("../runner/bin.js", import.meta.url)); + + constructor(options: SensorSupervisorOptions) { + this.paths = options.paths; + this.hmacSecret = options.hmacSecret; + this.log = options.log; + } + + snapshot(): HandleSnapshot[] { + return [...this.handles.values()] + .map((handle) => ({ + sensor_id: handle.sensorId, + pkg: handle.pkg, + skill_id: handle.skillId, + webhook_url: handle.webhookUrl, + config_hash: handle.configHash, + pid: handle.process.pid, + started_at: handle.startedAt, + restart_count: handle.restartCount, + last_exit_code: handle.lastExitCode, + })) + .sort((a, b) => a.sensor_id.localeCompare(b.sensor_id)); + } + + async spawn(entry: SensorEntry, restartCount = 0): Promise { + this.clearRestartTimer(entry.sensor_id); + + // The runner does not need webhook URL or HMAC secret — those live in + // the supervisor where signal delivery happens. Keeping secrets out of + // the child env reduces leak surface. + const proc = spawn(process.execPath, [this.runnerBin], { + env: { + ...process.env, + W2A_PACKAGE: entry.pkg, + W2A_SENSOR_ID: entry.sensor_id, + W2A_STATE_PATH: `${this.paths.stateDir}/${entry.sensor_id}.json`, + W2A_LOG_LEVEL: process.env.W2A_LOG_LEVEL ?? "info", + }, + stdio: ["pipe", "pipe", "pipe"], + }); + + const handle: ChildHandle = { + sensorId: entry.sensor_id, + pkg: entry.pkg, + skillId: entry.skill_id, + configHash: hashConfig(entry.config), + webhookUrl: entry.webhook_url, + process: proc, + startedAt: Date.now(), + restartCount, + lastExitCode: null, + stopping: false, + }; + + this.handles.set(entry.sensor_id, handle); + this.attachChildStreams(handle); + proc.on("exit", (code, signal) => { + void this.handleExit(handle, code, signal); + }); + + proc.stdin.end(JSON.stringify(entry.config ?? {}) + "\n"); + this.log( + `[w2a/${entry.sensor_id}] spawned pid=${proc.pid ?? "unknown"} pkg=${entry.pkg}`, + ); + return handle; + } + + async terminate(handle: ChildHandle, graceMs = 5_000): Promise { + this.clearRestartTimer(handle.sensorId); + handle.stopping = true; + + if (handle.process.exitCode !== null || handle.process.killed) { + this.handles.delete(handle.sensorId); + return; + } + + const exitPromise = once(handle.process, "exit").catch(() => []); + + try { + handle.process.kill("SIGTERM"); + } catch { + this.handles.delete(handle.sensorId); + return; + } + + const timedOut = await Promise.race([ + exitPromise.then(() => false), + delay(graceMs).then(() => true), + ]); + + if (timedOut) { + try { + handle.process.kill("SIGKILL"); + } catch { + // no-op + } + await exitPromise; + } + + this.handles.delete(handle.sensorId); + } + + async applyConfig(entries: SensorEntry[]): Promise { + const result: ApplyResult = { + started: [], + restarted: [], + stopped: [], + failed: [], + }; + + this.desiredEntries.clear(); + for (const entry of entries) { + if (entry.enabled !== false) { + this.desiredEntries.set(entry.sensor_id, entry); + } + } + + for (const sensorId of this.restartTimers.keys()) { + if (!this.desiredEntries.has(sensorId)) { + this.clearRestartTimer(sensorId); + } + } + + for (const [sensorId, handle] of [...this.handles.entries()]) { + if (!this.desiredEntries.has(sensorId)) { + await this.terminate(handle); + result.stopped.push(sensorId); + } + } + + for (const [sensorId, entry] of this.desiredEntries.entries()) { + this.clearRestartTimer(sensorId); + + const handle = this.handles.get(sensorId); + if (!handle) { + try { + await this.spawn(entry); + result.started.push(sensorId); + } catch (error) { + result.failed.push({ sensor_id: sensorId, error: errorMessage(error) }); + } + continue; + } + + if (this.matchesEntry(handle, entry)) { + continue; + } + + try { + await this.terminate(handle); + await this.spawn(entry); + result.restarted.push(sensorId); + } catch (error) { + result.failed.push({ sensor_id: sensorId, error: errorMessage(error) }); + } + } + + return result; + } + + async terminateAll(graceMs = 5_000): Promise { + this.desiredEntries.clear(); + for (const sensorId of this.restartTimers.keys()) { + this.clearRestartTimer(sensorId); + } + for (const handle of [...this.handles.values()]) { + await this.terminate(handle, graceMs); + } + } + + private matchesEntry(handle: ChildHandle, entry: SensorEntry): boolean { + return ( + handle.pkg === entry.pkg && + handle.skillId === entry.skill_id && + handle.webhookUrl === entry.webhook_url && + handle.configHash === hashConfig(entry.config) + ); + } + + private attachChildStreams(handle: ChildHandle): void { + // stdout: every line is a W2A signal as JSON. Parse and dispatch. + pipeStream(handle.process.stdout, (line) => { + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch (error) { + this.log( + `[w2a/${handle.sensorId}] dropped non-JSON line on stdout: ${truncate(line, 240)}`, + ); + return; + } + + void this.deliverSignal(handle, parsed).catch((error) => { + this.log( + `[w2a/${handle.sensorId}] delivery error: ${errorMessage(error)}`, + ); + }); + }); + + // stderr: sensor / runner diagnostics. Forward verbatim with prefix. + pipeStream(handle.process.stderr, (line) => { + this.log(`[w2a/${handle.sensorId}] ${line}`); + }); + } + + /** + * Render a signal into a Hermes-shaped POST and ship it to the route + * recorded for this sensor. Retries on 5xx / network errors, fails fast + * on 4xx (including 401 from a HMAC mismatch — those are configuration + * problems, not transient). + */ + private async deliverSignal(handle: ChildHandle, signal: unknown): Promise { + if (!signal || typeof signal !== "object") { + this.log(`[w2a/${handle.sensorId}] dropped non-object signal`); + return; + } + const obj = signal as Record; + const signalId = typeof obj.signal_id === "string" ? obj.signal_id : undefined; + if (!signalId) { + this.log(`[w2a/${handle.sensorId}] dropped signal missing signal_id`); + return; + } + + const body = JSON.stringify({ + prompt: renderPrompt(obj), + signal: obj, + }); + + const headers: Record = { + "content-type": "application/json", + "x-request-id": signalId, + }; + if (this.hmacSecret && this.hmacSecret !== "INSECURE_NO_AUTH") { + headers["x-webhook-signature"] = createHmac("sha256", this.hmacSecret) + .update(body) + .digest("hex"); + } + + try { + await httpPost(handle.webhookUrl, body, headers, { + timeoutMs: DELIVERY_TIMEOUT_MS, + maxAttempts: DELIVERY_MAX_ATTEMPTS, + baseDelayMs: DELIVERY_BASE_DELAY_MS, + }); + } catch (error) { + this.log( + `[w2a/${handle.sensorId}] POST failed for signal ${signalId}: ${errorMessage(error)}`, + ); + } + } + + private async handleExit( + handle: ChildHandle, + code: number | null, + signal: NodeJS.Signals | null, + ): Promise { + handle.lastExitCode = code; + + const current = this.handles.get(handle.sensorId); + if (current !== handle) return; + + this.handles.delete(handle.sensorId); + this.log( + `[w2a/${handle.sensorId}] exited code=${String(code)} signal=${String(signal)}`, + ); + + if (handle.stopping) return; + if (code !== null && NO_RESTART_EXIT_CODES.has(code)) return; + + const nextEntry = this.desiredEntries.get(handle.sensorId); + if (!nextEntry) return; + + const nextRestartCount = handle.restartCount + 1; + const delayMs = restartDelayMs(nextRestartCount); + this.log( + `[w2a/${handle.sensorId}] scheduling restart in ${delayMs}ms (restart #${nextRestartCount})`, + ); + + const timer = setTimeout(() => { + this.restartTimers.delete(handle.sensorId); + void this.spawn(nextEntry, nextRestartCount).catch((error) => { + this.log( + `[w2a/${handle.sensorId}] restart failed: ${errorMessage(error)}`, + ); + }); + }, delayMs); + timer.unref(); + this.restartTimers.set(handle.sensorId, timer); + } + + private clearRestartTimer(sensorId: string): void { + const timer = this.restartTimers.get(sensorId); + if (!timer) return; + clearTimeout(timer); + this.restartTimers.delete(sensorId); + } +} + +const DELIVERY_TIMEOUT_MS = 10_000; +const DELIVERY_MAX_ATTEMPTS = 3; // initial + 2 retries +const DELIVERY_BASE_DELAY_MS = 500; + +interface HttpPostOptions { + timeoutMs: number; + maxAttempts: number; + baseDelayMs: number; +} + +/** + * POST a body with retry on transient failures (network errors and 5xx). + * 4xx is treated as permanent and propagated immediately. + */ +export async function httpPost( + url: string, + body: string, + headers: Record, + opts: HttpPostOptions, +): Promise { + let lastError: unknown; + for (let attempt = 0; attempt < opts.maxAttempts; attempt++) { + let res: Response; + try { + res = await fetch(url, { + method: "POST", + headers, + body, + signal: AbortSignal.timeout(opts.timeoutMs), + }); + } catch (error) { + lastError = error; + if (attempt < opts.maxAttempts - 1) { + await delay(opts.baseDelayMs * 2 ** attempt); + } + continue; + } + + if (res.ok) return; + + if (res.status >= 400 && res.status < 500) { + const text = await res.text().catch(() => ""); + throw new Error(`HTTP ${res.status}: ${text}`); + } + + lastError = new Error(`HTTP ${res.status}`); + if (attempt < opts.maxAttempts - 1) { + await delay(opts.baseDelayMs * 2 ** attempt); + } + } + throw lastError; +} + +/** + * Render a W2A signal into a Markdown prompt body that the Hermes-side + * skill can read directly. The full signal is appended as a fenced JSON + * block so the skill can parse structured fields when it needs to. + */ +export function renderPrompt(signal: Record): string { + const event = (signal.event ?? {}) as Record; + const type = typeof event.type === "string" ? event.type : "unknown"; + const summary = typeof event.summary === "string" ? event.summary : ""; + const attachments = Array.isArray(signal.attachments) ? signal.attachments : []; + const attachmentLines = attachments + .map((a) => { + const obj = (a ?? {}) as Record; + const media = typeof obj.media_type === "string" ? obj.media_type : "text/plain"; + const title = typeof obj.title === "string" ? obj.title : ""; + return `[${media}] ${title}`.trimEnd(); + }) + .filter(Boolean); + + const parts: string[] = [`[W2A Signal] ${type}`, ""]; + if (summary) parts.push(summary, ""); + if (attachmentLines.length) { + parts.push("Attachments:", ...attachmentLines, ""); + } + parts.push("Signal JSON:", "```json", JSON.stringify(signal, null, 2), "```"); + return parts.join("\n"); +} + +function truncate(text: string, max: number): string { + return text.length <= max ? text : `${text.slice(0, max)}...[+${text.length - max}]`; +} + +function pipeStream( + stream: NodeJS.ReadableStream, + onLine: (line: string) => void, +): void { + let buffer = ""; + stream.setEncoding?.("utf8"); + stream.on("data", (chunk: string | Buffer) => { + buffer += String(chunk); + for (;;) { + const index = buffer.indexOf("\n"); + if (index === -1) break; + const line = buffer.slice(0, index).replace(/\r$/, ""); + buffer = buffer.slice(index + 1); + if (line) onLine(line); + } + }); + stream.on("end", () => { + const line = buffer.replace(/\r$/, ""); + if (line) onLine(line); + }); +} + +function restartDelayMs(restartCount: number): number { + if (restartCount >= 10) return 60 * 60 * 1000; + return Math.min(1_000 * 2 ** Math.max(0, restartCount - 1), 300_000); +} + +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/hermes-sensor-bridge/tsconfig.json b/hermes-sensor-bridge/tsconfig.json new file mode 100644 index 0000000..1c630f6 --- /dev/null +++ b/hermes-sensor-bridge/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "ESNext", + "moduleResolution": "bundler", + "esModuleInterop": true, + "strict": true, + "skipLibCheck": true, + "outDir": "dist", + "declaration": true, + "types": ["node"] + }, + "include": ["src/**/*"] +}