diff --git a/README.ja.md b/README.ja.md index e0becca8..5c212188 100644 --- a/README.ja.md +++ b/README.ja.md @@ -100,7 +100,7 @@ CodeMux はチャットにとどまりません — 開発ワークフローを - **LAN**: IPアドレスの自動検出 + QRコードで、数秒で準備完了 - **パブリックインターネット**: ワンクリックで [Cloudflare Tunnel](https://developers.cloudflare.com/cloudflare-one/connections/connect-networks/) — ポート転送、VPN、ファイアウォール変更は一切不要。**クイックトンネル**(ランダムな一時URL、設定不要)と**ネームドトンネル**(`~/.cloudflared/` 認証情報による永続カスタムドメイン)の両方をサポート -- **セキュリティ内蔵**: デバイス認証、JWT トークン、Cloudflare 経由のHTTPS; クイックトンネルURLは再起動ごとにローテーション、ネームドトンネルはカスタムホスト名を維持 +- **セキュリティ内蔵**: デバイス認証、JWT トークン、Cloudflare 経由のHTTPS; クイックトンネルURLはトンネル自体を作り直したときにローテーションし、ネームドトンネルはカスタムホスト名を維持 #### IM ボットチャネル diff --git a/README.ko.md b/README.ko.md index 166f1d33..4be3971c 100644 --- a/README.ko.md +++ b/README.ko.md @@ -100,7 +100,7 @@ CodeMux는 채팅을 넘어 — 개발 워크플로를 인터페이스에서 직 - **LAN**: 자동 감지된 IP + QR 코드, 수초 내 준비 완료 - **공용 인터넷**: 원클릭 [Cloudflare Tunnel](https://developers.cloudflare.com/cloudflare-one/connections/connect-networks/) — 포트 포워딩, VPN, 방화벽 변경 불필요. **퀵 터널**(랜덤 임시 URL, 제로 설정)과 **네임드 터널**(`~/.cloudflared/` 인증 정보를 통한 영구 커스텀 도메인) 모두 지원 -- **내장 보안**: 기기 인증, JWT 토큰, Cloudflare를 통한 HTTPS; 퀵 터널 URL은 재시작마다 변경, 네임드 터널은 커스텀 호스트명 유지 +- **내장 보안**: 기기 인증, JWT 토큰, Cloudflare를 통한 HTTPS; 퀵 터널 URL은 터널 자체를 다시 만들 때 변경되며, 네임드 터널은 커스텀 호스트명을 유지합니다 #### IM 봇 채널 diff --git a/README.md b/README.md index 61029397..a273856e 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,7 @@ Access your coding agents from any device — phone, tablet, or another machine - **LAN**: Auto-detected IP + QR code, ready in seconds - **Public Internet**: One-click [Cloudflare Tunnel](https://developers.cloudflare.com/cloudflare-one/connections/connect-networks/) — no port forwarding, no VPN, no firewall changes. 
Supports both **quick tunnels** (random ephemeral URL, zero config) and **named tunnels** (persistent custom domain via `~/.cloudflared/` credentials) -- **Security built-in**: Device authorization, JWT tokens, HTTPS via Cloudflare; quick tunnel URLs rotate on every restart, named tunnels preserve your custom hostname +- **Security built-in**: Device authorization, JWT tokens, HTTPS via Cloudflare; quick tunnel URLs rotate whenever the tunnel itself is recreated, while named tunnels preserve your custom hostname #### IM Bot Channels @@ -201,6 +201,7 @@ bun run server:dev # Run the same headless dev stack in the background bun run server:up bun run server:status +bun run server:restart bun run server:down # Run the headless dev stack and start a Cloudflare quick tunnel @@ -215,6 +216,8 @@ bun run server:access-requests `bun run start` is still the lightest option for a web-only standalone server. The desktop app's "Public Access" toggle manages Cloudflare inside the packaged app; on a headless dev server, `bun run server:tunnel` provides the equivalent quick-tunnel workflow from the shell. +If you want to restart CodeMux itself without rotating the current quick-tunnel URL, use `bun run server:restart`. It restarts the managed app process and keeps the existing `cloudflared` process alive whenever possible, so remote browsers can usually stay on the same public origin. + `bun run server:tunnel` now prints the access code after startup. When a remote browser submits that code, you can stay entirely in SSH and run `bun run server:access-requests` to review and interactively approve or deny pending requests. If you started CodeMux with `bun run server:dev`, open a second SSH session and run `bun run server:access-code` / `bun run server:access-requests`. 
@@ -301,6 +304,7 @@ bun run dev # Electron + Vite HMR bun run server:dev # Foreground headless Electron dev bun run server:up # Background headless Electron dev bun run server:tunnel # Background headless Electron dev + quick tunnel +bun run server:restart # Restart app only; preserve managed quick tunnel when possible bun run server:access-code # Print the current 6-digit access code bun run server:access-requests # Interactively review pending remote access requests bun run server:down # Stop background headless Electron dev diff --git a/README.ru.md b/README.ru.md index 8e8db12c..05b33203 100644 --- a/README.ru.md +++ b/README.ru.md @@ -100,7 +100,7 @@ CodeMux выходит за рамки чата — предоставляет - **LAN**: Автоматически определённый IP + QR-код, готово за секунды - **Публичный интернет**: Одним кликом [Cloudflare Tunnel](https://developers.cloudflare.com/cloudflare-one/connections/connect-networks/) — без проброса портов, VPN и изменений файрвола. Поддерживаются как **быстрые туннели** (случайный временный URL, без настройки), так и **именованные туннели** (постоянный пользовательский домен через учётные данные `~/.cloudflared/`) -- **Встроенная безопасность**: Авторизация устройств, JWT-токены, HTTPS через Cloudflare; URL быстрых туннелей меняются при каждом перезапуске, именованные туннели сохраняют ваш домен +- **Встроенная безопасность**: Авторизация устройств, JWT-токены, HTTPS через Cloudflare; URL быстрых туннелей меняются при пересоздании самого туннеля, а именованные туннели сохраняют ваш домен #### Каналы IM-ботов diff --git a/README.zh-CN.md b/README.zh-CN.md index 480f77a0..c5b82736 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -100,7 +100,7 @@ CodeMux 不只是聊天 —— 它提供集成工具,让你直接在界面中 - **局域网**:自动检测 IP + 二维码,几秒内即可就绪 - **公网**:一键 [Cloudflare Tunnel](https://developers.cloudflare.com/cloudflare-one/connections/connect-networks/) —— 无需端口转发、无需 VPN、无需防火墙更改。支持**快速隧道**(随机临时 URL,零配置)和**命名隧道**(通过 `~/.cloudflared/` 凭证持久化自定义域名) -- **内置安全机制**:设备授权、JWT 令牌、通过 
Cloudflare 的 HTTPS;快速隧道 URL 每次重启时轮换,命名隧道保留你的自定义主机名 +- **内置安全机制**:设备授权、JWT 令牌、通过 Cloudflare 的 HTTPS;快速隧道 URL 会在隧道本身被重建时轮换,命名隧道保留你的自定义主机名 #### IM 机器人渠道 diff --git a/bun.lock b/bun.lock index dd788c88..8e7df935 100644 --- a/bun.lock +++ b/bun.lock @@ -6,7 +6,7 @@ "name": "codemux", "dependencies": { "@anthropic-ai/claude-agent-sdk": "^0.2.63", - "@github/copilot-sdk": "0.2.0", + "@github/copilot-sdk": "0.2.2", "@larksuiteoapi/node-sdk": "^1.42.0", "@opencode-ai/sdk": "^1.2.15", "@parcel/watcher": "^2.5.1", @@ -190,21 +190,21 @@ "@eslint/plugin-kit": ["@eslint/plugin-kit@0.6.1", "", { "dependencies": { "@eslint/core": "^1.1.1", "levn": "^0.4.1" } }, "sha512-iH1B076HoAshH1mLpHMgwdGeTs0CYwL0SPMkGuSebZrwBp16v415e9NZXg2jtrqPVQjf6IANe2Vtlr5KswtcZQ=="], - "@github/copilot": ["@github/copilot@1.0.11", "", { "optionalDependencies": { "@github/copilot-darwin-arm64": "1.0.11", "@github/copilot-darwin-x64": "1.0.11", "@github/copilot-linux-arm64": "1.0.11", "@github/copilot-linux-x64": "1.0.11", "@github/copilot-win32-arm64": "1.0.11", "@github/copilot-win32-x64": "1.0.11" }, "bin": { "copilot": "npm-loader.js" } }, "sha512-cptVopko/tNKEXyBP174yBjHQBEwg6CqaKN2S0M3J+5LEB8u31bLL75ioOPd+5vubqBrA0liyTdcHeZ8UTRbmg=="], + "@github/copilot": ["@github/copilot@1.0.31", "", { "optionalDependencies": { "@github/copilot-darwin-arm64": "1.0.31", "@github/copilot-darwin-x64": "1.0.31", "@github/copilot-linux-arm64": "1.0.31", "@github/copilot-linux-x64": "1.0.31", "@github/copilot-win32-arm64": "1.0.31", "@github/copilot-win32-x64": "1.0.31" }, "bin": { "copilot": "npm-loader.js" } }, "sha512-AfoVW9pHsKQGtLCpPcvQ8TOwBVF8meo5srle/8cqRSsx882CpIQx5C4uNs6zwrCtqMTo8M8D6zlDIbXkLudrXw=="], - "@github/copilot-darwin-arm64": ["@github/copilot-darwin-arm64@1.0.11", "", { "os": "darwin", "cpu": "arm64", "bin": { "copilot-darwin-arm64": "copilot" } }, "sha512-wdKimjtbsVeXqMqQSnGpGBPFEYHljxXNuWeH8EIJTNRgFpAsimcivsFgql3Twq4YOp0AxfsH36icG4IEen30mA=="], + "@github/copilot-darwin-arm64": 
["@github/copilot-darwin-arm64@1.0.31", "", { "os": "darwin", "cpu": "arm64", "bin": { "copilot-darwin-arm64": "copilot" } }, "sha512-DnAbe87U55/egBu/SFdMniQfhnYjfP3ZXXhrba3DZMXQI+91iRAGfPFKAsSlekl0zfNFw8toOkiafr9Hu2lHvA=="], - "@github/copilot-darwin-x64": ["@github/copilot-darwin-x64@1.0.11", "", { "os": "darwin", "cpu": "x64", "bin": { "copilot-darwin-x64": "copilot" } }, "sha512-VeuPv8rzBVGBB8uDwMEhcHBpldoKaq26yZ5YQm+G9Ka5QIF+1DMah8ZNRMVsTeNKkb1ji9G8vcuCsaPbnG3fKg=="], + "@github/copilot-darwin-x64": ["@github/copilot-darwin-x64@1.0.31", "", { "os": "darwin", "cpu": "x64", "bin": { "copilot-darwin-x64": "copilot" } }, "sha512-mFmuYT3N1JE3zRIwCAPaXGDstL8Npa62Jey3vT4Lo003NfzQrBzvZ4ObAVMTmFQ6pRZzj39rTTKp1vLYGg+K0w=="], - "@github/copilot-linux-arm64": ["@github/copilot-linux-arm64@1.0.11", "", { "os": "linux", "cpu": "arm64", "bin": { "copilot-linux-arm64": "copilot" } }, "sha512-/d8p6RlFYKj1Va2hekFIcYNMHWagcEkaxgcllUNXSyQLnmEtXUkaWtz62VKGWE+n/UMkEwCB6vI2xEwPTlUNBQ=="], + "@github/copilot-linux-arm64": ["@github/copilot-linux-arm64@1.0.31", "", { "os": "linux", "cpu": "arm64", "bin": { "copilot-linux-arm64": "copilot" } }, "sha512-R5V7EIqn92f9YMe3zbQkW++Mw8WErDy6hA8Rr95bSJGiTVyWdj5kqPWSAPH6MLjFbC1T5cJQm/1we+QP3XO3Cw=="], - "@github/copilot-linux-x64": ["@github/copilot-linux-x64@1.0.11", "", { "os": "linux", "cpu": "x64", "bin": { "copilot-linux-x64": "copilot" } }, "sha512-UujTRO3xkPFC1CybchBbCnaTEAG6JrH0etIst07JvfekMWgvRxbiCHQPpDPSzBCPiBcGu0gba0/IT+vUCORuIw=="], + "@github/copilot-linux-x64": ["@github/copilot-linux-x64@1.0.31", "", { "os": "linux", "cpu": "x64", "bin": { "copilot-linux-x64": "copilot" } }, "sha512-LmcCGmYP9QLim/YMu5e1UlVeqCt/cuMI0fIqkdHs68h+0FGreSnHpn7nA9RbjAbQuPq9HFWeFjG5UpbAHM71Xg=="], - "@github/copilot-sdk": ["@github/copilot-sdk@0.2.0", "", { "dependencies": { "@github/copilot": "^1.0.10", "vscode-jsonrpc": "^8.2.1", "zod": "^4.3.6" } }, "sha512-fCEpD9W9xqcaCAJmatyNQ1PkET9P9liK2P4Vk0raDFoMXcvpIdqewa5JQeKtWCBUsN/HCz7ExkkFP8peQuo+DA=="], + 
"@github/copilot-sdk": ["@github/copilot-sdk@0.2.2", "", { "dependencies": { "@github/copilot": "^1.0.21", "vscode-jsonrpc": "^8.2.1", "zod": "^4.3.6" } }, "sha512-VZCqS08YlUM90bUKJ7VLeIxgTTEHtfXBo84T1IUMNvXRREX2csjPH6Z+CPw3S2468RcCLvzBXcc9LtJJTLIWFw=="], - "@github/copilot-win32-arm64": ["@github/copilot-win32-arm64@1.0.11", "", { "os": "win32", "cpu": "arm64", "bin": { "copilot-win32-arm64": "copilot.exe" } }, "sha512-EOW8HUM+EmnHEZEa+iUMl4pP1+2eZUk2XCbynYiMehwX9sidc4BxEHp2RuxADSzFPTieQEWzgjQmHWrtet8pQg=="], + "@github/copilot-win32-arm64": ["@github/copilot-win32-arm64@1.0.31", "", { "os": "win32", "cpu": "arm64", "bin": { "copilot-win32-arm64": "copilot.exe" } }, "sha512-OlMPsQYFbl1hzrE0t703BwB9k8lQauQ4ETiiKpXSV4FxUb3DAU9PqWcy1pZoBjmLCni9h1ASQQKmPQ9ERJPm3g=="], - "@github/copilot-win32-x64": ["@github/copilot-win32-x64@1.0.11", "", { "os": "win32", "cpu": "x64", "bin": { "copilot-win32-x64": "copilot.exe" } }, "sha512-fKGkSNamzs3h9AbmswNvPYJBORCb2Y8CbusijU3C7fT3ohvqnHJwKo5iHhJXLOKZNOpFZgq9YKha410u9sIs6Q=="], + "@github/copilot-win32-x64": ["@github/copilot-win32-x64@1.0.31", "", { "os": "win32", "cpu": "x64", "bin": { "copilot-win32-x64": "copilot.exe" } }, "sha512-nK8uRdlKH6TNk1cjBqEPTvzWQxwnDPgNN3M5bB7TBXL6EsaFdUJePz4tqutUPoPbSKQqo+DtmJGT3/+A30ZcXg=="], "@humanfs/core": ["@humanfs/core@0.19.1", "", {}, "sha512-5DyQ4+1JEUzejeK1JGICcideyfUbGixgS9jNgex5nqkW+cY7WZhxBigmieN5Qnw9ZosSNVC9KQKyb+GUaGyKUA=="], diff --git a/docs/orchestration.md b/docs/orchestration.md new file mode 100644 index 00000000..cd262c28 --- /dev/null +++ b/docs/orchestration.md @@ -0,0 +1,160 @@ +# Orchestration + +The Orchestration system is a multi-agent layer that lets a single user prompt fan +out into a DAG of subtasks executed by one or more engine sessions. It fuses two +designs: + +1. **fridayliu/feat/agentteam** — Light/Heavy brain split with a DAG executor, + guardrails, and a user-channel relay for human-in-the-loop Heavy Brain runs. +2. 
**PR #117 (realDuang/feat/agent-team)** — role-based orchestrator with + plan-confirmation UI, role→engine mapping settings, and team worktrees. + +Both histories are preserved via a merge commit; the merged system keeps +fridayliu's Light/Heavy brain core and absorbs PR #117's plan-confirm, role, and +team-worktree concepts. + +## Two Brains + +| Brain | Entry file | Behavior | +| ------ | -------------------------------------------- | ------------------------------------------------------------------------ | +| Light | `electron/main/services/orchestration/light-brain.ts` | One-shot planner: asks an engine to produce a DAG, then executes it. | +| Heavy | `electron/main/services/orchestration/heavy-brain.ts` | Persistent orchestrator: drives dispatch iteratively; supports UserChannel relays for clarification from the human. | + +Both brains share: + +- **DAG executor** (`dag-executor.ts`) — honors `dependsOn`, runs tasks in + parallel up to a concurrency cap. +- **TaskExecutor** (`task-executor.ts`) — runs a single task inside a dedicated + engine session, retries once on transient failure, and now calls an optional + `RoleResolver` to map `task.role` → `{engineType, modelId}` before dispatch. +- **Guardrails** (`guardrails.ts`) — rate limits, loop detection, max turns. + +## Plan Confirmation + +The Light Brain pauses between planning and execution when +`orchestrationRun.requirePlanConfirmation` is true (default for Light, off for Heavy). + +Flow: + +1. Light Brain generates a DAG and sets `orchestrationRun.status = "awaiting-confirmation"`. +2. Gateway emits `orchestration.updated`; UI shows the plan in `OrchestrationCards` with a + "Confirm & execute" button. +3. User inspects/edits tasks in `OrchestrationCards`, then fires + `gateway.confirmTeamPlan(runId, tasks)`. +4. The service's `confirmPlan(runId, tasks)` resolves the pending gate; + Light Brain resumes with the user-edited DAG. 
+ +Rejection (run cancellation) rejects the pending gate and marks the run failed. + +Gateway keys: `TEAM_CONFIRM_PLAN`. Relevant types live in `src/types/unified.ts`. + +## Role Resolution + +Tasks can declare a semantic role instead of a concrete engine: + +```ts +{ role: "explorer", ... } // read-only investigation, prefer fast engine +{ role: "coder", ... } // read/write, prefer capable engine +``` + +Built-in roles (see `DEFAULT_ROLE_MAPPINGS` in +`electron/main/services/orchestration/index.ts`): + +| role | read-only | intended use | +| ---------- | --------- | ------------------------------------ | +| explorer | yes | codebase reconnaissance | +| researcher | yes | external docs / web research | +| reviewer | yes | code review pass | +| designer | no | design/architecture drafts | +| coder | no | implementation | + +Mappings are persisted in `settings.json` under `team.roleMappings`. They can be +overridden at runtime via `OrchestrationService.updateRoleMappings()` / gateway +`TEAM_UPDATE_ROLE_MAPPINGS`. + +Resolution order in `TaskExecutor`: + +1. If `task.engineType` is explicitly set → use it verbatim. +2. Else if `task.role` is set and `resolveRole` returns a mapping → use it. +3. Else fall back to the run's `defaultEngineType`. + +## Team Worktree + +Read/write tasks share a single git worktree so successive tasks see each +other's edits. `OrchestrationRun.teamWorktreeName` / `teamWorktreeDir` carry the +shared worktree through the run, and `DAGExecutor.runSingleTask` +routes each task based on its read/write intent: + +- **Write-capable task** (`needsWorktree !== false`, or role mapping + has `readOnly: false`) → runs with `directory = teamWorktreeDir` + and `defaultWorktreeId = teamWorktreeName`, so all writers share a + single worktree. +- **Read-only task** (`needsWorktree === false`, or role mapping has + `readOnly: true`) → runs in the run's primary directory, avoiding + contention with writers. 
+ +When `teamWorktreeDir` is unset, all tasks use the run's directory +and existing `run.worktreeId`. + +The gateway `WORKTREE_CREATE` handler whitelists `team-*` names so the +orchestrator can provision worktrees even when the global worktree +feature flag is off. + +## Result Aggregation to Parent + +When a run reaches a terminal state (`completed` / `failed`) and has a +`parentSessionId`, `OrchestrationService.relayResultsToParentSession()` +sends the aggregated `finalResult` + failed-task list as a user message +to the parent session. The parent engine then summarizes for the user, +keeping everything in one conversation. + +Gated by `OrchestrationRun.aggregateToParent` (defaults to `true`; set `false` +to disable). Failures are swallowed with a warn log so a broken parent +session cannot corrupt run state. + +## Sidebar Grouping + +Chat wraps `connectOrchestrationHandlers()` to mirror every TeamRun update into +PR #117's orchestration sidebar registry (`registerTeam` + +`associateRunWithTeam` + `associateChildSession`), so Light/Heavy brain +child sessions collapse under their parent session in +`SessionSidebar` with the same UX as PR #117's orchestrator-service +flow. + +## Service Coexistence + +During the merge we kept PR #117's `orchestrator-service.ts` (role-based +orchestrator) alongside fridayliu's `orchestration/index.ts` (Light/Heavy brain). +Both are wired into `ws-server.ts` and both have a UI surface in `Chat.tsx`. +This lets the two flows ship side-by-side; a future cleanup pass can decide +whether to delete the PR #117 service once the Light/Heavy flow owns all the +PR #117 use cases. 
+ +## Request Types + +| Gateway key | Purpose | +| ------------------------------ | ------------------------------------------ | +| `TEAM_RUN_CREATE` | Create a new team run | +| `TEAM_RUN_LIST` | List all runs | +| `TEAM_RUN_CANCEL` | Cancel an active run | +| `TEAM_RUN_DELETE` | Delete a completed run | +| `TEAM_CONFIRM_PLAN` | Resolve the plan-confirmation gate | +| `TEAM_GET_ROLE_MAPPINGS` | Read current role→engine mappings | +| `TEAM_UPDATE_ROLE_MAPPINGS` | Persist role→engine mapping edits | + +See `src/types/unified.ts` for the payload schemas. + +## Tests + +Primary specs: + +- `tests/unit/electron/services/orchestration/index.test.ts` — lifecycle, + persistence, role mappings, plan-confirm gate. +- `tests/unit/electron/services/orchestration/light-brain.test.ts` — planning, + awaiting-confirmation pause/resume, rejection. +- `tests/unit/electron/services/orchestration/heavy-brain.test.ts` — dispatch, + UserChannel relays. +- `tests/unit/electron/services/orchestration/task-executor.test.ts` — role + resolution, retry, error surfacing. +- `tests/unit/electron/services/orchestration/dag-executor.test.ts` — DAG order, + concurrency cap. diff --git a/electron/main/engines/claude/index.ts b/electron/main/engines/claude/index.ts index 0fe9358d..c9e4f191 100644 --- a/electron/main/engines/claude/index.ts +++ b/electron/main/engines/claude/index.ts @@ -215,6 +215,8 @@ export class ClaudeCodeAdapter extends EngineAdapter { private sessionDirectories = new Map(); /** Persisted ccSessionId per session, for SDK session resumption across restarts */ private sessionCcIds = new Map(); + /** Custom system prompts per session (e.g. 
orchestration instructions for agent team) */ + private sessionSystemPrompts = new Map(); /** Sessions that were just resumed after a dead process — emit notice on next message */ private pendingResumeNotice = new Set(); @@ -562,6 +564,9 @@ export class ClaudeCodeAdapter extends EngineAdapter { if (meta?.ccSessionId && typeof meta.ccSessionId === "string") { this.sessionCcIds.set(sessionId, meta.ccSessionId); } + if (meta?.systemPrompt && typeof meta.systemPrompt === "string") { + this.sessionSystemPrompts.set(sessionId, meta.systemPrompt); + } this.emit("session.created", { session }); // Warm up in background — store the promise so listCommands() can await it. @@ -626,6 +631,7 @@ export class ClaudeCodeAdapter extends EngineAdapter { } this.sessionDirectories.delete(sessionId); + this.sessionSystemPrompts.delete(sessionId); this.messageHistory.delete(sessionId); this.messageBuffers.delete(sessionId); this.sessionModes.delete(sessionId); @@ -1813,11 +1819,15 @@ export class ClaudeCodeAdapter extends EngineAdapter { // narrower than the internal Options type. The SDK internally passes these // through to ProcessTransport which accepts all Options fields. - // Build system prompt append: identity + cached user skills + // Build system prompt append: identity + cached user skills + optional custom system prompt let promptAppend = CODEMUX_IDENTITY_PROMPT; if (this.cachedSkillNames.length > 0) { promptAppend += `\n\nThe user has installed the following additional skills (invokable via the Skill tool): ${this.cachedSkillNames.join(", ")}. When the user's request matches one of these skills, use the Skill tool to invoke it.`; } + const customSystemPrompt = this.sessionSystemPrompts.get(sessionId); + if (customSystemPrompt) { + promptAppend += "\n\n" + customSystemPrompt; + } const sdkOptions: any = { model: opts.model ?? this.currentModelId ?? 
"claude-sonnet-4-20250514", diff --git a/electron/main/engines/codex/index.ts b/electron/main/engines/codex/index.ts index a0ec9f79..2d61743c 100644 --- a/electron/main/engines/codex/index.ts +++ b/electron/main/engines/codex/index.ts @@ -214,6 +214,8 @@ export class CodexAdapter extends EngineAdapter { private sessionReasoningEfforts = new Map(); private sessionServiceTiers = new Map(); private sessionDirectories = new Map(); + /** Custom system prompts per session (e.g. orchestration instructions for agent team) */ + private sessionSystemPrompts = new Map(); private currentModelId: string = CODEX_FALLBACK_MODEL; private currentMode: string = DEFAULT_MODE_ID; @@ -362,17 +364,24 @@ export class CodexAdapter extends EngineAdapter { await this.start(); const normalizedDirectory = normalizeDirectory(directory); + const customSystemPrompt = (meta?.systemPrompt && typeof meta.systemPrompt === "string") ? meta.systemPrompt : undefined; const existingThreadId = resolveThreadId(undefined, meta); + const startThread = () => + customSystemPrompt + ? this.startThread(normalizedDirectory, customSystemPrompt) + : this.startThread(normalizedDirectory); let threadResponse: ThreadResponse; if (existingThreadId) { try { - threadResponse = await this.resumeThread(existingThreadId, normalizedDirectory); + threadResponse = customSystemPrompt + ? 
await this.resumeThread(existingThreadId, normalizedDirectory, customSystemPrompt) + : await this.resumeThread(existingThreadId, normalizedDirectory); } catch (error) { codexLog.warn(`Failed to resume Codex thread ${existingThreadId}, starting a new one instead:`, error); - threadResponse = await this.startThread(normalizedDirectory); + threadResponse = await startThread(); } } else { - threadResponse = await this.startThread(normalizedDirectory); + threadResponse = await startThread(); } const threadId = threadResponse.thread?.id; @@ -389,6 +398,9 @@ export class CodexAdapter extends EngineAdapter { if (!this.sessionDirectories.has(sessionId)) { this.sessionDirectories.set(sessionId, normalizedDirectory); } + if (customSystemPrompt) { + this.sessionSystemPrompts.set(sessionId, customSystemPrompt); + } if (threadResponse.model) { this.sessionModels.set(sessionId, threadResponse.model); this.currentModelId = threadResponse.model; @@ -1893,16 +1905,19 @@ export class CodexAdapter extends EngineAdapter { return undefined; } - private async startThread(directory: string): Promise { + private async startThread(directory: string, customSystemPrompt?: string): Promise { const modeId = this.currentMode; const approvalPolicy = clampApprovalPolicy(modeToApprovalPolicy(modeId), this.configRequirements); const sandboxMode = clampSandboxMode(modeToSandboxMode(modeId), this.configRequirements); + const baseInstructions = customSystemPrompt + ? 
CODEMUX_IDENTITY_PROMPT + "\n\n" + customSystemPrompt + : CODEMUX_IDENTITY_PROMPT; const response = asRecord(await this.client!.request("thread/start", { cwd: directory, model: this.currentModelId, approvalPolicy, sandbox: sandboxMode, - baseInstructions: CODEMUX_IDENTITY_PROMPT, + baseInstructions, serviceName: "codemux", experimentalRawEvents: false, persistExtendedHistory: true, @@ -1911,13 +1926,17 @@ export class CodexAdapter extends EngineAdapter { return response; } - private async resumeThread(threadId: string, directory: string): Promise { + private async resumeThread(threadId: string, directory: string, customSystemPrompt?: string): Promise { const sessionId = toEngineSessionId(threadId); const modeId = this.sessionModes.get(sessionId) ?? this.currentMode; const approvalPolicy = clampApprovalPolicy(modeToApprovalPolicy(modeId), this.configRequirements); const sandboxMode = clampSandboxMode(modeToSandboxMode(modeId), this.configRequirements); const modelId = this.sessionModels.get(sessionId) ?? this.currentModelId; const serviceTier = this.sessionServiceTiers.get(sessionId); + const systemPrompt = customSystemPrompt ?? this.sessionSystemPrompts.get(sessionId); + const baseInstructions = systemPrompt + ? CODEMUX_IDENTITY_PROMPT + "\n\n" + systemPrompt + : CODEMUX_IDENTITY_PROMPT; const response = asRecord(await this.client!.request("thread/resume", { threadId, @@ -1925,7 +1944,7 @@ export class CodexAdapter extends EngineAdapter { model: modelId, approvalPolicy, sandbox: sandboxMode, - baseInstructions: CODEMUX_IDENTITY_PROMPT, + baseInstructions, persistExtendedHistory: true, ...(serviceTier ? 
{ serviceTier } : {}), })) as ThreadResponse; @@ -2171,6 +2190,7 @@ export class CodexAdapter extends EngineAdapter { this.sessionReasoningEfforts.delete(sessionId); this.sessionServiceTiers.delete(sessionId); this.sessionDirectories.delete(sessionId); + this.sessionSystemPrompts.delete(sessionId); this.rejectQueuedMessagesForSession(sessionId, "Session deleted"); this.rejectPendingForSession(sessionId, "Session deleted"); diff --git a/electron/main/engines/copilot/index.ts b/electron/main/engines/copilot/index.ts index 01c842ca..20f422cf 100644 --- a/electron/main/engines/copilot/index.ts +++ b/electron/main/engines/copilot/index.ts @@ -128,6 +128,8 @@ export class CopilotSdkAdapter extends EngineAdapter { private sessionTodos = new Map>(); private allowedAlwaysKinds = new Set(); private cachedCommands: EngineCommand[] = []; + /** Custom system prompt per session (e.g. orchestration instructions for agent team) */ + private sessionSystemPrompts = new Map(); private messageBuffers = new Map(); private messageHistory = new Map(); @@ -297,18 +299,24 @@ export class CopilotSdkAdapter extends EngineAdapter { } } - async createSession(directory: string): Promise { + async createSession(directory: string, meta?: Record): Promise { this.ensureClient(); const normalizedDir = directory.replaceAll("\\", "/"); const mode = "autopilot"; + // Build system message: identity prompt + optional custom system prompt (e.g. orchestration instructions) + let systemContent = CODEMUX_IDENTITY_PROMPT; + if (meta?.systemPrompt && typeof meta.systemPrompt === "string") { + systemContent += "\n\n" + meta.systemPrompt; + } + const config: SessionConfig = { workingDirectory: directory, streaming: true, model: this.currentModelId ?? 
undefined, onPermissionRequest: (req, ctx) => this.handlePermissionRequest(req as any, ctx), onUserInputRequest: (req, ctx) => this.handleUserInputRequest(req as any, ctx), - systemMessage: { mode: "append" as const, content: CODEMUX_IDENTITY_PROMPT }, + systemMessage: { mode: "append" as const, content: systemContent }, }; const sdkSession = await this.client!.createSession(config); @@ -318,6 +326,9 @@ export class CopilotSdkAdapter extends EngineAdapter { this.activeSessions.set(sessionId, sdkSession); this.sessionModes.set(sessionId, mode); this.sessionDirectories.set(sessionId, directory); + if (meta?.systemPrompt && typeof meta.systemPrompt === "string") { + this.sessionSystemPrompts.set(sessionId, meta.systemPrompt); + } const now = Date.now(); const session: UnifiedSession = { @@ -375,6 +386,7 @@ export class CopilotSdkAdapter extends EngineAdapter { this.sessionModes.delete(sessionId); this.sessionDirectories.delete(sessionId); this.sessionTodos.delete(sessionId); + this.sessionSystemPrompts.delete(sessionId); } async sendMessage( @@ -990,12 +1002,16 @@ export class CopilotSdkAdapter extends EngineAdapter { const workingDirectory = directory || this.sessionDirectories.get(sessionId); const sdkReasoningEffort = this.getSdkReasoningEffort(sessionId); + const customSystemPrompt = this.sessionSystemPrompts.get(sessionId); + const systemContent = customSystemPrompt + ? CODEMUX_IDENTITY_PROMPT + "\n\n" + customSystemPrompt + : CODEMUX_IDENTITY_PROMPT; const config: ResumeSessionConfig = { streaming: true, workingDirectory, model: this.currentModelId ?? undefined, ...(sdkReasoningEffort ? 
{ reasoningEffort: sdkReasoningEffort } : {}), - systemMessage: { mode: "append" as const, content: CODEMUX_IDENTITY_PROMPT }, + systemMessage: { mode: "append" as const, content: systemContent }, onPermissionRequest: (req, ctx) => this.handlePermissionRequest(req as any, ctx), onUserInputRequest: (req, ctx) => this.handleUserInputRequest(req as any, ctx), }; @@ -1017,7 +1033,7 @@ export class CopilotSdkAdapter extends EngineAdapter { workingDirectory, model: this.currentModelId ?? undefined, ...(sdkReasoningEffort ? { reasoningEffort: sdkReasoningEffort } : {}), - systemMessage: { mode: "append" as const, content: CODEMUX_IDENTITY_PROMPT }, + systemMessage: { mode: "append" as const, content: systemContent }, onPermissionRequest: (req, ctx) => this.handlePermissionRequest(req as any, ctx), onUserInputRequest: (req, ctx) => this.handleUserInputRequest(req as any, ctx), }; diff --git a/electron/main/engines/opencode/index.ts b/electron/main/engines/opencode/index.ts index 8bbc4c45..7cd15cad 100644 --- a/electron/main/engines/opencode/index.ts +++ b/electron/main/engines/opencode/index.ts @@ -94,6 +94,9 @@ export class OpenCodeAdapter extends EngineAdapter { // final message when session.status: idle arrives (see resolveSessionIdle). private lastEmittedMessage = new Map(); + /** Custom system prompts per session (e.g. orchestration instructions for agent team) */ + private sessionSystemPrompts = new Map(); + // Track primary (first) user message IDs per session to avoid false-positive // queued.consumed emissions when the primary user message.updated arrives late. private primaryUserMsgIds = new Map(); @@ -119,6 +122,15 @@ export class OpenCodeAdapter extends EngineAdapter { this.port = options?.port ?? 
OPENCODE_PORT; } + /** Get the system prompt for a session, composing identity + custom system prompt */ + private getSystemPrompt(sessionId?: string): string { + if (sessionId) { + const custom = this.sessionSystemPrompts.get(sessionId); + if (custom) return CODEMUX_IDENTITY_PROMPT + "\n\n" + custom; + } + return CODEMUX_IDENTITY_PROMPT; + } + private get baseUrl(): string { return `http://127.0.0.1:${this.port}`; } @@ -785,7 +797,7 @@ export class OpenCodeAdapter extends EngineAdapter { // --- Sessions --- - async createSession(directory: string): Promise { + async createSession(directory: string, meta?: Record): Promise { this.switchDirectory(directory); const client = this.ensureClient(); @@ -796,6 +808,9 @@ export class OpenCodeAdapter extends EngineAdapter { const session = convertSession(this.engineType, result.data); this.sessions.set(session.id, session); + if (meta?.systemPrompt && typeof meta.systemPrompt === "string") { + this.sessionSystemPrompts.set(session.id, meta.systemPrompt); + } return session; } @@ -875,6 +890,7 @@ export class OpenCodeAdapter extends EngineAdapter { // Clean up user message IDs for this session to prevent memory leak this.userMessageIds.delete(sessionId); + this.sessionSystemPrompts.delete(sessionId); } // --- Messages --- @@ -933,7 +949,7 @@ export class OpenCodeAdapter extends EngineAdapter { parts, agent: options?.mode, model, - system: CODEMUX_IDENTITY_PROMPT, + system: this.getSystemPrompt(sessionId), }); const promptError = (promptResult as any).error; diff --git a/electron/main/gateway/engine-manager.ts b/electron/main/gateway/engine-manager.ts index bfa81173..f9e4e8f7 100644 --- a/electron/main/gateway/engine-manager.ts +++ b/electron/main/gateway/engine-manager.ts @@ -116,6 +116,9 @@ export class EngineManager extends EventEmitter { /** Track which sessions have active sendMessage calls (for idle detection) */ private activeSessions = new Set(); + /** Engine session IDs with pending internal sends — suppress user 
message.updated events */ + private internalSendSessions = new Set(); + // --- Adapter Registration --- registerAdapter(adapter: EngineAdapter): void { @@ -291,6 +294,13 @@ export class EngineManager extends EventEmitter { const { sessionId: engineSessionId, message } = data; const convId = this.resolveConversationId(engineSessionId); + // Suppress user message events from internal sends (e.g. orchestration relay). + // The user message was already persisted with internal: true by persistUserMessage; + // the engine re-emitting it without the flag would show it as user input. + if (message.role === "user" && this.internalSendSessions.has(engineSessionId)) { + return; + } + if (convId) { // Persist the message this.persistMessage(convId, message); @@ -536,7 +546,7 @@ export class EngineManager extends EventEmitter { * Called before adapter.sendMessage() to ensure user messages are saved * even if the adapter doesn't emit user message events (e.g., OpenCode). */ - private async persistUserMessage(conversationId: string, content: MessagePromptContent[]): Promise { + private async persistUserMessage(conversationId: string, content: MessagePromptContent[], internal?: boolean): Promise { try { const now = Date.now(); const msgId = timeId("msg"); @@ -573,6 +583,7 @@ export class EngineManager extends EventEmitter { role: "user", time: { created: now, completed: now }, parts, + ...(internal ? 
{ internal: true } : {}), }; await conversationStore.appendMessage(conversationId, convMessage); @@ -690,6 +701,7 @@ export class EngineManager extends EventEmitter { engineType: EngineType | undefined, directory: string, worktreeId?: string, + meta?: Record, ): Promise { const resolvedType = engineType || this.getDefaultEngineType(); const adapter = this.getAdapterOrThrow(resolvedType); // Validate engine exists @@ -719,7 +731,8 @@ export class EngineManager extends EventEmitter { // or Claude V2 session init) happens at session creation time, so features // like slash command autocomplete work before the user sends a message. try { - const engineSession = await adapter.createSession(conv.directory, conv.engineMeta); + const adapterMeta = { ...conv.engineMeta, ...meta }; + const engineSession = await adapter.createSession(conv.directory, adapterMeta); conversationStore.setEngineSession(conv.id, engineSession.id, engineSession.engineMeta); this.engineToConvMap.set(engineSession.id, conv.id); } catch (err) { @@ -828,7 +841,7 @@ export class EngineManager extends EventEmitter { async sendMessage( sessionId: string, content: MessagePromptContent[], - options?: { mode?: string; modelId?: string; reasoningEffort?: ReasoningEffort | null; serviceTier?: CodexServiceTier | null }, + options?: { mode?: string; modelId?: string; reasoningEffort?: ReasoningEffort | null; serviceTier?: CodexServiceTier | null; internal?: boolean }, ): Promise { this.activeSessions.add(sessionId); try { @@ -856,17 +869,30 @@ export class EngineManager extends EventEmitter { // which would cause applyTitleFallback to skip (title no longer default). // Run BEFORE adapter.sendMessage so the sidebar updates immediately, // not after the (potentially long-running) engine processing completes. 
- this.applyTitleFallback(sessionId, content); + if (!options?.internal) { + this.applyTitleFallback(sessionId, content); + } // Persist user message before sending to engine // (Some adapters like OpenCode don't emit user message events) - await this.persistUserMessage(sessionId, content); + // Internal messages (e.g. orchestration relay) are persisted but marked + // so the frontend can hide the user bubble while keeping the turn intact. + await this.persistUserMessage(sessionId, content, options?.internal); + + // Mark session to suppress user message.updated events from the engine + // for internal sends — the user message is already persisted with internal: true. + if (options?.internal) { + this.internalSendSessions.add(engineSessionId); + } const result = await adapter.sendMessage(engineSessionId, content, { ...options, directory: conv.directory, }); + // Clear the internal send suppression flag after the send completes + this.internalSendSessions.delete(engineSessionId); + // If the engine reported a stale session (no SSE response within timeout), // clear the engineSessionId so the next attempt creates a fresh session. if (result.staleSession) { @@ -886,6 +912,23 @@ export class EngineManager extends EventEmitter { this.emit("message.updated", { sessionId, message: finalized }); } + // Merge buffered content parts into the returned message. + // Adapters stream text/file content via message.part.updated events which + // are buffered in contentPartsBuffer but NOT included in the message + // returned by adapter.sendMessage(). Callers that read result.parts + // (e.g. AgentTeamService) would otherwise see an empty parts array. 
+ const bufferedContent = this.contentPartsBuffer.get(result.id); + if (bufferedContent && bufferedContent.length > 0) { + const existingIds = new Set((result.parts || []).map((p) => p.id)); + const merged = [...(result.parts || [])]; + for (const bp of bufferedContent) { + if (!existingIds.has(bp.id)) { + merged.push(bp); + } + } + result.parts = merged; + } + return result; } finally { this.activeSessions.delete(sessionId); @@ -973,6 +1016,7 @@ export class EngineManager extends EventEmitter { modelId: msg.modelId, reasoningEffort: msg.reasoningEffort, error: msg.error, + internal: msg.internal, }; }); } diff --git a/electron/main/gateway/ws-server.ts b/electron/main/gateway/ws-server.ts index b381f875..ebd3c11a 100644 --- a/electron/main/gateway/ws-server.ts +++ b/electron/main/gateway/ws-server.ts @@ -17,6 +17,7 @@ import { gatewayLog } from "../services/logger"; import log from "../services/logger"; import { conversationStore } from "../services/conversation-store"; import { scheduledTaskService } from "../services/scheduled-task-service"; +import { orchestrationService } from "../services/orchestration"; import { GatewayRequestType, GatewayNotificationType, @@ -40,6 +41,12 @@ import { type WorktreeRemoveRequest, type WorktreeMergeRequest, type WorktreeListBranchesRequest, + type OrchestrationCreateRequest, + type OrchestrationCancelRequest, + type OrchestrationGetRequest, + type OrchestrationSendMessageRequest, + type OrchestrationConfirmPlanRequest, + type OrchestrationUpdateRoleMappingsRequest, } from "../../../src/types/unified"; import { isCodexServiceTier } from "../../../src/types/unified"; @@ -455,7 +462,8 @@ export class GatewayServer { // Worktree case GatewayRequestType.WORKTREE_CREATE: { const req = p as WorktreeCreateRequest; - if (!this.isWorktreeEnabled()) { + // Allow team worktrees even when worktree feature is disabled (internal orchestration) + if (!this.isWorktreeEnabled() && !req.name?.startsWith("team-")) { throw Object.assign(new 
Error("Worktree feature is disabled"), { code: "WORKTREE_DISABLED" }); } const { worktreeManager } = await import("../services/worktree-manager"); @@ -502,6 +510,51 @@ export class GatewayServer { return worktreeManager.listBranches(req.directory); } + // Orchestration (unified Light/Heavy brain + role-based plan confirmation) + case GatewayRequestType.ORCHESTRATION_CREATE: { + const req = p as OrchestrationCreateRequest; + return orchestrationService.createRun(req); + } + + case GatewayRequestType.ORCHESTRATION_DECOMPOSE: { + // Legacy frontend hint — decomposition is started automatically by createRun; + // this handler exists for backward compatibility and is a no-op. + return { ok: true }; + } + + case GatewayRequestType.ORCHESTRATION_CONFIRM: { + const req = p as OrchestrationConfirmPlanRequest; + orchestrationService.confirmPlan(req.runId, req.subtasks); + return { ok: true }; + } + + case GatewayRequestType.ORCHESTRATION_CANCEL: { + const req = p as OrchestrationCancelRequest; + return orchestrationService.cancelRun(req.runId); + } + + case GatewayRequestType.ORCHESTRATION_LIST: + return orchestrationService.listRuns(); + + case GatewayRequestType.ORCHESTRATION_GET: { + const req = p as OrchestrationGetRequest; + return orchestrationService.getRun(req.runId); + } + + case GatewayRequestType.ORCHESTRATION_SEND_MESSAGE: { + const req = p as OrchestrationSendMessageRequest; + orchestrationService.sendMessageToRun(req.runId, req.text); + return; + } + + case GatewayRequestType.ORCHESTRATION_GET_ROLE_MAPPINGS: + return { mappings: orchestrationService.getRoleMappings() }; + + case GatewayRequestType.ORCHESTRATION_UPDATE_ROLE_MAPPINGS: { + const req = p as OrchestrationUpdateRoleMappingsRequest; + return { mappings: orchestrationService.updateRoleMappings(req.mappings) }; + } + default: throw Object.assign( new Error(`Unknown request type: ${type}`), @@ -625,6 +678,17 @@ export class GatewayServer { payload: data, }); }); + + // Orchestration events + 
orchestrationService.on("orchestration.updated", (data) => { + this.broadcast({ type: GatewayNotificationType.ORCHESTRATION_UPDATED, payload: data }); + }); + orchestrationService.on("orchestration.subtask.updated", (data) => { + this.broadcast({ + type: GatewayNotificationType.ORCHESTRATION_SUBTASK_UPDATED, + payload: data, + }); + }); } private broadcast(notification: GatewayNotification): void { diff --git a/electron/main/index.ts b/electron/main/index.ts index 3f7b2025..0a268718 100644 --- a/electron/main/index.ts +++ b/electron/main/index.ts @@ -128,6 +128,10 @@ if (!gotTheLock) { // Initialize scheduled task service (persistent desktop-level scheduled tasks) scheduledTaskService.init(engineManager); + // Initialize agent team service (cross-engine orchestration) + const { orchestrationService } = await import("./services/orchestration"); + orchestrationService.init(engineManager); + // Register IPC handlers registerIpcHandlers(); diff --git a/electron/main/services/logger.ts b/electron/main/services/logger.ts index 9db9d25a..8ba0fb08 100644 --- a/electron/main/services/logger.ts +++ b/electron/main/services/logger.ts @@ -150,6 +150,7 @@ export const wecomLog = log.scope("wecom"); export const teamsLog = log.scope("teams"); export const codexLog = log.scope("codex"); export const scheduledTaskLog = log.scope("sched-task"); +export const orchestrationLog = log.scope("orchestration"); export type ScopedLogger = Pick< typeof feishuLog, diff --git a/electron/main/services/orchestration/dag-executor.ts b/electron/main/services/orchestration/dag-executor.ts new file mode 100644 index 00000000..eb900ade --- /dev/null +++ b/electron/main/services/orchestration/dag-executor.ts @@ -0,0 +1,184 @@ +// ============================================================================ +// DAG Executor — Deterministic parallel task scheduling +// Executes tasks in topological order, running independent tasks in parallel. 
+// Shared between Light Brain and Heavy Brain orchestrators. +// ============================================================================ + +import { EventEmitter } from "events"; +import type { OrchestrationSubtask, OrchestrationRun } from "../../../../src/types/unified"; +import { TaskExecutor } from "./task-executor"; +import { AGENT_TEAM_MAX_CONCURRENT_TASKS } from "./guardrails"; + +export interface DAGExecutorEvents { + /** A task's status changed */ + "task.updated": (data: { runId: string; task: OrchestrationSubtask }) => void; +} + +export declare interface DAGExecutor { + on(event: K, listener: DAGExecutorEvents[K]): this; + emit(event: K, ...args: Parameters): boolean; +} + +export class DAGExecutor extends EventEmitter { + constructor( + private taskExecutor: TaskExecutor, + private directory: string, + private maxConcurrentTasks = AGENT_TEAM_MAX_CONCURRENT_TASKS, + ) { + super(); + } + + /** + * Execute all ready tasks in the DAG. + * Runs in layers: find all tasks whose dependencies are satisfied, + * execute them in parallel, then repeat until no more tasks are runnable. + * + * @param run - The team run containing the task DAG + * @returns The tasks that were executed in this call + */ + async executeReadyTasks(run: OrchestrationRun): Promise { + const executedTasks: OrchestrationSubtask[] = []; + + while (true) { + const ready = DAGExecutor.findReadyTasks(run.subtasks).slice(0, this.maxConcurrentTasks); + if (ready.length === 0) break; + + // Execute the next ready batch up to the concurrency limit. 
+ const results = await Promise.allSettled( + ready.map((task) => this.runSingleTask(run, task)), + ); + + // Process results + for (let i = 0; i < ready.length; i++) { + const task = ready[i]; + const result = results[i]; + + if (result.status === "fulfilled") { + task.status = "completed"; + task.resultSummary = result.value.summary; + task.sessionId = result.value.sessionId; + if (result.value.error) { + task.status = "failed"; + task.error = result.value.error; + } + } else { + task.status = "failed"; + task.error = result.reason?.message ?? String(result.reason); + } + + task.time = { ...task.time, completed: Date.now() }; + this.emit("task.updated", { runId: run.id, task }); + executedTasks.push(task); + } + + // Propagate failures: mark downstream tasks as blocked + DAGExecutor.propagateFailures(run.subtasks); + } + + return executedTasks; + } + + /** + * Find tasks that are ready to execute: + * - status is "pending" + * - all dependencies are "completed" + */ + static findReadyTasks(tasks: OrchestrationSubtask[]): OrchestrationSubtask[] { + return tasks.filter((task) => { + if (task.status !== "pending") return false; + return task.dependsOn.every((depId) => { + const dep = tasks.find((t) => t.id === depId); + return dep?.status === "completed"; + }); + }); + } + + /** + * Execute a single task with upstream context injection. + * + * Team-worktree routing: + * - If run.teamWorktreeDir is set and the task is write-capable (needsWorktree + * !== false), the task runs in the shared team worktree so sibling tasks + * see each other's edits. + * - Read-only tasks (needsWorktree === false) skip the team worktree and + * run in the run's primary directory, avoiding contention. + * - When teamWorktreeDir is not set, behavior is unchanged. 
+ */ + private async runSingleTask(run: OrchestrationRun, task: OrchestrationSubtask) { + task.status = "running"; + task.time = { ...task.time, started: Date.now() }; + this.emit("task.updated", { runId: run.id, task }); + + // Gather upstream results for context injection + const dependencies = task.dependsOn + .map((depId) => run.subtasks.find((t) => t.id === depId)) + .filter((t): t is OrchestrationSubtask => t != null); + + const upstreamContext = TaskExecutor.buildUpstreamContext(dependencies); + + // Choose directory + default worktree based on team worktree presence. + const readOnlyByRole = task.role + ? (run.roleMappings?.find((m) => m.role === task.role)?.readOnly ?? false) + : false; + const isReadOnly = task.needsWorktree === false || readOnlyByRole; + const useTeamWorktree = !!run.teamWorktreeDir && !isReadOnly; + + // Always pass the parent project directory to createSession — the worktree + // directory is resolved inside createSession from the worktreeId. Passing + // the already-resolved worktree dir breaks resolveProjectId which uses the + // last path segment, and also breaks parentDirectory derivation. + const effectiveDirectory = this.directory; + const effectiveDefaultWorktreeId = useTeamWorktree + ? (run.teamWorktreeName ?? run.worktreeId) + : run.worktreeId; + + return this.taskExecutor.execute(task, effectiveDirectory, { + upstreamContext, + defaultWorktreeId: effectiveDefaultWorktreeId, + }); + } + + /** + * Mark tasks as "blocked" if any of their dependencies failed. 
+ */ + static propagateFailures(tasks: OrchestrationSubtask[]): OrchestrationSubtask[] { + const blockedTasks: OrchestrationSubtask[] = []; + let changed = true; + while (changed) { + changed = false; + for (const task of tasks) { + if (task.status !== "pending") continue; + + const hasFailedDep = task.dependsOn.some((depId) => { + const dep = tasks.find((t) => t.id === depId); + return dep?.status === "failed" || dep?.status === "blocked"; + }); + + if (hasFailedDep) { + task.status = "blocked"; + task.error = "Blocked by failed upstream task"; + blockedTasks.push(task); + changed = true; + } + } + } + + return blockedTasks; + } + + /** + * Check if the DAG execution is complete (no more pending/running tasks). + */ + static isComplete(tasks: OrchestrationSubtask[]): boolean { + return tasks.every((t) => + t.status === "completed" || t.status === "failed" || t.status === "blocked" || t.status === "cancelled", + ); + } + + /** + * Check if all tasks completed successfully. + */ + static isAllSuccessful(tasks: OrchestrationSubtask[]): boolean { + return tasks.every((t) => t.status === "completed"); + } +} diff --git a/electron/main/services/orchestration/guardrails.ts b/electron/main/services/orchestration/guardrails.ts new file mode 100644 index 00000000..edda9306 --- /dev/null +++ b/electron/main/services/orchestration/guardrails.ts @@ -0,0 +1,4 @@ +export const AGENT_TEAM_MAX_CONCURRENT_TASKS = 100; +export const AGENT_TEAM_INACTIVITY_TIMEOUT_MS = 15 * 60 * 1000; +export const AGENT_TEAM_MAX_TASK_RETRIES = 1; +export const AGENT_TEAM_RETRY_BACKOFF_MS = 1000; diff --git a/electron/main/services/orchestration/heavy-brain.ts b/electron/main/services/orchestration/heavy-brain.ts new file mode 100644 index 00000000..99442c00 --- /dev/null +++ b/electron/main/services/orchestration/heavy-brain.ts @@ -0,0 +1,796 @@ +// ============================================================================ +// Heavy Brain — Continuous LLM supervisor orchestration +// A long-running 
orchestrator session dispatches tasks and adapts dynamically. +// Results are reported incrementally as each task completes. +// ============================================================================ + +import type { EngineManager } from "../../gateway/engine-manager"; +import type { OrchestrationSubtask, OrchestrationRun, EngineType, UnifiedMessage } from "../../../../src/types/unified"; +import { DAGExecutor } from "./dag-executor"; +import { + TaskExecutor, + extractTextFromMessage, + trackAutoApproveSession, + type AutoApproveSessionTracker, + type TaskExecutionResult, + type RoleResolver, +} from "./task-executor"; +import { dispatchSkill, type DispatchInstruction, type DispatchTask } from "./skills"; +import { buildOrchestratorPrompt, formatSingleTaskResult, formatTaskResults, formatUserMessage } from "./prompts"; +import { orchestrationLog } from "./logger"; +import { UserChannel } from "./user-channel"; +import { AGENT_TEAM_MAX_CONCURRENT_TASKS } from "./guardrails"; + +/** Maximum orchestration iterations to prevent infinite loops */ +const MAX_ITERATIONS = 20; + +interface RunningTaskState { + promise: Promise; + sessionId?: string; +} + +/** + * Race a Map of promises: resolves with the first one to settle. + * Returns the key and result; the remaining promises keep running. + */ +function raceMap(map: Map): Promise<{ id: string; result: TaskExecutionResult }> { + return Promise.race( + Array.from(map.entries()).map(([id, state]) => + state.promise.then( + (result) => ({ id, result }), + (err) => ({ + id, + result: { + sessionId: "", + summary: "", + error: err instanceof Error ? 
err.message : String(err), + }, + }), + ), + ), + ); +} + +type RunningTasks = Map; +type TerminalMessage = UnifiedMessage & { _terminal?: true }; + +type ParseInstructionResult = + | { ok: true; data: DispatchInstruction; response: UnifiedMessage } + | { ok: false; error: string; response: UnifiedMessage }; + +type MergeDispatchTasksResult = + | { ok: true; tasks: OrchestrationSubtask[] } + | { ok: false; error: string }; + +export class HeavyBrainOrchestrator { + private cancelled = false; + private terminal = false; + private nextAutoTaskIndex = 0; + private activeRun: { + teamRun: OrchestrationRun; + onTaskUpdated: (task: OrchestrationSubtask) => void; + } | null = null; + private activeRunningTasks: RunningTasks | null = null; + private cancelSignal: Promise = Promise.resolve(); + private resolveCancelSignal: (() => void) | null = null; + private cancelledSessionIds = new Set(); + /** Shared user message channel — external code calls userChannel.send() */ + readonly userChannel = new UserChannel(); + + constructor( + private engineManager: EngineManager, + private autoApproveSessions: AutoApproveSessionTracker, + private resolveRole?: RoleResolver, + private awaitPlanConfirmation?: (runId: string) => Promise, + private maxConcurrentTasks = AGENT_TEAM_MAX_CONCURRENT_TASKS, + ) {} + + /** + * Run Heavy Brain orchestration: + * 1. Create orchestrator session + * 2. Loop: orchestrator dispatches → execute with incremental results → decide next + * 3. 
Until orchestrator signals "complete" or max iterations + */ + async run( + teamRun: OrchestrationRun, + orchestratorEngineType: EngineType | undefined, + onTaskUpdated: (task: OrchestrationSubtask) => void, + ): Promise { + this.cancelled = false; + this.terminal = false; + this.nextAutoTaskIndex = teamRun.subtasks.length; + this.activeRun = { teamRun, onTaskUpdated }; + this.activeRunningTasks = null; + this.cancelledSessionIds.clear(); + this.cancelSignal = new Promise((resolve) => { + this.resolveCancelSignal = resolve; + }); + + const defaultEngineType = this.engineManager.getDefaultEngineType(); + const engineType = orchestratorEngineType || defaultEngineType; + + try { + // --- Create orchestrator session --- + teamRun.status = "decomposing"; + orchestrationLog.info(`[${teamRun.id}] Heavy Brain: creating orchestrator session on ${engineType}`); + + const engines = this.engineManager.listEngines(); + const prompt = buildOrchestratorPrompt( + teamRun.prompt, + engines, + teamRun.directory, + ); + + // Inject format spec + orchestrator role as system-level prompt. + const systemPrompt = `${dispatchSkill.formatPrompt}\n\n---\n\n${prompt}`; + + const orchSession = await this.engineManager.createSession( + engineType, + teamRun.parentDirectory ?? 
teamRun.directory, + teamRun.worktreeId, + { systemPrompt }, + ); + teamRun.orchestratorSessionId = orchSession.id; + this.registerAutoApprove(orchSession.id); + + if (this.terminal) { + return; + } + + const taskExecutor = new TaskExecutor( + this.engineManager, + this.autoApproveSessions, + defaultEngineType, + this.resolveRole, + ); + + // --- Initial prompt --- + // Also send as user message for engines that don't support custom system prompts + const fullPrompt = `${dispatchSkill.formatPrompt}\n\n---\n\n${prompt}`; + const initialResponse = await this.sendToOrchestrator(orchSession.id, fullPrompt); + if (!initialResponse || this.terminal) { + return; + } + let response = initialResponse; + + teamRun.status = "running"; + let iterations = 0; + let planConfirmed = false; + + // --- Orchestration loop --- + while (iterations++ < MAX_ITERATIONS && !this.terminal) { + const parsed = await this.parseInstruction(teamRun, orchSession.id, response); + response = parsed.response; + + if (this.terminal) { + return; + } + + if (!parsed.ok) { + await this.failRun(teamRun, `Orchestrator output format error: ${parsed.error}`, onTaskUpdated); + return; + } + + const data = parsed.data; + + // --- Handle "complete" --- + if (data.action === "complete") { + await this.completeRun(teamRun, data.result, onTaskUpdated); + return; + } + + // --- Handle "dispatch" --- + if (data.action === "dispatch") { + const mergeResult = this.mergeDispatchTasks(teamRun, data.tasks, onTaskUpdated); + if (!mergeResult.ok) { + await this.failRun(teamRun, `Orchestrator task graph error: ${mergeResult.error}`, onTaskUpdated); + return; + } + + // --- Plan confirmation (first dispatch only) --- + if (!planConfirmed && teamRun.requirePlanConfirmation && this.awaitPlanConfirmation) { + teamRun.status = "confirming"; + orchestrationLog.info(`[${teamRun.id}] Heavy Brain: awaiting user plan confirmation`); + try { + const confirmedTasks = await this.awaitPlanConfirmation(teamRun.id); + teamRun.subtasks = 
confirmedTasks.map((t): OrchestrationSubtask => ({ + ...t, + status: "pending", + worktreeId: t.worktreeId ?? teamRun.worktreeId, + })); + orchestrationLog.info(`[${teamRun.id}] Heavy Brain: plan confirmed (${teamRun.subtasks.length} tasks)`); + } catch (err) { + teamRun.status = "failed"; + teamRun.resultSummary = `Plan confirmation failed: ${(err as Error).message}`; + teamRun.time.completed = Date.now(); + return; + } + teamRun.status = "running"; + } + planConfirmed = true; + + orchestrationLog.info( + `[${teamRun.id}] Heavy Brain: dispatching ${mergeResult.tasks.length} tasks (iteration ${iterations})`, + ); + + // Execute tasks and report results incrementally + response = await this.executeAndReportIncrementally( + teamRun, + orchSession.id, + taskExecutor, + onTaskUpdated, + ); + + if (this.isTerminal(response)) { + return; + } + } + } + + if (this.terminal) { + return; + } + + if (this.cancelled) { + await this.finalizeRun(teamRun, "cancelled", "Orchestration was cancelled.", onTaskUpdated); + } else { + await this.failRun( + teamRun, + `Orchestration exceeded maximum iterations (${MAX_ITERATIONS}).`, + onTaskUpdated, + ); + } + } finally { + this.activeRun = null; + this.activeRunningTasks = null; + this.resolveCancelSignal = null; + } + } + + /** + * Execute tasks in parallel and report each result to the orchestrator + * as it completes. User messages (via UserChannel) are prioritized over + * task results. The orchestrator can: + * - dispatch new tasks (added to the running set) + * - signal complete (remaining tasks are cancelled) + * - acknowledge and wait (continueWaiting) + * + * Returns the last orchestrator response for the main loop to parse. 
+ */ + private async executeAndReportIncrementally( + teamRun: OrchestrationRun, + orchSessionId: string, + executor: TaskExecutor, + onTaskUpdated: (task: OrchestrationSubtask) => void, + ): Promise { + const running: RunningTasks = new Map(); + this.activeRunningTasks = running; + this.propagateBlockedTasks(teamRun, onTaskUpdated); + this.startReadyTasks(teamRun, running, executor, onTaskUpdated); + + let lastResponse: UnifiedMessage | undefined; + + // Process completions and user messages + while (running.size > 0 && !this.terminal) { + // --- Priority 1: check for buffered user message --- + const pendingUserMsg = this.userChannel.takePending(); + if (pendingUserMsg) { + const response = await this.sendToOrchestrator( + orchSessionId, + formatUserMessage(pendingUserMsg, this.countOpenTasks(teamRun.subtasks)), + ); + if (!response || this.terminal) { + return this.markTerminal(lastResponse ?? this.createSyntheticMessage(orchSessionId)); + } + lastResponse = response; + lastResponse = await this.handleOrchestratorResponse( + teamRun, + orchSessionId, + lastResponse, + running, + onTaskUpdated, + ); + if (this.isTerminal(lastResponse)) return lastResponse; + this.startReadyTasks(teamRun, running, executor, onTaskUpdated); + continue; + } + + // --- Priority 2: race task completions vs user messages --- + type RaceResult = + | { type: "task"; id: string; result: TaskExecutionResult } + | { type: "user"; text: string } + | { type: "cancel" }; + + const taskPromise = raceMap(running).then( + (result): RaceResult => ({ type: "task", ...result }), + ); + const userPromise = this.userChannel.waitForMessage().then( + (text): RaceResult => ({ type: "user", text }), + ); + const cancelPromise = this.cancelSignal.then( + (): RaceResult => ({ type: "cancel" }), + ); + + const winner = await Promise.race([userPromise, taskPromise, cancelPromise]); + + if (winner.type === "cancel") { + return this.markTerminal(lastResponse ?? 
this.createSyntheticMessage(orchSessionId)); + } + + if (winner.type === "user") { + // User message arrived — send to orchestrator immediately + orchestrationLog.info(`[${teamRun.id}] Human feedback received (${running.size} tasks running)`); + const response = await this.sendToOrchestrator( + orchSessionId, + formatUserMessage(winner.text, this.countOpenTasks(teamRun.subtasks)), + ); + if (!response || this.terminal) { + return this.markTerminal(lastResponse ?? this.createSyntheticMessage(orchSessionId)); + } + lastResponse = response; + lastResponse = await this.handleOrchestratorResponse( + teamRun, + orchSessionId, + lastResponse, + running, + onTaskUpdated, + ); + if (this.isTerminal(lastResponse)) return lastResponse; + this.startReadyTasks(teamRun, running, executor, onTaskUpdated); + continue; + } + + // --- Task completed --- + running.delete(winner.id); + + if (this.terminal) { + continue; + } + + // Update task status + const task = teamRun.subtasks.find((candidate) => candidate.id === winner.id); + if (!task) { + continue; + } + + task.time = { ...task.time, completed: Date.now() }; + task.sessionId = winner.result.sessionId; + if (winner.result.error) { + task.status = "failed"; + task.error = winner.result.error; + task.resultSummary = winner.result.summary; + } else { + task.status = "completed"; + task.resultSummary = winner.result.summary; + } + onTaskUpdated(task); + this.propagateBlockedTasks(teamRun, onTaskUpdated); + + orchestrationLog.info( + `[${teamRun.id}] Task ${winner.id} ${task.status} (${running.size} remaining)`, + ); + + // Send this task's result to orchestrator + const resultMsg = formatSingleTaskResult(task, this.countOpenTasks(teamRun.subtasks)); + const response = await this.sendToOrchestrator(orchSessionId, resultMsg); + if (!response || this.terminal) { + return this.markTerminal(lastResponse ?? 
this.createSyntheticMessage(orchSessionId)); + } + lastResponse = response; + lastResponse = await this.handleOrchestratorResponse( + teamRun, + orchSessionId, + lastResponse, + running, + onTaskUpdated, + ); + if (this.isTerminal(lastResponse)) return lastResponse; + this.startReadyTasks(teamRun, running, executor, onTaskUpdated); + } + + if (this.terminal) { + return this.markTerminal(lastResponse ?? this.createSyntheticMessage(orchSessionId)); + } + + // All tasks done — send final summary for orchestrator to decide + const summaryText = formatTaskResults(teamRun); + const response = await this.sendToOrchestrator(orchSessionId, summaryText); + if (!response) { + return this.markTerminal(lastResponse ?? this.createSyntheticMessage(orchSessionId)); + } + + return response; + } + + /** + * Send a message to the orchestrator and return the response. + */ + private async sendToOrchestrator( + orchSessionId: string, + text: string, + ): Promise { + const responsePromise = this.engineManager.sendMessage(orchSessionId, [ + { type: "text", text }, + ]); + + const guardedResponse = responsePromise.catch((error) => { + if (this.cancelled || this.terminal) { + orchestrationLog.debug( + `[${orchSessionId}] Ignoring orchestrator response after cancellation: ${error instanceof Error ? error.message : String(error)}`, + ); + return null; + } + throw error; + }); + + const winner = await Promise.race([ + guardedResponse.then((response) => ({ type: "response" as const, response })), + this.cancelSignal.then(() => ({ type: "cancel" as const })), + ]); + + if (winner.type === "cancel") { + return null; + } + + return winner.response; + } + + /** + * Parse an orchestrator response and handle dispatch/complete actions. + * Returns the (possibly updated) lastResponse. Mutates `running` if new + * tasks are dispatched or remaining tasks are cancelled. 
+ */ + private async handleOrchestratorResponse( + teamRun: OrchestrationRun, + orchSessionId: string, + lastResponse: UnifiedMessage, + running: RunningTasks, + onTaskUpdated: (task: OrchestrationSubtask) => void, + ): Promise { + const parsed = await this.parseInstruction(teamRun, orchSessionId, lastResponse); + lastResponse = parsed.response; + + if (this.terminal) { + return this.markTerminal(lastResponse); + } + + if (!parsed.ok) { + await this.failRun(teamRun, `Orchestrator output format error: ${parsed.error}`, onTaskUpdated); + running.clear(); + return this.markTerminal(lastResponse); + } + + if (parsed.data.action === "complete") { + await this.completeRun(teamRun, parsed.data.result, onTaskUpdated); + running.clear(); + return this.markTerminal(lastResponse); + } + + if (parsed.data.action === "dispatch") { + const mergeResult = this.mergeDispatchTasks(teamRun, parsed.data.tasks, onTaskUpdated); + if (!mergeResult.ok) { + await this.failRun(teamRun, `Orchestrator task graph error: ${mergeResult.error}`, onTaskUpdated); + running.clear(); + return this.markTerminal(lastResponse); + } + + orchestrationLog.info( + `[${teamRun.id}] Orchestrator dispatched ${mergeResult.tasks.length} new tasks mid-execution`, + ); + } + + return lastResponse; + } + + /** Check if the orchestrator signaled completion */ + private isTerminal(response: UnifiedMessage): boolean { + return !!(response as TerminalMessage)._terminal; + } + + async cancel(): Promise { + if (this.terminal) { + return; + } + + this.cancelled = true; + this.userChannel.dispose(); + this.resolveCancelSignal?.(); + + const activeRun = this.activeRun; + if (!activeRun) { + this.terminal = true; + return; + } + + if (activeRun.teamRun.orchestratorSessionId) { + await this.cancelSession( + activeRun.teamRun.orchestratorSessionId, + `run ${activeRun.teamRun.id} orchestrator`, + ); + } + + await this.finalizeRun( + activeRun.teamRun, + "cancelled", + "Orchestration was cancelled.", + activeRun.onTaskUpdated, + 
); + } + + private convertDispatchTasks(tasks: DispatchTask[], defaultWorktreeId?: string): OrchestrationSubtask[] { + return tasks.map((t): OrchestrationSubtask => ({ + id: t.id || `task_${this.nextAutoTaskIndex++}`, + description: t.description, + prompt: t.prompt, + engineType: t.engineType as EngineType | undefined, + dependsOn: t.dependsOn || [], + worktreeId: t.worktreeId ?? defaultWorktreeId, + status: "pending", + })); + } + + private registerAutoApprove(sessionId: string): void { + trackAutoApproveSession(this.autoApproveSessions, sessionId); + } + + private async parseInstruction( + teamRun: OrchestrationRun, + orchSessionId: string, + response: UnifiedMessage, + ): Promise { + const responseText = extractTextFromMessage(response); + let instruction = dispatchSkill.parse(responseText); + + if (instruction.ok) { + return { ok: true, data: instruction.data, response }; + } + + orchestrationLog.warn( + `[${teamRun.id}] Orchestrator response not valid JSON: ${instruction.error}`, + ); + + const correction = dispatchSkill.correctionPrompt(responseText, instruction.error); + const retryResponse = await this.sendToOrchestrator(orchSessionId, correction); + if (!retryResponse) { + return { ok: false, error: "Orchestration was cancelled.", response }; + } + + instruction = dispatchSkill.parse(extractTextFromMessage(retryResponse)); + if (!instruction.ok) { + return { ok: false, error: instruction.error, response: retryResponse }; + } + + return { ok: true, data: instruction.data, response: retryResponse }; + } + + private mergeDispatchTasks( + teamRun: OrchestrationRun, + tasks: DispatchTask[], + onTaskUpdated: (task: OrchestrationSubtask) => void, + ): MergeDispatchTasksResult { + const newTasks = this.convertDispatchTasks(tasks, teamRun.worktreeId); + const validation = this.validateMergedTasks(teamRun.subtasks, newTasks); + if (!validation.ok) { + return validation; + } + + teamRun.subtasks.push(...newTasks); + for (const task of newTasks) { + 
onTaskUpdated(task); + } + this.propagateBlockedTasks(teamRun, onTaskUpdated); + + return { ok: true, tasks: newTasks }; + } + + private validateMergedTasks( + existingTasks: OrchestrationSubtask[], + newTasks: OrchestrationSubtask[], + ): MergeDispatchTasksResult { + const combined = [...existingTasks, ...newTasks]; + const ids = new Set(); + const errors: string[] = []; + + for (const task of combined) { + if (ids.has(task.id)) { + errors.push(`Duplicate task id '${task.id}'`); + } else { + ids.add(task.id); + } + } + + for (const task of combined) { + for (const depId of task.dependsOn) { + if (!ids.has(depId)) { + errors.push(`Task '${task.id}' dependsOn unknown task '${depId}'`); + } + } + } + + const visited = new Set(); + const inStack = new Set(); + const taskMap = new Map(combined.map((task) => [task.id, task])); + + const hasCycle = (taskId: string): boolean => { + if (inStack.has(taskId)) return true; + if (visited.has(taskId)) return false; + + visited.add(taskId); + inStack.add(taskId); + + const task = taskMap.get(taskId); + if (task) { + for (const depId of task.dependsOn) { + if (hasCycle(depId)) return true; + } + } + + inStack.delete(taskId); + return false; + }; + + for (const taskId of ids) { + if (hasCycle(taskId)) { + errors.push("Circular dependency detected in task graph"); + break; + } + } + + if (errors.length > 0) { + return { ok: false, error: errors.join("; ") }; + } + + return { ok: true, tasks: newTasks }; + } + + private propagateBlockedTasks( + teamRun: OrchestrationRun, + onTaskUpdated: (task: OrchestrationSubtask) => void, + ): void { + const blockedTasks = DAGExecutor.propagateFailures(teamRun.subtasks); + for (const task of blockedTasks) { + task.time = { ...task.time, completed: task.time?.completed ?? 
Date.now() }; + onTaskUpdated(task); + } + } + + private startReadyTasks( + teamRun: OrchestrationRun, + running: RunningTasks, + executor: TaskExecutor, + onTaskUpdated: (task: OrchestrationSubtask) => void, + ): void { + const capacity = Math.max(this.maxConcurrentTasks - running.size, 0); + if (capacity === 0) { + return; + } + + const readyTasks = DAGExecutor.findReadyTasks(teamRun.subtasks).slice(0, capacity); + + for (const task of readyTasks) { + const dependencies = task.dependsOn + .map((depId) => teamRun.subtasks.find((candidate) => candidate.id === depId)) + .filter((candidate): candidate is OrchestrationSubtask => candidate != null); + + const upstreamContext = TaskExecutor.buildUpstreamContext(dependencies); + + task.status = "running"; + task.time = { ...task.time, started: Date.now() }; + onTaskUpdated(task); + + let state!: RunningTaskState; + const promise = executor.execute(task, teamRun.parentDirectory ?? teamRun.directory, { + upstreamContext, + defaultWorktreeId: teamRun.worktreeId, + onSessionCreated: (sessionId) => { + state.sessionId = sessionId; + if (this.cancelled || this.terminal) { + void this.cancelSession(sessionId, `task ${task.id}`); + } + }, + shouldCancel: () => this.cancelled || this.terminal, + }); + state = { promise }; + running.set(task.id, state); + } + } + + private countOpenTasks(tasks: OrchestrationSubtask[]): number { + return tasks.filter((task) => task.status === "pending" || task.status === "running").length; + } + + private async completeRun( + teamRun: OrchestrationRun, + result: string, + onTaskUpdated: (task: OrchestrationSubtask) => void, + ): Promise { + if (await this.finalizeRun(teamRun, "completed", result, onTaskUpdated)) { + orchestrationLog.info(`[${teamRun.id}] Heavy Brain: orchestrator signaled complete`); + } + } + + private async failRun( + teamRun: OrchestrationRun, + message: string, + onTaskUpdated: (task: OrchestrationSubtask) => void, + ): Promise { + if (await this.finalizeRun(teamRun, "failed", 
message, onTaskUpdated)) { + orchestrationLog.error(`[${teamRun.id}] Heavy Brain: ${message}`); + } + } + + private markTerminal(response: UnifiedMessage): UnifiedMessage { + (response as TerminalMessage)._terminal = true; + return response; + } + + private async finalizeRun( + teamRun: OrchestrationRun, + status: OrchestrationRun["status"], + result: string, + onTaskUpdated: (task: OrchestrationSubtask) => void, + ): Promise { + if (this.terminal) { + return false; + } + + this.terminal = true; + this.resolveCancelSignal?.(); + await this.cancelOutstandingTasks(teamRun, onTaskUpdated); + teamRun.status = status; + teamRun.resultSummary = result; + teamRun.time.completed = Date.now(); + return true; + } + + private async cancelOutstandingTasks( + teamRun: OrchestrationRun, + onTaskUpdated: (task: OrchestrationSubtask) => void, + ): Promise { + const cancellationPromises: Promise[] = []; + + for (const task of teamRun.subtasks) { + if (task.status === "running") { + const sessionId = this.activeRunningTasks?.get(task.id)?.sessionId ?? task.sessionId; + if (sessionId) { + cancellationPromises.push(this.cancelSession(sessionId, `task ${task.id}`)); + } + } + + if (task.status !== "pending" && task.status !== "running") { + continue; + } + + task.status = "cancelled"; + task.time = { ...task.time, completed: task.time?.completed ?? Date.now() }; + onTaskUpdated(task); + } + + await Promise.all(cancellationPromises); + } + + private async cancelSession(sessionId: string, label: string): Promise { + if (this.cancelledSessionIds.has(sessionId)) { + return; + } + + this.cancelledSessionIds.add(sessionId); + try { + await this.engineManager.cancelMessage(sessionId); + } catch (error) { + orchestrationLog.warn( + `[${label}] Failed to cancel session ${sessionId}: ${error instanceof Error ? 
error.message : String(error)}`, + ); + } + } + + private createSyntheticMessage(sessionId: string): UnifiedMessage { + return { + id: `team-terminal-${Date.now()}`, + sessionId, + role: "assistant", + time: { created: Date.now(), completed: Date.now() }, + parts: [], + }; + } +} diff --git a/electron/main/services/orchestration/index.ts b/electron/main/services/orchestration/index.ts new file mode 100644 index 00000000..237a1636 --- /dev/null +++ b/electron/main/services/orchestration/index.ts @@ -0,0 +1,614 @@ +// ============================================================================ +// Agent Team Service — Singleton orchestration service +// Manages team runs (Light Brain and Heavy Brain) with auto-approve permissions. +// Follows the same pattern as ScheduledTaskService. +// ============================================================================ + +import { EventEmitter } from "events"; +import fs from "node:fs"; +import path from "node:path"; +import { app } from "electron"; +import { timeId } from "../../utils/id-gen"; +import { orchestrationLog } from "./logger"; +import { LightBrainOrchestrator } from "./light-brain"; +import { HeavyBrainOrchestrator } from "./heavy-brain"; +import type { UserChannel } from "./user-channel"; +import type { EngineManager } from "../../gateway/engine-manager"; +import type { + OrchestrationRun, + OrchestrationCreateRequest, + OrchestrationSubtask, + EngineType, + RoleEngineMapping, + OrchestratorRole, + ConversationMessage, + TextPart, +} from "../../../../src/types/unified"; +import { loadSettings, saveSettings } from "../logger"; +import { conversationStore } from "../conversation-store"; + +const SAVE_DEBOUNCE_MS = 500; +const ROLE_MAPPINGS_SETTING_KEY = "orchestration.roleMappings"; + +/** Default role → engine mapping (inspired by oh-my-opencode-slim agent roles, via PR #117). 
*/ +export const DEFAULT_ROLE_MAPPINGS: RoleEngineMapping[] = [ + { + role: "explorer", + label: "Explorer", + description: "Codebase search, file/symbol location, pattern matching (read-only)", + engineType: "claude", + readOnly: true, + }, + { + role: "researcher", + label: "Researcher", + description: "Documentation research, external resources, library/API investigation (read-only)", + engineType: "claude", + readOnly: true, + }, + { + role: "reviewer", + label: "Reviewer", + description: "Architecture analysis, code review, quality checks (read-only)", + engineType: "claude", + readOnly: true, + }, + { + role: "designer", + label: "Designer", + description: "UI/UX design and implementation, frontend styling, visual components", + engineType: "claude", + }, + { + role: "coder", + label: "Coder", + description: "Code implementation, refactoring, bug fixing, feature development", + engineType: "claude", + }, +]; + +interface OrchestrationRunFileFormat { + version: 1; + runs: OrchestrationRun[]; +} + +/** + * A pending plan-confirmation gate. Light Brain (and optionally Heavy Brain) + * stashes a resolver here when paused at `awaiting-confirmation`. The + * gateway handler for TEAM_CONFIRM_PLAN calls `resolve(tasks)` to unblock it. 
+ */ +interface PendingConfirmation { + resolve: (tasks: OrchestrationSubtask[]) => void; + reject: (reason: unknown) => void; +} + +// --- Event types --- + +export interface OrchestrationServiceEvents { + "orchestration.updated": (data: { run: OrchestrationRun }) => void; + "orchestration.subtask.updated": (data: { runId: string; task: OrchestrationSubtask }) => void; + "orchestration.roleMappings.updated": (data: { mappings: RoleEngineMapping[] }) => void; +} + +export declare interface OrchestrationService { + on(event: K, listener: OrchestrationServiceEvents[K]): this; + off(event: K, listener: OrchestrationServiceEvents[K]): this; + emit(event: K, ...args: Parameters): boolean; +} + +// --- Service --- + +export class OrchestrationService extends EventEmitter { + private engineManager: EngineManager | null = null; + private runs = new Map(); + /** Session IDs created by team runs — auto-approve permissions for these. */ + private autoApproveSessions = new Set(); + /** Run-scoped auto-approve session tracking for deterministic cleanup. */ + private autoApproveSessionsByRun = new Map>(); + /** Active Heavy Brain orchestrators (for cancellation). */ + private activeOrchestrators = new Map(); + /** Active Heavy Brain relay channels for human-in-the-loop messages. */ + private activeRelayChannels = new Map(); + /** Pending plan-confirmation gates keyed by runId. 
*/ + private pendingConfirmations = new Map(); + private saveTimer: ReturnType | null = null; + private initialized = false; + + // --- Lifecycle --- + + init(engineManager: EngineManager): void { + if (this.initialized) return; + this.runs.clear(); + this.autoApproveSessions.clear(); + this.autoApproveSessionsByRun.clear(); + this.activeOrchestrators.clear(); + this.activeRelayChannels.clear(); + this.engineManager = engineManager; + this.loadFromDisk(); + this.subscribePermissionAutoApprove(); + this.initialized = true; + orchestrationLog.info(`Agent Team Service initialized with ${this.runs.size} run(s)`); + } + + async shutdown(): Promise { + // Cancel all running orchestrators + for (const [runId, orchestrator] of this.activeOrchestrators) { + orchestrationLog.info(`Cancelling orchestrator for run ${runId}`); + await orchestrator.cancel(); + } + this.activeOrchestrators.clear(); + this.activeRelayChannels.clear(); + this.autoApproveSessionsByRun.clear(); + this.autoApproveSessions.clear(); + this.flushPendingSave(); + this.initialized = false; + orchestrationLog.info("Agent Team Service shut down"); + } + + // --- Auto-approve (same pattern as ScheduledTaskService) --- + + private subscribePermissionAutoApprove(): void { + if (!this.engineManager) return; + + this.engineManager.on("permission.asked", (data: any) => { + const permission = data.permission ?? data; + const sessionId = permission.sessionId; + if (!sessionId || !this.autoApproveSessions.has(sessionId)) return; + + const acceptOption = permission.options?.find( + (o: any) => + o.type?.includes("accept") || + o.type?.includes("allow") || + o.label?.toLowerCase().includes("allow"), + ); + + if (acceptOption) { + orchestrationLog.info(`Auto-approving permission ${permission.id} for session ${sessionId}`); + this.engineManager!.replyPermission(permission.id, { optionId: acceptOption.id }); + } + }); + } + + // --- CRUD --- + + /** + * Create and start a new team run. 
+ * Returns immediately with the run in "decomposing" status. + * Execution happens asynchronously. + */ + async createRun(req: OrchestrationCreateRequest): Promise { + if (!this.engineManager) { + throw new Error("OrchestrationService not initialized"); + } + + if (Boolean(req.worktreeId) !== Boolean(req.parentDirectory)) { + throw new Error( + "Team runs started from worktree sessions must include both worktreeId and parentDirectory.", + ); + } + + const run: OrchestrationRun = { + id: timeId("team"), + parentSessionId: req.sessionId, + directory: req.directory, + parentDirectory: req.parentDirectory, + worktreeId: req.worktreeId, + teamWorktreeName: req.teamWorktreeInfo?.name, + teamWorktreeDir: req.teamWorktreeInfo?.directory, + roleMappings: req.roleMappings ?? this.getRoleMappings(), + prompt: req.prompt, + engineTypes: req.engineTypes ?? (req.engineType ? [req.engineType] : []), + mode: req.mode, + status: "decomposing", + subtasks: [], + time: { created: Date.now() }, + // Plan confirmation defaults: Light = on, Heavy = off (can be overridden) + requirePlanConfirmation: req.requirePlanConfirmation ?? (req.mode === "light"), + // Default: relay results to parent when we have one + aggregateToParent: req.aggregateToParent ?? true, + }; + + this.runs.set(run.id, run); + this.emitRunUpdated(run); + + // Persist the user's prompt as a user message in the parent session + // so it survives page refresh (the frontend only creates a temp message). 
+ try { + const msgId = timeId("msg"); + const userMessage: ConversationMessage = { + id: msgId, + role: "user", + time: { created: Date.now(), completed: Date.now() }, + parts: [{ + type: "text" as const, + id: `${msgId}_p0`, + messageId: msgId, + sessionId: req.sessionId, + text: req.prompt, + } as TextPart], + }; + await conversationStore.appendMessage(req.sessionId, userMessage); + } catch (err) { + orchestrationLog.warn(`Failed to persist user prompt for run ${run.id}:`, err); + } + + orchestrationLog.info(`Created team run ${run.id} (${run.mode} brain)`); + + // Start execution asynchronously + void this.executeRun(run, req.engineType as EngineType | undefined).catch((err) => { + orchestrationLog.error(`Team run ${run.id} failed:`, err); + run.status = "failed"; + run.resultSummary = `Orchestration error: ${err.message}`; + run.time.completed = Date.now(); + this.emitRunUpdated(run); + }); + + return run; + } + + /** + * Cancel a running team run. + */ + async cancelRun(runId: string): Promise { + const run = this.runs.get(runId); + if (!run) throw new Error(`Team run not found: ${runId}`); + + // Cancel heavy brain orchestrator if active + const orchestrator = this.activeOrchestrators.get(runId); + if (orchestrator) { + await orchestrator.cancel(); + this.activeOrchestrators.delete(runId); + } else { + if (run.orchestratorSessionId) { + try { + await this.engineManager?.cancelMessage(run.orchestratorSessionId); + } catch (error) { + orchestrationLog.warn( + `Failed to cancel orchestrator session ${run.orchestratorSessionId}: ${error instanceof Error ? error.message : String(error)}`, + ); + } + } + + for (const task of run.subtasks) { + if (task.sessionId && task.status === "running") { + try { + await this.engineManager?.cancelMessage(task.sessionId); + } catch (error) { + orchestrationLog.warn( + `Failed to cancel task session ${task.sessionId}: ${error instanceof Error ? 
error.message : String(error)}`, + ); + } + } + + if (task.status === "pending" || task.status === "running") { + task.status = "cancelled"; + task.time = { ...task.time, completed: task.time?.completed ?? Date.now() }; + } + } + + run.status = "cancelled"; + run.resultSummary = "Orchestration was cancelled."; + run.time.completed = Date.now(); + } + + this.cleanupRunRuntimeState(runId); + this.emitRunUpdated(run); + orchestrationLog.info(`Cancelled team run ${runId}`); + } + + /** + * Relay a user message to an active Heavy Brain orchestrator. + */ + sendMessageToRun(runId: string, text: string): void { + const run = this.runs.get(runId); + if (!run) throw new Error(`Team run not found: ${runId}`); + if (run.status !== "running" && run.status !== "decomposing") { + throw new Error(`Team run ${runId} is not active (status: ${run.status})`); + } + if (run.mode !== "heavy") { + throw new Error( + `Relay messaging is only supported for active Heavy Brain runs. Run ${runId} is ${run.mode}.`, + ); + } + + const channel = this.activeRelayChannels.get(runId); + if (!channel) { + throw new Error(`No active Heavy Brain relay channel for run ${runId}`); + } + + channel.send(text); + orchestrationLog.info(`User message sent to run ${runId}`); + } + + listRuns(): OrchestrationRun[] { + return Array.from(this.runs.values()).sort((a, b) => b.time.created - a.time.created); + } + + getRun(runId: string): OrchestrationRun | null { + return this.runs.get(runId) ?? null; + } + + // --- Plan confirmation --- + + /** + * Register a pending plan-confirmation gate for a run. Called by + * LightBrainOrchestrator (and optionally HeavyBrainOrchestrator) when it + * reaches the `awaiting-confirmation` phase. Returns a promise that + * resolves with the user-approved task list. + */ + awaitPlanConfirmation(runId: string): Promise { + // Notify the frontend that the run has transitioned to "confirming" status + // so the UI can show the plan confirmation card with approve/cancel buttons. 
+ const run = this.runs.get(runId); + if (run) { + this.emitRunUpdated(run); + } + return new Promise((resolve, reject) => { + this.pendingConfirmations.set(runId, { resolve, reject }); + }); + } + + /** + * Resolve a pending plan-confirmation gate. Called by the gateway when the + * user confirms / edits the plan via TEAM_CONFIRM_PLAN. + */ + confirmPlan(runId: string, tasks: OrchestrationSubtask[]): void { + const gate = this.pendingConfirmations.get(runId); + if (!gate) { + throw new Error(`No pending plan confirmation for run ${runId}`); + } + this.pendingConfirmations.delete(runId); + gate.resolve(tasks); + orchestrationLog.info(`Plan confirmed for run ${runId} with ${tasks.length} task(s)`); + } + + private rejectPendingConfirmation(runId: string, reason: string): void { + const gate = this.pendingConfirmations.get(runId); + if (!gate) return; + this.pendingConfirmations.delete(runId); + gate.reject(new Error(reason)); + } + + // --- Role mappings --- + + getRoleMappings(): RoleEngineMapping[] { + const stored = loadSettings()[ROLE_MAPPINGS_SETTING_KEY]; + if (Array.isArray(stored) && stored.length > 0) { + // Validate role field exists + return stored.filter((m: any): m is RoleEngineMapping => + m && typeof m === "object" && typeof m.role === "string" && typeof m.engineType === "string" + ); + } + return DEFAULT_ROLE_MAPPINGS.map((m) => ({ ...m })); + } + + updateRoleMappings(mappings: RoleEngineMapping[]): RoleEngineMapping[] { + saveSettings({ [ROLE_MAPPINGS_SETTING_KEY]: mappings }); + orchestrationLog.info(`Updated role mappings (${mappings.length} roles)`); + this.emit("orchestration.roleMappings.updated", { mappings }); + return mappings; + } + + resolveRole(role: OrchestratorRole, fallback: EngineType): { engineType: EngineType; modelId?: string } { + const mapping = this.getRoleMappings().find((m) => m.role === role); + if (mapping) { + return { engineType: mapping.engineType, modelId: mapping.modelId }; + } + return { engineType: fallback }; + } + + 
// --- Execution --- + + private async executeRun(run: OrchestrationRun, orchestratorEngineType?: EngineType): Promise { + const onTaskUpdated = (task: OrchestrationSubtask) => { + this.emit("orchestration.subtask.updated", { runId: run.id, task }); + this.emitRunUpdated(run); + }; + + const resolvedEngine = orchestratorEngineType ?? this.engineManager!.getDefaultEngineType(); + const registerAutoApproveSession = (sessionId: string) => { + this.registerAutoApproveSession(run.id, sessionId); + }; + const resolveRole = (role: OrchestratorRole) => this.resolveRole(role, resolvedEngine); + const awaitPlanConfirmation = (runId: string) => this.awaitPlanConfirmation(runId); + + try { + if (run.mode === "light") { + const orchestrator = new LightBrainOrchestrator( + this.engineManager!, + registerAutoApproveSession, + resolveRole, + awaitPlanConfirmation, + ); + await orchestrator.run(run, onTaskUpdated, resolvedEngine); + } else { + const orchestrator = new HeavyBrainOrchestrator( + this.engineManager!, + registerAutoApproveSession, + resolveRole, + awaitPlanConfirmation, + ); + this.activeOrchestrators.set(run.id, orchestrator); + this.activeRelayChannels.set(run.id, orchestrator.userChannel); + await orchestrator.run(run, resolvedEngine, onTaskUpdated); + } + await this.relayResultsToParentSession(run); + } finally { + this.cleanupRunRuntimeState(run.id); + this.emitRunUpdated(run); + } + } + + /** + * Send the aggregated run result back to the parent session so the parent + * engine can summarize it for the user. Silently no-ops if there is no + * parent session, no result, or the engine send fails. + * + * Gated by `run.aggregateToParent` (defaults to true when a parentSessionId + * exists, preserving the original single-session UX). 
+ */ + private async relayResultsToParentSession(run: OrchestrationRun): Promise { + if (!run.parentSessionId || !this.engineManager) return; + if (run.aggregateToParent === false) return; + if (run.status !== "completed" && run.status !== "failed") return; + if (!run.resultSummary) return; + + const failed = run.subtasks.filter((t) => t.status === "failed"); + const failedSection = failed.length > 0 + ? `\n\nFailed tasks:\n${failed.map((t) => `- ${t.description}: ${t.error ?? "unknown error"}`).join("\n")}` + : ""; + + const header = run.status === "completed" + ? "The agent team has completed. Here are the results from each task:" + : "The agent team finished with failures. Partial results:"; + + const prompt = `${header}\n\n${run.resultSummary}${failedSection}\n\nPlease provide a concise summary of what was accomplished${failed.length > 0 ? ", any issues encountered," : ""} and suggested next steps if applicable.`; + + try { + await this.engineManager.sendMessage( + run.parentSessionId, + [{ type: "text", text: prompt }], + { internal: true }, + ); + orchestrationLog.info(`[${run.id}] Relayed aggregated results to parent session ${run.parentSessionId}`); + } catch (err) { + orchestrationLog.warn(`[${run.id}] Failed to relay results to parent session:`, err); + } + } + + private registerAutoApproveSession(runId: string, sessionId: string): void { + this.autoApproveSessions.add(sessionId); + + const runSessions = this.autoApproveSessionsByRun.get(runId) ?? 
new Set(); + runSessions.add(sessionId); + this.autoApproveSessionsByRun.set(runId, runSessions); + } + + private unregisterAutoApproveSessions(runId: string): void { + const runSessions = this.autoApproveSessionsByRun.get(runId); + if (!runSessions) return; + + for (const sessionId of runSessions) { + this.autoApproveSessions.delete(sessionId); + } + this.autoApproveSessionsByRun.delete(runId); + } + + private cleanupRunRuntimeState(runId: string): void { + this.activeOrchestrators.delete(runId); + this.activeRelayChannels.delete(runId); + this.rejectPendingConfirmation(runId, "Run ended before plan was confirmed"); + this.unregisterAutoApproveSessions(runId); + } + + private getFilePath(): string { + return path.join(app.getPath("userData"), "orchestration-runs.json"); + } + + private loadFromDisk(): void { + const filePath = this.getFilePath(); + + try { + if (!fs.existsSync(filePath)) { + orchestrationLog.info("No orchestration-runs.json found, starting empty"); + return; + } + + const raw = fs.readFileSync(filePath, "utf-8"); + const data = JSON.parse(raw) as OrchestrationRunFileFormat; + + if (data.version !== 1 || !Array.isArray(data.runs)) { + orchestrationLog.warn("Invalid orchestration-runs.json format, ignoring"); + return; + } + + for (const run of data.runs) { + this.runs.set(run.id, run); + } + + const recoveredCount = this.recoverInterruptedRuns(); + if (recoveredCount > 0) { + this.writeToDisk(); + } + + orchestrationLog.info(`Loaded ${data.runs.length} team run(s) from disk`); + } catch (error) { + orchestrationLog.error("Failed to load orchestration-runs.json:", error); + } + } + + private recoverInterruptedRuns(): number { + let recoveredCount = 0; + const completionTime = Date.now(); + + for (const run of this.runs.values()) { + if (run.status !== "decomposing" && run.status !== "running") { + continue; + } + + recoveredCount += 1; + run.status = "failed"; + run.resultSummary = "Agent Team run was interrupted because CodeMux restarted before it 
completed."; + run.time.completed = completionTime; + + for (const task of run.subtasks) { + if (task.status !== "pending" && task.status !== "running") { + continue; + } + + task.status = "cancelled"; + task.time = { ...task.time, completed: task.time?.completed ?? completionTime }; + } + } + + return recoveredCount; + } + + private writeToDisk(): void { + const filePath = this.getFilePath(); + const data: OrchestrationRunFileFormat = { + version: 1, + runs: this.listRuns(), + }; + + try { + const dir = path.dirname(filePath); + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }); + } + + const tmpPath = `${filePath}.tmp`; + fs.writeFileSync(tmpPath, JSON.stringify(data, null, 2), "utf-8"); + fs.renameSync(tmpPath, filePath); + } catch (error) { + orchestrationLog.error("Failed to write orchestration-runs.json:", error); + } + } + + private scheduleSave(): void { + if (this.saveTimer) { + clearTimeout(this.saveTimer); + } + + this.saveTimer = setTimeout(() => { + this.saveTimer = null; + this.writeToDisk(); + }, SAVE_DEBOUNCE_MS); + } + + private flushPendingSave(): void { + if (!this.saveTimer) return; + + clearTimeout(this.saveTimer); + this.saveTimer = null; + this.writeToDisk(); + } + + private emitRunUpdated(run: OrchestrationRun): void { + this.scheduleSave(); + this.emit("orchestration.updated", { run }); + } +} + +/** Singleton instance */ +export const orchestrationService = new OrchestrationService(); diff --git a/electron/main/services/orchestration/light-brain.ts b/electron/main/services/orchestration/light-brain.ts new file mode 100644 index 00000000..f2850800 --- /dev/null +++ b/electron/main/services/orchestration/light-brain.ts @@ -0,0 +1,177 @@ +// ============================================================================ +// Light Brain — Deterministic DAG orchestration +// One LLM call to generate the DAG, then a Node.js state machine executes it. 
+// ============================================================================ + +import type { EngineManager } from "../../gateway/engine-manager"; +import type { OrchestrationSubtask, OrchestrationRun, EngineType, EngineInfo } from "../../../../src/types/unified"; +import { DAGExecutor } from "./dag-executor"; +import { + TaskExecutor, + extractTextFromMessage, + trackAutoApproveSession, + type AutoApproveSessionTracker, + type RoleResolver, +} from "./task-executor"; +import { dagPlanningSkill, executeWithSkill, type RawTaskNode } from "./skills"; +import { buildPlanningPrompt } from "./prompts"; +import { orchestrationLog } from "./logger"; + +export class LightBrainOrchestrator { + constructor( + private engineManager: EngineManager, + private autoApproveSessions: AutoApproveSessionTracker, + private resolveRole?: RoleResolver, + private awaitPlanConfirmation?: (runId: string) => Promise, + ) {} + + /** + * Run Light Brain orchestration: + * 1. Planning call (LLM generates DAG) + * 2. Deterministic execution (DAG state machine) + */ + async run( + teamRun: OrchestrationRun, + onTaskUpdated: (task: OrchestrationSubtask) => void, + plannerEngineType?: EngineType, + ): Promise { + const defaultEngineType = this.engineManager.getDefaultEngineType(); + const resolvedPlannerEngineType = plannerEngineType ?? defaultEngineType; + + // --- Phase 1: Planning --- + teamRun.status = "decomposing"; + orchestrationLog.info( + `[${teamRun.id}] Light Brain: planning phase using ${resolvedPlannerEngineType}`, + ); + + const engines = this.engineManager.listEngines(); + const tasks = await this.generateDAG(teamRun, resolvedPlannerEngineType, engines); + + // Convert raw tasks to TaskNodes + teamRun.subtasks = tasks.map((raw): OrchestrationSubtask => ({ + id: raw.id, + description: raw.description, + prompt: raw.prompt, + engineType: raw.engineType as EngineType | undefined, + dependsOn: raw.dependsOn, + worktreeId: raw.worktreeId ?? 
teamRun.worktreeId, + status: "pending", + })); + + orchestrationLog.info(`[${teamRun.id}] Light Brain: DAG generated with ${teamRun.subtasks.length} tasks`); + + // --- Phase 1.5: Plan confirmation (optional) --- + if (teamRun.requirePlanConfirmation && this.awaitPlanConfirmation) { + teamRun.status = "confirming"; + orchestrationLog.info(`[${teamRun.id}] Light Brain: awaiting user plan confirmation`); + try { + const confirmedTasks = await this.awaitPlanConfirmation(teamRun.id); + // Replace tasks with user-approved (possibly edited) version. + teamRun.subtasks = confirmedTasks.map((t): OrchestrationSubtask => ({ + ...t, + status: "pending", + worktreeId: t.worktreeId ?? teamRun.worktreeId, + })); + orchestrationLog.info(`[${teamRun.id}] Light Brain: plan confirmed (${teamRun.subtasks.length} tasks)`); + } catch (err) { + teamRun.status = "failed"; + teamRun.resultSummary = `Plan confirmation failed: ${(err as Error).message}`; + teamRun.time.completed = Date.now(); + return; + } + } + + // --- Phase 2: Execution --- + teamRun.status = "running"; + + const taskExecutor = new TaskExecutor( + this.engineManager, + this.autoApproveSessions, + defaultEngineType, + this.resolveRole, + ); + const dagExecutor = new DAGExecutor( + taskExecutor, + teamRun.parentDirectory ?? 
teamRun.directory, + ); + + // Forward task update events + dagExecutor.on("task.updated", ({ task }) => onTaskUpdated(task)); + + await dagExecutor.executeReadyTasks(teamRun); + + // --- Phase 3: Determine final status --- + if (DAGExecutor.isAllSuccessful(teamRun.subtasks)) { + teamRun.status = "completed"; + teamRun.resultSummary = this.synthesizeResult(teamRun); + } else { + teamRun.status = "failed"; + const failed = teamRun.subtasks.filter((t) => t.status === "failed"); + teamRun.resultSummary = `${failed.length} task(s) failed: ${failed.map((t) => t.description).join(", ")}`; + } + + teamRun.time.completed = Date.now(); + orchestrationLog.info(`[${teamRun.id}] Light Brain: ${teamRun.status}`); + } + + /** + * Generate the task DAG using a planning LLM call with dagPlanningSkill. + */ + private async generateDAG( + teamRun: OrchestrationRun, + plannerEngineType: EngineType, + engines: EngineInfo[], + ): Promise { + // Create a temporary planning session + // Inject format spec + planner role as system-level prompt for engines + // that support it (e.g. Copilot). Also sent as user message for compatibility. + const prompt = buildPlanningPrompt( + teamRun.prompt, + engines, + teamRun.directory, + ); + const systemPrompt = `${dagPlanningSkill.formatPrompt}\n\n---\n\n${prompt}`; + + const planSession = await this.engineManager.createSession( + plannerEngineType, + teamRun.parentDirectory ?? 
teamRun.directory, + teamRun.worktreeId, + { systemPrompt }, + ); + + // Register for auto-approve + trackAutoApproveSession(this.autoApproveSessions, planSession.id); + + // Execute with skill (includes format spec + self-check + retry) + const sendMessage = async (text: string): Promise => { + const msg = await this.engineManager.sendMessage(planSession.id, [ + { type: "text", text }, + ]); + orchestrationLog.info(`[${teamRun.id}] sendMessage returned: role=${msg.role}, parts=${JSON.stringify(msg.parts?.map(p => ({ type: p.type, textLen: (p as any).text?.length })))}`); + const extracted = extractTextFromMessage(msg); + orchestrationLog.info(`[${teamRun.id}] extractTextFromMessage: ${extracted.length} chars`); + if (extracted.length === 0) { + orchestrationLog.warn(`[${teamRun.id}] Empty text! Full message: ${JSON.stringify(msg).slice(0, 1000)}`); + } + return extracted; + }; + + const result = await executeWithSkill(sendMessage, prompt, dagPlanningSkill, 1, orchestrationLog); + + if (!result.ok) { + throw new Error(`DAG planning failed: ${result.error}`); + } + + return result.data.tasks; + } + + /** + * Simple synthesis: concatenate all task results. 
+ */ + private synthesizeResult(teamRun: OrchestrationRun): string { + const results = teamRun.subtasks + .filter((t) => t.status === "completed" && t.resultSummary) + .map((t) => `[${t.description}]: ${t.resultSummary}`); + return results.join("\n\n"); + } +} diff --git a/electron/main/services/orchestration/logger.ts b/electron/main/services/orchestration/logger.ts new file mode 100644 index 00000000..45d95d57 --- /dev/null +++ b/electron/main/services/orchestration/logger.ts @@ -0,0 +1,2 @@ +// Re-export the agent-team logger scope from the shared logger module +export { orchestrationLog } from "../logger"; diff --git a/electron/main/services/orchestration/prompts.ts b/electron/main/services/orchestration/prompts.ts new file mode 100644 index 00000000..eaf5687e --- /dev/null +++ b/electron/main/services/orchestration/prompts.ts @@ -0,0 +1,175 @@ +// ============================================================================ +// Prompt Templates — Engine-agnostic prompts for agent team orchestration +// ============================================================================ + +import type { EngineInfo, OrchestrationSubtask, OrchestrationRun } from "../../../../src/types/unified"; + +/** + * Format available engines list for inclusion in prompts. + */ +export function formatEngineList(engines: EngineInfo[]): string { + const running = engines.filter((e) => e.status === "running"); + if (running.length === 0) return "No engines currently available."; + + return running + .map((e) => `- ${e.type}: ${e.name}${e.version ? ` v${e.version}` : ""}`) + .join("\n"); +} + +/** + * Build the planning prompt for Light Brain DAG generation. + * The dagPlanningSkill.formatPrompt is prepended by executeWithSkill(). + */ +export function buildPlanningPrompt( + userRequest: string, + engines: EngineInfo[], + directory: string, +): string { + return `You are a **task decomposition agent**. Your only job is to analyze the user's request and output a JSON task plan. 
You do NOT execute any tasks yourself. You do NOT spawn subagents. + +An **external orchestration system** will read your JSON output, create separate sessions on other machines, and run each task independently. + +## Important +- You are producing a **plan**, not executing it. Do not attempt to carry out any tasks or spawn subagents. +- Your output will be machine-parsed. The external system creates separate agent sessions for each task and sends them the prompt you write. +- You may use tools to explore the project structure if that helps you write better task prompts, but your **final answer** must be the JSON task plan described in the Output Format above. + +## Available Worker Engines +${formatEngineList(engines)} + +## Project Directory +${directory} + +## Planning Guidelines +- Break complex tasks into smaller, focused subtasks +- Use dependsOn to express task ordering (parallel tasks have no dependency) +- Each task's prompt must be self-contained — the worker agent cannot see other tasks or the original request +- Include enough context in each prompt for the worker to succeed independently +- If a task produces output needed by a downstream task, mention what the downstream task should expect in its prompt +- Choose engines based on their strengths if applicable, or omit engineType to use the default + +## User Request +${userRequest}`; +} + +/** + * Build the initial prompt for Heavy Brain orchestrator. + * The dispatchSkill.formatPrompt is prepended by executeWithSkill(). + */ +export function buildOrchestratorPrompt( + userRequest: string, + engines: EngineInfo[], + directory: string, +): string { + return `You are a **task decomposition and review agent**. Your only job is to break down work into subtasks by outputting JSON, then review results returned to you. + +You do NOT execute any tasks yourself. You do NOT spawn subagents. You do NOT call tools to perform the work. 
An **external orchestration system** reads the JSON you output, creates separate sessions on other machines, and runs each task independently. You will receive the results as your next message. + +## Your Workflow +1. Analyze the user request — you may use tools (read files, search code) to understand the project and write better task descriptions +2. Output a JSON block describing subtasks (see Communication Protocol above) — the external system handles execution +3. Results arrive incrementally — you receive each task's result as a separate message as soon as it completes +4. After each result, you can: output a "dispatch" JSON to add more tasks, output a "complete" JSON to finish early, or output "continueWaiting" to wait for more results +5. You may receive **human feedback** at any time — it takes priority over task results. Read it carefully and adjust your plan accordingly. +6. After all tasks finish, decide: dispatch another round of tasks, or output "complete" with a summary + +## Important Constraints +- **Never do the work yourself** — your role is solely to decide WHAT needs to be done and write task descriptions +- **Never spawn subagents or delegate via tools** — only output JSON; the external system handles all execution +- Your JSON output is machine-parsed, not human-read + +## Available Worker Engines +${formatEngineList(engines)} + +## Project Directory +${directory} + +## Task Design Guidelines +- Each task runs in an isolated session with no shared context — include all necessary information in the task prompt +- If a worker fails, you can retry with a modified prompt or different engine +- When all work is done, use the "complete" action with a comprehensive summary + +## User Request +${userRequest}`; +} + +/** + * Format task results for injection back into the orchestrator (Heavy Brain). 
+ */ +export function formatTaskResults(run: OrchestrationRun): string { + const lines: string[] = ["## Task Execution Results\n"]; + + for (const task of run.subtasks) { + if (task.status === "completed") { + const duration = task.time?.started && task.time?.completed + ? `${((task.time.completed - task.time.started) / 1000).toFixed(1)}s` + : ""; + lines.push(`### ${task.id}: "${task.description}" [COMPLETED]${duration ? ` (${duration})` : ""}`); + lines.push(task.resultSummary || "(no output)"); + lines.push(""); + } else if (task.status === "failed") { + lines.push(`### ${task.id}: "${task.description}" [FAILED]`); + lines.push(`Error: ${task.error || "unknown error"}`); + lines.push(""); + } else if (task.status === "blocked") { + lines.push(`### ${task.id}: "${task.description}" [BLOCKED]`); + lines.push(`Blocked by failed upstream task.`); + lines.push(""); + } + } + + const completed = run.subtasks.filter((t) => t.status === "completed").length; + const failed = run.subtasks.filter((t) => t.status === "failed").length; + const total = run.subtasks.length; + lines.push(`---\n${completed}/${total} tasks completed${failed > 0 ? `, ${failed} failed` : ""}. What would you like to do next?`); + + return lines.join("\n"); +} + +/** + * Format a single completed task result for incremental reporting (Heavy Brain). + */ +export function formatSingleTaskResult(task: OrchestrationSubtask, remainingCount: number): string { + const duration = task.time?.started && task.time?.completed + ? 
` (${((task.time.completed - task.time.started) / 1000).toFixed(1)}s)` + : ""; + + const lines: string[] = []; + + if (task.status === "completed") { + lines.push(`## Task Completed: ${task.id}${duration}`); + lines.push(`**${task.description}**\n`); + lines.push(task.resultSummary || "(no output)"); + } else if (task.status === "failed") { + lines.push(`## Task Failed: ${task.id}${duration}`); + lines.push(`**${task.description}**\n`); + lines.push(`Error: ${task.error || "unknown error"}`); + } + + lines.push(""); + if (remainingCount > 0) { + lines.push(`---\n${remainingCount} task(s) are still pending or running. You may output a JSON block to dispatch new tasks or mark complete, or just acknowledge to wait for more results.`); + } else { + lines.push(`---\nAll tasks have finished. Output a JSON block: dispatch more tasks, or mark complete with a summary.`); + } + + return lines.join("\n"); +} + +/** + * Format a human feedback message for injection into the orchestrator (Heavy Brain). + * Human feedback has higher priority than task results. + */ +export function formatUserMessage(text: string, remainingTasks: number): string { + const lines = [ + `## Human Feedback`, + ``, + text, + ``, + `---`, + remainingTasks > 0 + ? `${remainingTasks} task(s) are still pending or running. Respond with a JSON block.` + : `No tasks are currently pending or running. Respond with a JSON block.`, + ]; + return lines.join("\n"); +} diff --git a/electron/main/services/orchestration/skills.ts b/electron/main/services/orchestration/skills.ts new file mode 100644 index 00000000..59e31be5 --- /dev/null +++ b/electron/main/services/orchestration/skills.ts @@ -0,0 +1,457 @@ +// ============================================================================ +// Structured Output Skills — format spec + self-check + parser bundles +// Injected into agent prompts so agents self-validate before outputting. +// Parser is a safety net; correctionPrompt handles the rare parse failure. 
+// ============================================================================ + +// --- Skill Framework --- + +/** + * A structured output skill bundles format specification, parser, and + * correction logic for a specific JSON output format. + * + * The formatPrompt is injected into the agent's prompt so it knows the + * expected schema and self-checks before outputting. The parser extracts + * the typed result from raw LLM text. On parse failure, correctionPrompt + * generates a follow-up message to let the agent fix its output in-place + * (same session, no new session needed). + */ +export interface StructuredOutputSkill { + /** Skill name for logging */ + name: string; + + /** Prompt instructions injected into the session: format spec + self-check checklist */ + formatPrompt: string; + + /** Parse LLM text output into T. Returns an error result with detail on failure. */ + parse(text: string): { ok: true; data: T } | { ok: false; error: string }; + + /** Generate a correction prompt when parse fails */ + correctionPrompt(rawText: string, error: string): string; +} + +// --- JSON Extraction Utility --- + +/** + * Extract JSON objects/arrays from LLM text output. + * Handles: ```json fenced blocks, bare JSON, and markdown-wrapped JSON. + */ +export function extractJsonBlocks(text: string): string[] { + const blocks: string[] = []; + + // Strategy: find top-level JSON objects/arrays by bracket-balanced scanning. + // This handles JSON values that contain ``` fences (e.g. prompts with code blocks) + // which break naive regex-based fence matching. + for (let i = 0; i < text.length; i++) { + const ch = text[i]; + if (ch !== "{" && ch !== "[") continue; + + const close = ch === "{" ?
"}" : "]"; + let depth = 1; + let inString = false; + let escaped = false; + let j = i + 1; + + for (; j < text.length && depth > 0; j++) { + const c = text[j]; + if (escaped) { + escaped = false; + continue; + } + if (c === "\\") { + if (inString) escaped = true; + continue; + } + if (c === '"') { + inString = !inString; + continue; + } + if (inString) continue; + if (c === ch) depth++; + else if (c === close) depth--; + } + + if (depth === 0) { + blocks.push(text.slice(i, j)); + i = j - 1; // skip past this block + } + } + + return blocks; +} + +/** + * Try to parse the first valid JSON from extracted blocks. + */ +export function parseFirstJson(text: string): { ok: true; data: T } | { ok: false; error: string } { + const blocks = extractJsonBlocks(text); + if (blocks.length === 0) { + return { ok: false, error: "No JSON block found in output. Expected a ```json code block." }; + } + + const errors: string[] = []; + for (const block of blocks) { + try { + const parsed = JSON.parse(block) as T; + return { ok: true, data: parsed }; + } catch (e) { + errors.push(`JSON parse error: ${(e as Error).message}`); + } + } + + return { ok: false, error: errors.join("; ") }; +} + +// --- DAG Planning Skill (Light Brain) --- + +/** Raw task node as output by the planning LLM */ +export interface RawTaskNode { + id: string; + description: string; + prompt: string; + dependsOn: string[]; + engineType?: string; + worktreeId?: string; +} + +interface DagPlanOutput { + tasks: RawTaskNode[]; +} + +function validateDagPlan(data: unknown): { ok: true; data: DagPlanOutput } | { ok: false; error: string } { + if (!data || typeof data !== "object") { + return { ok: false, error: "Expected a JSON object with a 'tasks' array." }; + } + + const obj = data as Record; + if (!Array.isArray(obj.tasks)) { + return { ok: false, error: "Missing 'tasks' array in output." }; + } + + const tasks = obj.tasks as unknown[]; + if (tasks.length === 0) { + return { ok: false, error: "Task list is empty. 
At least one task is required." }; + } + + const ids = new Set(); + const errors: string[] = []; + + for (let i = 0; i < tasks.length; i++) { + const t = tasks[i] as Record; + if (!t.id || typeof t.id !== "string") { + errors.push(`Task [${i}]: missing or invalid 'id'`); + continue; + } + if (ids.has(t.id)) { + errors.push(`Task [${i}]: duplicate id '${t.id}'`); + } + ids.add(t.id); + + if (!t.description || typeof t.description !== "string") { + errors.push(`Task '${t.id}': missing 'description'`); + } + if (!t.prompt || typeof t.prompt !== "string") { + errors.push(`Task '${t.id}': missing 'prompt'`); + } + if (!Array.isArray(t.dependsOn)) { + errors.push(`Task '${t.id}': missing 'dependsOn' array`); + } + if (t.worktreeId != null && typeof t.worktreeId !== "string") { + errors.push(`Task '${t.id}': invalid 'worktreeId'`); + } + } + + if (errors.length > 0) { + return { ok: false, error: errors.join("; ") }; + } + + // Validate dependency references + for (const t of tasks as RawTaskNode[]) { + for (const dep of t.dependsOn) { + if (!ids.has(dep)) { + errors.push(`Task '${t.id}': dependsOn references unknown task '${dep}'`); + } + } + } + + // Check for circular dependencies + const visited = new Set(); + const inStack = new Set(); + const taskMap = new Map((tasks as RawTaskNode[]).map((t) => [t.id, t])); + + function hasCycle(id: string): boolean { + if (inStack.has(id)) return true; + if (visited.has(id)) return false; + visited.add(id); + inStack.add(id); + const task = taskMap.get(id); + if (task) { + for (const dep of task.dependsOn) { + if (hasCycle(dep)) return true; + } + } + inStack.delete(id); + return false; + } + + for (const id of ids) { + if (hasCycle(id)) { + errors.push("Circular dependency detected in task DAG"); + break; + } + } + + // Check for at least one root task + const hasRoot = (tasks as RawTaskNode[]).some((t) => t.dependsOn.length === 0); + if (!hasRoot) { + errors.push("No root task found (at least one task must have dependsOn: [])"); 
+ } + + if (errors.length > 0) { + return { ok: false, error: errors.join("; ") }; + } + + return { ok: true, data: { tasks: tasks as RawTaskNode[] } }; +} + +export const dagPlanningSkill: StructuredOutputSkill = { + name: "dag-planning", + + formatPrompt: ` +## Output Format Requirements + +Your **final answer** MUST be a single JSON code block with the following schema. This JSON will be parsed by an external orchestration system — it is not for human reading. + +\`\`\`json +{ + "tasks": [ + { + "id": "string (unique, e.g. t1, t2)", + "description": "string (1-sentence summary of the task)", + "prompt": "string (detailed, self-contained instructions for the worker agent)", + "dependsOn": ["array of task IDs this task depends on, use [] if none"], + "engineType": "optional string: claude | copilot | opencode (omit to use default)", + "worktreeId": "optional string: existing worktree name for isolated file changes" + } + ] + } +\`\`\` + +## Self-Check Before Outputting (MANDATORY) + +Before writing the JSON block, verify ALL of the following: +1. JSON syntax is valid (balanced braces, proper quoting, no trailing commas) +2. Every task ID is unique +3. Every ID referenced in dependsOn exists in the task list +4. No circular dependency chains (e.g. A depends on B, B depends on A) +5. Each prompt is self-contained — the worker agent CANNOT see other tasks or the original request +6. At least one task has dependsOn: [] (the DAG must have a root) +7. Your final answer is ONLY the JSON block — no additional text before or after +`.trim(), + + parse(text: string) { + const jsonResult = parseFirstJson(text); + if (!jsonResult.ok) return jsonResult; + return validateDagPlan(jsonResult.data); + }, + + correctionPrompt(rawText: string, error: string) { + return ( + `Your previous output had a format error:\n${error}\n\n` + + `Please output ONLY the corrected JSON block following the schema above. 
` + + `Do not include any explanation — just the valid JSON.` + ); + }, +}; + +// --- Dispatch Skill (Heavy Brain) --- + +export interface DispatchTask { + id: string; + description: string; + prompt: string; + engineType?: string; + dependsOn?: string[]; + worktreeId?: string; +} + +export type DispatchInstruction = + | { action: "dispatch"; tasks: DispatchTask[] } + | { action: "complete"; result: string } + | { action: "continueWaiting" }; + +function validateDispatchInstruction( + data: unknown, +): { ok: true; data: DispatchInstruction } | { ok: false; error: string } { + if (!data || typeof data !== "object") { + return { ok: false, error: "Expected a JSON object with 'action' field." }; + } + + const obj = data as Record; + + if (obj.action === "complete") { + if (!obj.result || typeof obj.result !== "string") { + return { ok: false, error: "action 'complete' requires a 'result' string." }; + } + return { ok: true, data: { action: "complete", result: obj.result } }; + } + + if (obj.action === "continueWaiting") { + return { ok: true, data: { action: "continueWaiting" } }; + } + + if (obj.action === "dispatch") { + if (!Array.isArray(obj.tasks) || obj.tasks.length === 0) { + return { ok: false, error: "action 'dispatch' requires a non-empty 'tasks' array." 
}; + } + + const errors: string[] = []; + const ids = new Set(); + for (let i = 0; i < obj.tasks.length; i++) { + const t = obj.tasks[i] as Record; + if (!t.id || typeof t.id !== "string") { + errors.push(`Task [${i}]: missing 'id'`); + continue; + } + if (ids.has(t.id)) { + errors.push(`Task [${i}]: duplicate id '${t.id}'`); + } + ids.add(t.id); + if (!t.description || typeof t.description !== "string") { + errors.push(`Task '${t.id}': missing 'description'`); + } + if (!t.prompt || typeof t.prompt !== "string") { + errors.push(`Task '${t.id}': missing 'prompt'`); + } + if (t.worktreeId != null && typeof t.worktreeId !== "string") { + errors.push(`Task '${t.id}': invalid 'worktreeId'`); + } + } + + if (errors.length > 0) { + return { ok: false, error: errors.join("; ") }; + } + + return { + ok: true, + data: { + action: "dispatch", + tasks: obj.tasks as DispatchTask[], + }, + }; + } + + return { + ok: false, + error: `Unknown action '${String(obj.action)}'. Expected 'dispatch', 'complete', or 'continueWaiting'.`, + }; +} + +export const dispatchSkill: StructuredOutputSkill = { + name: "orchestrator-dispatch", + + formatPrompt: ` +## Communication Protocol + +You communicate your decisions via JSON code blocks. This JSON is parsed by an external orchestration system — it is not for human reading. Every response MUST be a single JSON block with one of these actions: + +### 1. Dispatch new tasks: +\`\`\`json +{ + "action": "dispatch", + "tasks": [ + { + "id": "unique_id", + "description": "1-sentence summary", + "prompt": "detailed, self-contained instructions for the worker agent", + "engineType": "optional: claude | copilot | opencode", + "dependsOn": ["optional array of already-known task IDs"], + "worktreeId": "optional string: existing worktree name for isolated file changes" + } + ] + } +\`\`\` + +### 2. Mark orchestration as complete: +\`\`\`json +{ + "action": "complete", + "result": "Summary of everything accomplished by the team..." +} +\`\`\` + +### 3.
Acknowledge and wait for more results: +\`\`\`json +{ + "action": "continueWaiting" +} +\`\`\` + +## Self-Check Before Outputting (MANDATORY) + +1. JSON syntax is valid +2. action is "dispatch", "complete", or "continueWaiting" +3. If dispatch: every task has id, description, and a detailed self-contained prompt +4. If complete: result contains a meaningful summary of all work done +5. Your response is ONLY the JSON block — no additional text before or after +`.trim(), + + parse(text: string) { + const jsonResult = parseFirstJson(text); + if (!jsonResult.ok) return jsonResult; + return validateDispatchInstruction(jsonResult.data); + }, + + correctionPrompt(rawText: string, error: string) { + return ( + `Your previous output had a format error:\n${error}\n\n` + + `Please output ONLY the corrected JSON block. Use one of: ` + + `{ "action": "dispatch", "tasks": [...] }, { "action": "complete", "result": "..." }, or { "action": "continueWaiting" }.` + ); + }, +}; + +// --- Skill Execution Helper --- + +/** + * Execute a skill against an LLM session with optional retry on parse failure. + * On first parse failure, sends a correction prompt to let the agent fix in-place. 
+ * + * @param sendMessage - Function to send a message and get the response text + * @param prompt - The user's prompt content + * @param skill - The structured output skill to use + * @param maxRetries - Maximum correction attempts (default: 1) + * @returns Parsed result, or a failure result carrying the last parse error if all attempts fail + */ +export async function executeWithSkill( + sendMessage: (text: string) => Promise, + prompt: string, + skill: StructuredOutputSkill, + maxRetries = 1, + log?: { info: (...args: unknown[]) => void; warn: (...args: unknown[]) => void }, +): Promise<{ ok: true; data: T } | { ok: false; error: string }> { + // First attempt: send prompt with skill format instructions + const fullPrompt = `${skill.formatPrompt}\n\n---\n\n${prompt}`; + const responseText = await sendMessage(fullPrompt); + + log?.info(`[${skill.name}] LLM response (${responseText.length} chars): ${responseText.slice(0, 500)}${responseText.length > 500 ? "..." : ""}`); + + const result = skill.parse(responseText); + if (result.ok) return result; + + log?.warn(`[${skill.name}] Parse failed: ${result.error}`); + + // Retry with correction prompt + let lastError = result.error; + for (let i = 0; i < maxRetries; i++) { + const correction = skill.correctionPrompt(responseText, lastError); + const retryText = await sendMessage(correction); + log?.info(`[${skill.name}] Retry ${i + 1} response (${retryText.length} chars): ${retryText.slice(0, 500)}${retryText.length > 500 ? "..."
: ""}`); + const retryResult = skill.parse(retryText); + if (retryResult.ok) return retryResult; + lastError = retryResult.error; + log?.warn(`[${skill.name}] Retry ${i + 1} parse failed: ${lastError}`); + } + + return { ok: false, error: lastError }; +} diff --git a/electron/main/services/orchestration/task-executor.ts b/electron/main/services/orchestration/task-executor.ts new file mode 100644 index 00000000..5031381a --- /dev/null +++ b/electron/main/services/orchestration/task-executor.ts @@ -0,0 +1,343 @@ +// ============================================================================ +// Task Executor — Executes a single task node as a CodeMux session +// Follows the same pattern as ScheduledTaskService.executeTask() +// ============================================================================ + +import type { EngineManager } from "../../gateway/engine-manager"; +import type { OrchestrationSubtask, EngineType, UnifiedMessage, UnifiedPart, OrchestratorRole } from "../../../../src/types/unified"; +import { + AGENT_TEAM_INACTIVITY_TIMEOUT_MS, + AGENT_TEAM_MAX_TASK_RETRIES, + AGENT_TEAM_RETRY_BACKOFF_MS, +} from "./guardrails"; + +/** Role → engine/model resolver (injected by OrchestrationService). 
*/ +export type RoleResolver = (role: OrchestratorRole) => { engineType: EngineType; modelId?: string } | null; + +/** Result of executing a single task */ +export interface TaskExecutionResult { + sessionId: string; + summary: string; + error?: string; +} + +export type AutoApproveSessionTracker = Set | ((sessionId: string) => void); + +export interface TaskExecutionOptions { + upstreamContext?: string; + defaultWorktreeId?: string; + onSessionCreated?: (sessionId: string) => void; + shouldCancel?: () => boolean; + inactivityTimeoutMs?: number; + maxRetries?: number; + retryBackoffMs?: number; +} + +export function trackAutoApproveSession( + tracker: AutoApproveSessionTracker, + sessionId: string, +): void { + if (typeof tracker === "function") { + tracker(sessionId); + return; + } + + if (tracker.size > 200) { + const recent = [...tracker].slice(-100); + tracker.clear(); + for (const id of recent) tracker.add(id); + } + + tracker.add(sessionId); +} + +/** + * Extracts text content from a completed UnifiedMessage. + * Concatenates all text parts from the message. + */ +export function extractTextFromMessage(message: UnifiedMessage): string { + const textParts = (message.parts || []) + .filter((p: UnifiedPart) => p.type === "text") + .map((p) => (p as { text: string }).text); + return textParts.join("\n").trim(); +} + +function stringifyError(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + +function formatDuration(ms: number): string { + if (ms % 60_000 === 0) { + const minutes = ms / 60_000; + return `${minutes} minute${minutes === 1 ? "" : "s"}`; + } + + if (ms % 1000 === 0) { + const seconds = ms / 1000; + return `${seconds} second${seconds === 1 ? 
"" : "s"}`; + } + + return `${ms}ms`; +} + +function isRecoverableExecutionError(error: unknown): boolean { + const message = stringifyError(error).toLowerCase(); + return ( + message.includes("timed out after") || + message.includes("timeout") || + message.includes("network") || + message.includes("connection") || + message.includes("temporar") || + message.includes("unavailable") || + message.includes("rate limit") || + message.includes("429") || + message.includes("503") || + message.includes("econnreset") || + message.includes("epipe") + ); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export class TaskExecutor { + constructor( + private engineManager: EngineManager, + private autoApproveSessions: AutoApproveSessionTracker, + private defaultEngineType: EngineType, + private resolveRole?: RoleResolver, + ) {} + + /** + * Execute a single task: create session, send prompt, wait for completion. + * + * @param task - The task node to execute + * @param directory - Working directory for the session + * @param upstreamContext - Optional context from completed upstream tasks + */ + async execute( + task: OrchestrationSubtask, + directory: string, + options: TaskExecutionOptions = {}, + ): Promise { + const maxRetries = options.maxRetries ?? AGENT_TEAM_MAX_TASK_RETRIES; + const inactivityTimeoutMs = options.inactivityTimeoutMs ?? AGENT_TEAM_INACTIVITY_TIMEOUT_MS; + const retryBackoffMs = options.retryBackoffMs ?? AGENT_TEAM_RETRY_BACKOFF_MS; + + let attempt = 0; + let lastSessionId = task.sessionId ?? ""; + + while (true) { + if (options.shouldCancel?.()) { + return { + sessionId: lastSessionId, + summary: "", + error: "Task cancelled before execution started.", + }; + } + + try { + const result = await this.executeAttempt(task, directory, options, inactivityTimeoutMs); + return result; + } catch (error) { + lastSessionId = task.sessionId ?? 
lastSessionId; + + if (options.shouldCancel?.()) { + return { + sessionId: lastSessionId, + summary: "", + error: "Task cancelled during execution.", + }; + } + + if (attempt >= maxRetries || !isRecoverableExecutionError(error)) { + return { + sessionId: lastSessionId, + summary: "", + error: stringifyError(error), + }; + } + + attempt += 1; + + if (retryBackoffMs > 0) { + await sleep(retryBackoffMs); + } + } + } + } + + /** + * Build upstream context string from completed dependency tasks. + */ + static buildUpstreamContext(dependencies: OrchestrationSubtask[]): string | undefined { + const completed = dependencies.filter((d) => d.status === "completed" && d.resultSummary); + if (completed.length === 0) return undefined; + + const sections = completed.map( + (d) => `[Task "${d.description}"]: ${d.resultSummary}`, + ); + + return `Context from completed upstream tasks:\n---\n${sections.join("\n---\n")}`; + } + + private registerAutoApprove(sessionId: string): void { + trackAutoApproveSession(this.autoApproveSessions, sessionId); + } + + private async executeAttempt( + task: OrchestrationSubtask, + directory: string, + options: TaskExecutionOptions, + inactivityTimeoutMs: number, + ): Promise { + // Resolve role → engine/model if task has a role and engineType wasn't explicitly set + if (task.role && !task.engineType && this.resolveRole) { + const resolved = this.resolveRole(task.role); + if (resolved) { + task.engineType = resolved.engineType; + if (resolved.modelId && !task.modelId) { + task.modelId = resolved.modelId; + } + } + } + + const engineType = (task.engineType as EngineType) || this.defaultEngineType; + const worktreeId = task.worktreeId ?? 
options.defaultWorktreeId; + if (!task.worktreeId && worktreeId) { + task.worktreeId = worktreeId; + } + + const session = await this.engineManager.createSession(engineType, directory, worktreeId); + task.sessionId = session.id; + + this.registerAutoApprove(session.id); + options.onSessionCreated?.(session.id); + + if (options.shouldCancel?.()) { + return { + sessionId: session.id, + summary: "", + error: "Task cancelled before execution started.", + }; + } + + let prompt = task.prompt; + if (options.upstreamContext) { + prompt = `${options.upstreamContext}\n\n---\n\nYour task:\n${task.prompt}`; + } + + try { + const message = await this.waitForSessionResponse( + session.id, + this.engineManager.sendMessage(session.id, [ + { type: "text", text: prompt }, + ]), + inactivityTimeoutMs, + ); + + const summary = extractTextFromMessage(message); + + if (message.error) { + return { sessionId: session.id, summary, error: message.error }; + } + + return { sessionId: session.id, summary }; + } catch (error) { + if (!stringifyError(error).includes("of inactivity")) { + await this.cancelSessionQuietly(session.id); + } + throw error; + } + } + + private async waitForSessionResponse( + sessionId: string, + responsePromise: Promise, + inactivityTimeoutMs: number, + ): Promise { + const eventSource: Pick & Partial> = + this.engineManager; + + return await new Promise((resolve, reject) => { + let settled = false; + let timer: ReturnType | null = null; + + const finish = (callback: () => void) => { + if (settled) { + return; + } + settled = true; + cleanup(); + callback(); + }; + + const resetTimer = () => { + if (settled) { + return; + } + if (timer) { + clearTimeout(timer); + } + timer = setTimeout(() => { + void this.cancelSessionQuietly(sessionId).finally(() => { + finish(() => reject(new Error(`Task timed out after ${formatDuration(inactivityTimeoutMs)} of inactivity.`))); + }); + }, inactivityTimeoutMs); + }; + + const handlePartUpdated = (data: { sessionId: string }) => { + 
if (data.sessionId === sessionId) { + resetTimer(); + } + }; + const handleMessageUpdated = (data: { sessionId: string }) => { + if (data.sessionId === sessionId) { + resetTimer(); + } + }; + const handlePermissionAsked = (data: { permission: { sessionId: string } }) => { + if (data.permission.sessionId === sessionId) { + resetTimer(); + } + }; + const handleQuestionAsked = (data: { question: { sessionId: string } }) => { + if (data.question.sessionId === sessionId) { + resetTimer(); + } + }; + + const cleanup = () => { + if (timer) { + clearTimeout(timer); + timer = null; + } + eventSource.off?.("message.part.updated", handlePartUpdated); + eventSource.off?.("message.updated", handleMessageUpdated); + eventSource.off?.("permission.asked", handlePermissionAsked); + eventSource.off?.("question.asked", handleQuestionAsked); + }; + + eventSource.on?.("message.part.updated", handlePartUpdated); + eventSource.on?.("message.updated", handleMessageUpdated); + eventSource.on?.("permission.asked", handlePermissionAsked); + eventSource.on?.("question.asked", handleQuestionAsked); + + resetTimer(); + + responsePromise.then( + (message) => finish(() => resolve(message)), + (error) => finish(() => reject(error)), + ); + }); + } + + private async cancelSessionQuietly(sessionId: string): Promise { + try { + await this.engineManager.cancelMessage(sessionId); + } catch { + // Best effort cleanup only. + } + } +} diff --git a/electron/main/services/orchestration/user-channel.ts b/electron/main/services/orchestration/user-channel.ts new file mode 100644 index 00000000..e11161e8 --- /dev/null +++ b/electron/main/services/orchestration/user-channel.ts @@ -0,0 +1,76 @@ +// ============================================================================ +// UserChannel — Shared human-in-the-loop message channel for orchestrators. +// Allows users to inject messages into any orchestration loop (Light/Heavy Brain). +// The orchestration loop races user messages against task completions. 
+// ============================================================================ + +/** + * A channel for receiving user messages during orchestration. + * The orchestrator creates a channel, then races `waitForMessage()` against + * task completions. External code calls `send()` to inject a user message. + */ +export class UserChannel { + /** Pending resolve callback — set when the orchestrator is waiting */ + private waitResolve: ((text: string) => void) | null = null; + /** Buffered message — set when a message arrives while not waiting */ + private pendingMessage: string | null = null; + + /** + * Send a user message into the channel. + * If the orchestrator is currently waiting (in Promise.race), resolves immediately. + * Otherwise buffers until the next waitForMessage() call. + */ + send(text: string): void { + if (this.waitResolve) { + const resolve = this.waitResolve; + this.waitResolve = null; + resolve(text); + } else { + // Buffer — next waitForMessage() will return immediately + this.pendingMessage = text; + } + } + + /** + * Check if there's a buffered message without consuming it. + */ + hasPending(): boolean { + return this.pendingMessage !== null; + } + + /** + * Consume and return the buffered message, if any. + */ + takePending(): string | null { + const msg = this.pendingMessage; + this.pendingMessage = null; + return msg; + } + + /** + * Returns a promise that resolves when a user message arrives. + * Used in Promise.race alongside task completion promises. + * If a message is already buffered, resolves immediately. + */ + waitForMessage(): Promise { + // Return buffered message immediately + if (this.pendingMessage !== null) { + const msg = this.pendingMessage; + this.pendingMessage = null; + return Promise.resolve(msg); + } + + return new Promise((resolve) => { + this.waitResolve = resolve; + }); + } + + /** + * Cancel any pending wait (e.g. when orchestration ends). + * Does not reject — just clears the callback so GC can collect the promise. 
+ */ + dispose(): void { + this.waitResolve = null; + this.pendingMessage = null; + } +} diff --git a/package.json b/package.json index 11c3f034..b3b34478 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "server:tunnel": "bash ./scripts/server-dev.sh start --replace --tunnel", "server:down": "bash ./scripts/server-dev.sh stop", "server:status": "bash ./scripts/server-dev.sh status", + "server:restart": "bash ./scripts/server-dev.sh restart", "server:access-code": "bun scripts/server-auth.ts access-code", "server:access-requests": "bun scripts/server-auth.ts access-requests", "update:cloudflared": "bun scripts/update-cloudflared.ts", @@ -45,7 +46,7 @@ }, "dependencies": { "@anthropic-ai/claude-agent-sdk": "^0.2.63", - "@github/copilot-sdk": "0.2.0", + "@github/copilot-sdk": "0.2.2", "@larksuiteoapi/node-sdk": "^1.42.0", "@opencode-ai/sdk": "^1.2.15", "@parcel/watcher": "^2.5.1", diff --git a/scripts/server-dev.sh b/scripts/server-dev.sh index f71f29c7..f92d5e3c 100755 --- a/scripts/server-dev.sh +++ b/scripts/server-dev.sh @@ -12,6 +12,7 @@ LOCAL_URL_FILE="$STATE_DIR/local-url" TUNNEL_URL_FILE="$STATE_DIR/tunnel-url" XVFB_SCREEN="${CODEMUX_XVFB_SCREEN:-1280x720x24}" DEFAULT_TIMEOUT="${CODEMUX_SERVER_START_TIMEOUT:-90}" +STARTED_LOCAL_URL="" export PATH="$HOME/.bun/bin:$HOME/.opencode/bin:$PATH" @@ -20,12 +21,14 @@ usage() { Usage: ./scripts/server-dev.sh start [--foreground] [--replace] [--tunnel] ./scripts/server-dev.sh stop + ./scripts/server-dev.sh restart ./scripts/server-dev.sh status ./scripts/server-dev.sh logs [app|tunnel] Examples: ./scripts/server-dev.sh start --foreground ./scripts/server-dev.sh start --replace --tunnel + ./scripts/server-dev.sh restart ./scripts/server-dev.sh status EOF } @@ -279,11 +282,10 @@ start_foreground() { dbus-run-session -- xvfb-run --auto-servernum --server-args="-screen 0 $XVFB_SCREEN" bun run dev } -start_background() { - local with_tunnel="$1" - +start_managed_app_background() { mkdir -p "$STATE_DIR" - rm -f 
"$LOCAL_URL_FILE" "$TUNNEL_URL_FILE" + STARTED_LOCAL_URL="" + rm -f "$LOCAL_URL_FILE" : > "$APP_LOG" setsid bash -lc "export PATH=\"$HOME/.bun/bin:$HOME/.opencode/bin:\$PATH\"; export CODEMUX_DISABLE_COPILOT_DBUS=1; export CODEMUX_SERVER_MODE=1; cd \"$REPO_DIR\"; exec dbus-run-session -- xvfb-run --auto-servernum --server-args='-screen 0 $XVFB_SCREEN' bun run dev" > "$APP_LOG" 2>&1 & @@ -294,16 +296,29 @@ start_background() { info "Started CodeMux headless dev (PID $app_pid); waiting for the renderer URL..." local local_url if ! local_url=$(wait_for_local_url "$app_pid"); then + rm -f "$APP_PID_FILE" "$LOCAL_URL_FILE" warn "CodeMux exited before the renderer URL was detected. Recent logs:" tail -n 40 "$APP_LOG" || true - exit 1 + return 1 fi + STARTED_LOCAL_URL="$local_url" success "CodeMux dev is ready: $local_url" printf ' App log: %s ' "$APP_LOG" print_auth_status +} + +start_background() { + local with_tunnel="$1" + + rm -f "$TUNNEL_URL_FILE" + + if ! start_managed_app_background; then + exit 1 + fi + local local_url="$STARTED_LOCAL_URL" if [ "$with_tunnel" -eq 1 ]; then : > "$TUNNEL_LOG" setsid bash -lc "exec cloudflared tunnel --url '$local_url'" > "$TUNNEL_LOG" 2>&1 & @@ -325,6 +340,54 @@ start_background() { fi } +restart_app() { + cleanup_stale_pid_file "$APP_PID_FILE" + cleanup_stale_pid_file "$TUNNEL_PID_FILE" + + local app_pid tunnel_pid previous_local_url tunnel_url repo_pids + app_pid=$(read_pid_file "$APP_PID_FILE" 2>/dev/null || true) + tunnel_pid=$(read_pid_file "$TUNNEL_PID_FILE" 2>/dev/null || true) + previous_local_url=$(read_local_url 2>/dev/null || true) + tunnel_url=$(read_tunnel_url 2>/dev/null || true) + repo_pids=$(find_repo_processes || true) + + if [ -z "$app_pid" ]; then + if [ -n "$repo_pids" ]; then + fail "Found CodeMux dev processes without managed state. Run bun run server:down first." + fi + warn "No managed CodeMux app process was running. Starting a fresh instance instead." 
+ else + info "Restarting CodeMux headless dev (PID $app_pid)..." + kill_process_group_from_file "$APP_PID_FILE" + fi + + rm -f "$LOCAL_URL_FILE" + + if ! start_managed_app_background; then + if [ -n "$tunnel_pid" ] && is_process_group_running "$tunnel_pid"; then + warn "Managed Cloudflare tunnel is still running." + [ -n "$tunnel_url" ] && printf ' Tunnel URL: %s\n' "$tunnel_url" + printf ' Tunnel log: %s\n' "$TUNNEL_LOG" + fi + exit 1 + fi + + local restarted_local_url="$STARTED_LOCAL_URL" + if [ -n "$tunnel_pid" ] && is_process_group_running "$tunnel_pid"; then + success "Preserved managed Cloudflare tunnel." + [ -n "$tunnel_url" ] && printf ' Tunnel URL: %s\n' "$tunnel_url" + printf ' Tunnel log: %s\n' "$TUNNEL_LOG" + if [ -n "$previous_local_url" ] && [ "$previous_local_url" != "$restarted_local_url" ]; then + warn "App restarted on $restarted_local_url, but the preserved tunnel still targets $previous_local_url." + printf ' Recreate the tunnel if remote access stops working: bun run server:down && bun run server:tunnel\n' + else + printf ' Public URL should stay the same while the existing tunnel process remains healthy.\n' + fi + else + warn "No managed Cloudflare tunnel was running. Restart completed without public tunnel." 
+ fi +} + stop_all() { cleanup_stale_pid_file "$APP_PID_FILE" cleanup_stale_pid_file "$TUNNEL_PID_FILE" @@ -481,6 +544,11 @@ main() { [ "$#" -eq 0 ] || fail "stop does not accept extra arguments" stop_all ;; + restart) + [ "$#" -eq 0 ] || fail "restart does not accept extra arguments" + ensure_repo_ready + restart_app + ;; status) [ "$#" -eq 0 ] || fail "status does not accept extra arguments" show_status diff --git a/scripts/server-init.sh b/scripts/server-init.sh index 1c794b11..6361c144 100755 --- a/scripts/server-init.sh +++ b/scripts/server-init.sh @@ -153,6 +153,7 @@ Next steps: bun run server:dev # foreground headless Electron dev bun run server:up # background headless Electron dev bun run server:tunnel # background headless Electron dev + quick tunnel + bun run server:restart # restart app only, preserving managed tunnel when possible bun run start # web-only standalone server EOF } diff --git a/src/components/PromptInput.tsx b/src/components/PromptInput.tsx index 481a3f85..8fa662c7 100644 --- a/src/components/PromptInput.tsx +++ b/src/components/PromptInput.tsx @@ -134,6 +134,10 @@ interface PromptInputProps { availableCommands?: EngineCommand[]; /** Called when user invokes a slash command (instead of onSend) */ onCommandInvoke?: (commandName: string, args: string, agent: AgentMode) => void; + /** Called when user triggers a team run */ + onTeamSend?: (text: string, mode: "light" | "heavy") => void; + /** When true, prompt sends are relayed to the active Heavy Brain orchestrator */ + relayToOrchestrator?: boolean; } export function PromptInput(props: PromptInputProps) { @@ -142,9 +146,35 @@ export function PromptInput(props: PromptInputProps) { const [textarea, setTextarea] = createSignal(); const [images, setImages] = createSignal([]); const [dragOver, setDragOver] = createSignal(false); + const [showTeamMenu, setShowTeamMenu] = createSignal(false); + let teamMenuRef: HTMLDivElement | undefined; let fileInputRef: HTMLInputElement | undefined; let 
pasteCounter = 0; + // Close team menu on click outside + const handleTeamClickOutside = (e: MouseEvent) => { + if (teamMenuRef && !teamMenuRef.contains(e.target as Node)) { + setShowTeamMenu(false); + } + }; + createEffect(() => { + if (showTeamMenu()) { + document.addEventListener("mousedown", handleTeamClickOutside); + } else { + document.removeEventListener("mousedown", handleTeamClickOutside); + } + }); + onCleanup(() => document.removeEventListener("mousedown", handleTeamClickOutside)); + + const handleTeamSend = (mode: "light" | "heavy") => { + const trimmed = text().trim(); + if (!trimmed || !props.onTeamSend) return; + props.onTeamSend(trimmed, mode); + setText(""); + setImages([]); + setShowTeamMenu(false); + }; + // --- Slash command autocomplete state --- const [showCommandMenu, setShowCommandMenu] = createSignal(false); const [commandSelectedIndex, setCommandSelectedIndex] = createSignal(0); @@ -152,6 +182,7 @@ export function PromptInput(props: PromptInputProps) { /** Parse the current text: detect `/command args` prefix */ const commandQuery = createMemo(() => { + if (props.relayToOrchestrator) return null; const val = text(); if (!val.startsWith("/")) return null; // Only trigger for single-line prefix (no newlines before command) @@ -387,7 +418,7 @@ export function PromptInput(props: PromptInputProps) { const doSend = () => { const trimmed = text().trim(); // Detect slash command: text starts with / and onCommandInvoke is provided - if (trimmed.startsWith("/") && props.onCommandInvoke) { + if (!props.relayToOrchestrator && trimmed.startsWith("/") && props.onCommandInvoke) { const spaceIdx = trimmed.indexOf(" "); const commandName = spaceIdx === -1 ? trimmed.slice(1) : trimmed.slice(1, spaceIdx); const args = spaceIdx === -1 ? 
"" : trimmed.slice(spaceIdx + 1).trim(); @@ -424,6 +455,9 @@ export function PromptInput(props: PromptInputProps) { // Placeholder text based on active mode and generating state const modePlaceholder = createMemo(() => { + if (props.relayToOrchestrator) { + return t().prompt.orchestrationRelayPlaceholder; + } if (props.isGenerating) { if (props.canEnqueue) return t().prompt.typeNextMessage ?? "Type your next message..."; return t().prompt.waitingForResponse ?? "Waiting for response..."; @@ -451,12 +485,13 @@ export function PromptInput(props: PromptInputProps) { return ( - + {/* Team run trigger */} + +
{ teamMenuRef = el; }}> + + +
+ + +
+
+
+
+ + + ); } diff --git a/src/components/SessionSidebar.tsx b/src/components/SessionSidebar.tsx index 16fdaabd..943819cb 100644 --- a/src/components/SessionSidebar.tsx +++ b/src/components/SessionSidebar.tsx @@ -9,6 +9,7 @@ import { getEngineBadge } from "./share/common"; import { ScheduledTaskSection } from "./ScheduledTaskSection"; import { getSetting } from "../lib/settings"; import { gateway } from "../lib/gateway-api"; +import { setOrchestrationStore, orchestrationStore } from "../stores/orchestration"; import { isElectron } from "../lib/platform"; import { systemAPI } from "../lib/electron-api"; @@ -44,6 +45,9 @@ interface SessionSidebarProps { onManageWorktrees?: (projectDirectory: string) => void; onRemoveWorktree?: (projectDirectory: string, worktreeName: string, worktreeBranch: string) => void; onMergeWorktree?: (projectDirectory: string, worktreeName: string, worktreeBranch: string) => void; + // Team Orchestration + onNewTeamTask?: (directory?: string) => void; + orchestrationParentSessionIds?: Set; } // Project grouping data structure @@ -138,7 +142,7 @@ export function SessionSidebar(props: SessionSidebarProps) { const sessions = props.sessions.filter( (s) => isEngineEnabled(s.engineType) && s.projectID === defaultProject.id - && (!s.worktreeId || worktreeEnabled()), + && (!s.worktreeId || worktreeEnabled() || (s.teamId && sessionStore.teamOrchestrationEnabled)), ); return { @@ -164,7 +168,7 @@ export function SessionSidebar(props: SessionSidebarProps) { } const rootSessions = props.sessions.filter(s => - isEngineEnabled(s.engineType) && (!s.worktreeId || worktreeEnabled()), + isEngineEnabled(s.engineType) && (!s.worktreeId || worktreeEnabled() || (s.teamId && sessionStore.teamOrchestrationEnabled)), ); for (const session of rootSessions) { @@ -235,8 +239,11 @@ export function SessionSidebar(props: SessionSidebarProps) { const isSearching = () => searchQuery().trim().length > 0; const getProjectWorktrees = (projectDir: string): UnifiedWorktree[] => { - 
if (!worktreeEnabled()) return []; - return sessionStore.worktrees[projectDir] || []; + const wts = sessionStore.worktrees[projectDir] || []; + if (worktreeEnabled()) return wts; + // Even if worktree feature is off, show team worktrees when team orchestration is on + if (sessionStore.teamOrchestrationEnabled) return wts.filter(wt => wt.name.startsWith("team-")); + return []; }; const isWorktreeExpanded = (key: string): boolean => { @@ -251,21 +258,20 @@ export function SessionSidebar(props: SessionSidebarProps) { const getWorktreeSessionGroups = ( projectDir: string, sessions: SessionInfo[], - ): { local: SessionInfo[]; worktreeGroups: { worktree: UnifiedWorktree; sessions: SessionInfo[] }[] } => { + ): { + local: SessionInfo[]; + worktreeGroups: { worktree: UnifiedWorktree; sessions: SessionInfo[]; isTeam: boolean }[]; + } => { const worktrees = getProjectWorktrees(projectDir); if (worktrees.length === 0) { - return { local: sessions, worktreeGroups: [] }; - } - - // Filter out worktree sessions when feature is disabled - if (!worktreeEnabled()) { - return { local: sessions.filter((s) => !s.worktreeId), worktreeGroups: [] }; + return { local: sessions.filter(s => !s.worktreeId), worktreeGroups: [] }; } const local = sessions.filter((s) => !s.worktreeId); const wtGroups = worktrees.map((wt) => ({ worktree: wt, sessions: sessions.filter((s) => s.worktreeId === wt.name), + isTeam: wt.name.startsWith("team-"), })); return { local, worktreeGroups: wtGroups }; @@ -273,7 +279,7 @@ export function SessionSidebar(props: SessionSidebarProps) { // Load worktrees for all projects when feature is enabled const refreshWorktrees = async () => { - if (!worktreeEnabled()) return; + if (!worktreeEnabled() && !sessionStore.teamOrchestrationEnabled) return; for (const group of projectGroups()) { const dir = group.project?.directory; if (!dir) continue; @@ -286,7 +292,7 @@ export function SessionSidebar(props: SessionSidebarProps) { // Initial load + re-load when projects change 
createEffect(() => { - if (!worktreeEnabled()) return; + if (!worktreeEnabled() && !sessionStore.teamOrchestrationEnabled) return; // Track projectGroups to re-run when projects change const groups = projectGroups(); if (groups.length === 0) return; @@ -382,7 +388,7 @@ export function SessionSidebar(props: SessionSidebarProps) { }; return ( -
+
{/* Search Box */} 0 || filteredDefaultWorkspaceGroup() !== null)}>
@@ -845,6 +851,24 @@ export function SessionSidebar(props: SessionSidebarProps) { + {/* New team task - default workspace */} + + +
@@ -1163,6 +1187,24 @@ export function SessionSidebar(props: SessionSidebarProps) { + {/* New team task */} + + + {/* Manage worktrees (only when feature enabled) */}
- {/* Worktree sessions */} + {/* Worktree sessions (normal + team) */} - {(wtGroup) => ( + {(wtGroup) => { + const wtKey = `${dir}::${wtGroup.worktree.name}`; + + if (wtGroup.isTeam) { + // Team worktree: orchestrator as header, subtasks indented + const parentSession = () => wtGroup.sessions.find(s => + props.orchestrationParentSessionIds?.has(s.id) + ); + const teamLabel = () => { + const p = parentSession(); + const title = p?.title; + if (title && title !== "New Session" && title !== "New Chat") return title; + return `Team ${wtGroup.worktree.name.slice(5)}`; + }; + const childSessions = () => wtGroup.sessions.filter(s => s.id !== parentSession()?.id); + const isOrchestratorActive = () => parentSession()?.id === props.currentSessionId; + + return ( +
+
{ + const ps = parentSession(); + if (ps) props.onSelectSession(ps.id); + toggleWorktreeExpanded(wtKey); + }} + > + + + + + + + + + + + {teamLabel()} + + 0}> + + {childSessions().length} + + + {/* Delete team overlay */} + e.stopPropagation()}> + + +
+ } + > + + +
+ + 0}> +
+ + {(session, idx) => ( +
props.onSelectSession(session.id)} + > + + #{idx() + 1} + + + {session.title || "Subtask"} + + + {session.engineType} + +
+ )} +
+
+
+
+ + ); + } + + // Normal worktree rendering + return (
toggleWorktreeExpanded(`${dir}::${wtGroup.worktree.name}`)} + onClick={() => toggleWorktreeExpanded(wtKey)} > @@ -1581,7 +1775,7 @@ export function SessionSidebar(props: SessionSidebarProps) {
- +
0} fallback={
@@ -1593,7 +1787,8 @@ export function SessionSidebar(props: SessionSidebarProps) {
- )} + ); + }}
); diff --git a/src/components/SessionTurn.tsx b/src/components/SessionTurn.tsx index dfa401eb..64b9f0a6 100644 --- a/src/components/SessionTurn.tsx +++ b/src/components/SessionTurn.tsx @@ -771,8 +771,8 @@ export function SessionTurn(props: SessionTurnProps) { {/* Compacting Turn - Show simplified UI */} - {/* User Message - Only show when there are displayable parts */} - 0}> + {/* User Message - Only show when there are displayable parts and not an internal message */} + 0 && !props.userMessage.internal}>
{(part, partIndex) => ( diff --git a/src/components/orchestration/OrchestrationCards.tsx b/src/components/orchestration/OrchestrationCards.tsx new file mode 100644 index 00000000..c824be26 --- /dev/null +++ b/src/components/orchestration/OrchestrationCards.tsx @@ -0,0 +1,127 @@ +import { createMemo, Match, Show, Switch } from "solid-js"; +import { orchestrationStore } from "../../stores/orchestration"; +import { gateway } from "../../lib/gateway-api"; +import { updateRun } from "../../stores/orchestration"; +import type { OrchestrationSubtask } from "../../types/unified"; +import { TeamPlanCard } from "./TeamPlanCard"; +import { TeamExecutionCard } from "./TeamExecutionCard"; +import { TeamResultCard } from "./TeamResultCard"; +import styles from "./orchestration.module.css"; + +interface OrchestrationCardsProps { + runId: string; + onViewSession: (sessionId: string) => void; +} + +export function OrchestrationCards(props: OrchestrationCardsProps) { + const run = createMemo(() => orchestrationStore.runs[props.runId]); + + const handleConfirm = async (subtasks: OrchestrationSubtask[]) => { + const r = run(); + if (!r) return; + await gateway.confirmOrchestrationPlan({ runId: r.id, subtasks }); + }; + + const handleCancel = async () => { + const r = run(); + if (!r) return; + await gateway.cancelOrchestration(r.id); + }; + + return ( + Loading orchestration…
}> + {(r) => ( + + + + + + {r().status}… + + + }> + {/* Decomposing / setup spinner */} + +
+ + + +
+ + {r().status === "setup" ? "Setting up team…" : "Decomposing task…"} + + + Analyzing prompt and planning subtasks + +
+
+
+ + {/* Plan confirmation */} + + + + + {/* Dispatching */} + +
+ + + +
+ + Dispatching subtasks… + + + Creating worktree and sessions + +
+
+
+ + {/* Running / aggregating */} + + + + + {/* Completed */} + + + + + {/* Failed */} + + + + + {/* Cancelled */} + +
+ + Cancelled + + + Team execution was cancelled. + +
+
+
+ )} +
+ ); +} diff --git a/src/components/orchestration/SubtaskEditor.tsx b/src/components/orchestration/SubtaskEditor.tsx new file mode 100644 index 00000000..a30c6101 --- /dev/null +++ b/src/components/orchestration/SubtaskEditor.tsx @@ -0,0 +1,203 @@ +import { createSignal, For, Show } from "solid-js"; +import type { OrchestrationSubtask, RoleEngineMapping, OrchestratorRole } from "../../types/unified"; + +interface AvailableEngine { + type: string; + name: string; + models?: { id: string; name?: string }[]; +} + +interface SubtaskEditorProps { + subtask: OrchestrationSubtask; + index: number; + availableEngines: AvailableEngine[]; + allSubtasks: OrchestrationSubtask[]; + roleMappings?: RoleEngineMapping[]; + onUpdate: (updated: OrchestrationSubtask) => void; + onDelete: () => void; + onRoleChange?: (role: OrchestratorRole) => void; + readOnly?: boolean; +} + +const ROLE_LABELS: Record = { + explorer: "Explorer", + researcher: "Researcher", + reviewer: "Reviewer", + designer: "Designer", + coder: "Coder", +}; + +export function SubtaskEditor(props: SubtaskEditorProps) { + const [expanded, setExpanded] = createSignal(false); + + const engineName = () => { + const eng = props.availableEngines.find((e) => e.type === props.subtask.engineType); + return eng?.name ?? props.subtask.engineType; + }; + + const depsLabel = () => { + if (!props.subtask.dependsOn.length) return ""; + const labels = props.subtask.dependsOn.map((depId) => { + const idx = props.allSubtasks.findIndex((t) => t.id === depId); + return idx >= 0 ? 
`#${idx + 1}` : depId.slice(0, 6); + }); + return `→ ${labels.join(", ")}`; + }; + + const update = (partial: Partial) => { + props.onUpdate({ ...props.subtask, ...partial }); + }; + + const toggleDep = (id: string) => { + const current = props.subtask.dependsOn; + if (current.includes(id)) { + update({ dependsOn: current.filter((d) => d !== id) }); + } else { + update({ dependsOn: [...current, id] }); + } + }; + + const animationDelay = `${props.index * 50}ms`; + + return ( +
+ {/* Collapsed header */} + + + {/* Expanded editor */} + +
+ {/* Description */} +
+ +