Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions scripts/claude_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

import httpx

from multihead.subprocess_utils import no_window_flags

logging.basicConfig(
level=logging.INFO,
format="[worker] %(asctime)s %(levelname)s %(message)s",
Expand Down Expand Up @@ -376,6 +378,7 @@ def _invoke_claude(self, prompt: str, session_id: str | None = None) -> dict:
cwd=CLAUDE_WORK_DIR,
timeout=SUBPROCESS_TIMEOUT,
env=env,
creationflags=no_window_flags(),
)
except subprocess.TimeoutExpired:
elapsed = time.monotonic() - start
Expand Down Expand Up @@ -458,6 +461,7 @@ def _wake_interactive(self, message: str) -> dict:
check = subprocess.run(
["tmux", "has-session", "-t", self.tmux_target],
capture_output=True, timeout=5,
creationflags=no_window_flags(),
)
if check.returncode != 0:
return {"error": f"tmux session '{self.tmux_target}' not found. "
Expand All @@ -467,6 +471,7 @@ def _wake_interactive(self, message: str) -> dict:
subprocess.run(
["tmux", "send-keys", "-t", self.tmux_target, message, "Enter"],
check=True, timeout=5,
creationflags=no_window_flags(),
)
return {"result": f"Sent to tmux session '{self.tmux_target}'", "mode": "interactive"}

Expand Down
33 changes: 31 additions & 2 deletions src/multihead/adapters/claude_agent_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import logging
import os
import sys
from collections.abc import AsyncIterator
from typing import Any

Expand Down Expand Up @@ -62,7 +63,7 @@ class ClaudeAgentSDKAdapter(HeadAdapter):
for typed messages, streaming, session management, and hooks.

Configuration via manifest.extra:
model: str — Claude model (default: from env or "claude-sonnet-4-20250514")
model: str — Claude model (default: from env or "claude-sonnet-4-6")
max_turns: int — Max agent turns per call (default: 25)
max_budget_usd: float — Budget cap per call (default: 5.0)
permission_mode: str — "bypassPermissions" | "acceptEdits" | "default"
Expand All @@ -72,12 +73,40 @@ class ClaudeAgentSDKAdapter(HeadAdapter):
effort: str — "low" | "medium" | "high" | "max"
"""

_anyio_patched = False

@classmethod
def _patch_anyio(cls) -> None:
    """Monkey-patch anyio.open_process to suppress console windows on Windows.

    The Claude Agent SDK spawns subprocesses via anyio, which does not pass
    CREATE_NO_WINDOW by default. This causes popup cmd windows during Night
    Shift and other headless operations. Only patches once per process.
    """
    # Nothing to do off Windows, and never wrap the function twice.
    if sys.platform != "win32" or cls._anyio_patched:
        return
    try:
        import anyio as _anyio
        import subprocess as _subprocess
    except ImportError:
        # anyio not installed — SDK won't work either
        return

    _wrapped = _anyio.open_process

    async def _open_process_no_window(*args: Any, **kwargs: Any) -> Any:
        # Inject CREATE_NO_WINDOW only when the caller did not set flags.
        if "creationflags" not in kwargs:
            kwargs["creationflags"] = _subprocess.CREATE_NO_WINDOW
        return await _wrapped(*args, **kwargs)

    _anyio.open_process = _open_process_no_window  # type: ignore[assignment]
    cls._anyio_patched = True

def __init__(self, manifest: HeadManifest) -> None:
super().__init__(manifest)
self._patch_anyio()
extra = manifest.extra or {}

self._model = extra.get("model") or manifest.model or os.environ.get(
"CLAUDE_MODEL", "claude-opus-4-6"
"CLAUDE_MODEL", "claude-sonnet-4-6"
)
self._max_turns = extra.get("max_turns", _DEFAULT_MAX_TURNS)
self._max_budget = extra.get("max_budget_usd", _DEFAULT_MAX_BUDGET_USD)
Expand Down
20 changes: 18 additions & 2 deletions src/multihead/api/routes_nightshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from typing import Any

from fastapi import APIRouter, BackgroundTasks, Query, Request
from fastapi.responses import JSONResponse

from multihead.models import HeadState

router = APIRouter()

Expand All @@ -16,7 +19,7 @@
"running": False,
"last_report": None,
"current_stage": None,
"progress": deque(maxlen=100),
"progress": deque(maxlen=5000),
}


Expand All @@ -28,12 +31,25 @@ async def trigger_nightshift(
concurrency: int = Query(1, description="Parallel LLM calls per stage (1=sequential)"),
) -> dict[str, Any]:
"""Trigger a Night Shift run in the background."""
# Check for at least one active head before starting
head_manager = request.app.state.head_manager
states = head_manager.get_states()
has_active = any(
info.get("state") == HeadState.ACTIVE.value
for info in states.values()
)
if not has_active:
return JSONResponse(
status_code=400,
content={"error": "No active heads — wake a head before running Night Shift"},
)
Comment on lines 31 to +45
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This endpoint is annotated to return dict[str, Any] but returns a JSONResponse for the 400 case. To keep the return type consistent and improve generated OpenAPI docs, prefer raising fastapi.HTTPException(status_code=400, detail=...) (or update the return type annotation/response model to include the JSONResponse path).

Copilot uses AI. Check for mistakes.

async with _nightshift_lock:
if _nightshift_status["running"]:
return {"status": "already_running"}
_nightshift_status["running"] = True
_nightshift_status["current_stage"] = None
_nightshift_status["progress"] = deque(maxlen=100)
_nightshift_status["progress"] = deque(maxlen=5000)

Comment on lines 49 to 53
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Progress events are appended to _nightshift_status["progress"] (via the debug progress callback) while /status converts the same deque to a list. Mutating a deque during iteration can raise RuntimeError, and (because _emit swallows exceptions) can silently drop progress events. Consider synchronizing both appends and reads (e.g., protect with a lock, or funnel updates through an async task that holds _nightshift_lock).

Copilot uses AI. Check for mistakes.
night_shift = request.app.state.night_shift
night_shift.config.concurrency = max(1, concurrency)
Expand Down
4 changes: 3 additions & 1 deletion src/multihead/api/routes_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

from fastapi import APIRouter, Query, Request

from multihead.subprocess_utils import no_window_flags

logger = logging.getLogger(__name__)

router = APIRouter()
Expand Down Expand Up @@ -62,7 +64,7 @@ async def restart():
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
# Spawn a new server process directly (bypasses mh.cmd issues with nested shells)
flags = (
subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP
subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP | no_window_flags()
if sys.platform == "win32"
else 0
)
Expand Down
2 changes: 2 additions & 0 deletions src/multihead/autonomous_executor/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from abc import ABC, abstractmethod
from typing import Any

from multihead.subprocess_utils import no_window_flags
from .models import StepExecutionResult

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -213,6 +214,7 @@ async def execute_step(
cwd=self.work_dir,
timeout=timeout or self.subprocess_timeout,
env=env,
creationflags=no_window_flags(),
),
)
except subprocess.TimeoutExpired:
Expand Down
4 changes: 4 additions & 0 deletions src/multihead/claim_corroboration.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import subprocess
from pathlib import Path

from multihead.subprocess_utils import no_window_flags

# Patterns to extract file paths from text
_FILE_PATH_RE = re.compile(
r'(?:^|[\s\'"`(])(/(?:mnt|home|tmp|usr|opt|var)[/\w._-]+(?:\.\w{1,10})?)',
Expand Down Expand Up @@ -71,6 +73,7 @@ def get_git_head_sha(repo_path: str = ".") -> str | None:
result = subprocess.run(
["git", "-C", repo_path, "rev-parse", "HEAD"],
capture_output=True, text=True, timeout=5,
creationflags=no_window_flags(),
)
if result.returncode == 0:
return result.stdout.strip()
Expand Down Expand Up @@ -103,6 +106,7 @@ def check_sha_staleness(
result = subprocess.run(
["git", "-C", repo_path, "diff", source_sha, "HEAD", "--", file_path],
capture_output=True, text=True, timeout=10,
creationflags=no_window_flags(),
)
if result.returncode != 0:
return {"stale": False, "penalty": 0.0, "reason": "git_error"}
Expand Down
3 changes: 3 additions & 0 deletions src/multihead/diagnostics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from pathlib import Path
from typing import Any

from multihead.subprocess_utils import no_window_flags


@dataclass
class DiagnosticResult:
Expand Down Expand Up @@ -115,6 +117,7 @@ def _check_ollama(self) -> DiagnosticResult:
import subprocess
result = subprocess.run(
["ollama", "list"], capture_output=True, text=True, timeout=10,
creationflags=no_window_flags(),
)
if result.returncode == 0:
models = [l.split()[0] for l in result.stdout.strip().split("\n")[1:] if l.strip()]
Expand Down
3 changes: 3 additions & 0 deletions src/multihead/extractors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ async def map_generate(
stage_name: str = "",
batch_mode: bool = False,
no_wait: bool = False,
on_chunk_progress: Callable[[int, int], None] | None = None,
) -> list[dict[str, Any] | Exception]:
"""Call generate for each prompt, optionally in parallel or batch.

Expand Down Expand Up @@ -209,6 +210,8 @@ def _on_progress(completed: int, total: int) -> None:
raise InterruptedError(
f"Shutdown at {idx}/{len(prompts)} in {stage_name or 'map_generate'}"
)
if on_chunk_progress:
on_chunk_progress(idx, len(prompts))
try:
result = await BaseExtractor.call_generate(adapter, prompt)
results.append(result)
Expand Down
4 changes: 4 additions & 0 deletions src/multihead/extractors/ci_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from pathlib import Path
from typing import Any

from multihead.subprocess_utils import no_window_flags

logger = logging.getLogger(__name__)


Expand All @@ -38,6 +40,7 @@ def extract_ci_runs(
["gh", "api", f"repos/{owner}/{repo}/actions/runs",
"--jq", f".workflow_runs[:{limit}]"],
capture_output=True, text=True, timeout=30,
creationflags=no_window_flags(),
)
if result.returncode != 0:
logger.error("gh api failed: %s", result.stderr)
Expand Down Expand Up @@ -124,6 +127,7 @@ def _extract_job_results(
["gh", "api", f"repos/{owner}/{repo}/actions/runs/{run_id}/jobs",
"--jq", ".jobs"],
capture_output=True, text=True, timeout=15,
creationflags=no_window_flags(),
)
if result.returncode != 0:
return []
Expand Down
3 changes: 2 additions & 1 deletion src/multihead/extractors/claim_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ async def extract(
adapter, prompts, concurrency=concurrency,
checkpoint_dir=kwargs.get("checkpoint_dir"),
stage_name=kwargs.get("stage_name", ""),
batch_mode=kwargs.get("batch_mode", False),
batch_mode=kwargs.get("batch_mode", False),
no_wait=kwargs.get("no_wait", False),
on_chunk_progress=kwargs.get("on_chunk_progress"),
)

import logging as _log
Expand Down
3 changes: 2 additions & 1 deletion src/multihead/extractors/entity_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ async def extract(
adapter, prompts, concurrency=concurrency,
checkpoint_dir=kwargs.get("checkpoint_dir"),
stage_name=kwargs.get("stage_name", ""),
batch_mode=kwargs.get("batch_mode", False),
batch_mode=kwargs.get("batch_mode", False),
no_wait=kwargs.get("no_wait", False),
on_chunk_progress=kwargs.get("on_chunk_progress"),
)

for chunk, resp in zip(chunks, responses):
Expand Down
3 changes: 2 additions & 1 deletion src/multihead/extractors/event_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ async def extract(
adapter, prompts, concurrency=concurrency,
checkpoint_dir=kwargs.get("checkpoint_dir"),
stage_name=kwargs.get("stage_name", ""),
batch_mode=kwargs.get("batch_mode", False),
batch_mode=kwargs.get("batch_mode", False),
no_wait=kwargs.get("no_wait", False),
on_chunk_progress=kwargs.get("on_chunk_progress"),
)

for chunk, resp in zip(chunks, responses):
Expand Down
5 changes: 4 additions & 1 deletion src/multihead/extractors/test_results_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from pathlib import Path
from typing import Any

from multihead.subprocess_utils import no_window_flags

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -243,7 +245,8 @@ def run_and_extract(
args.extend(extra_args)

try:
result = subprocess.run(args, capture_output=True, text=True, timeout=600)
result = subprocess.run(args, capture_output=True, text=True, timeout=600,
creationflags=no_window_flags())
claims = extract_from_junit_xml(xml_path)

# Also parse stdout for any extra info
Expand Down
1 change: 1 addition & 0 deletions src/multihead/extractors/topic_assigner.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async def extract(
stage_name=kwargs.get("stage_name", ""),
batch_mode=kwargs.get("batch_mode", False),
no_wait=kwargs.get("no_wait", False),
on_chunk_progress=kwargs.get("on_chunk_progress"),
)

for idx, resp in zip(batch_indices, responses):
Expand Down
4 changes: 4 additions & 0 deletions src/multihead/github_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import subprocess
from dataclasses import dataclass

from multihead.subprocess_utils import no_window_flags

logger = logging.getLogger(__name__)

_GH_TIMEOUT = 30 # seconds
Expand Down Expand Up @@ -84,6 +86,7 @@ def infer_repo() -> str | None:
result = subprocess.run(
["git", "remote", "get-url", "origin"],
capture_output=True, text=True, timeout=10,
creationflags=no_window_flags(),
)
if result.returncode != 0:
return None
Expand Down Expand Up @@ -120,6 +123,7 @@ def _run_gh(args: list[str], timeout: int = _GH_TIMEOUT) -> subprocess.Completed
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=timeout,
creationflags=no_window_flags(),
)
except FileNotFoundError:
raise RuntimeError(
Expand Down
3 changes: 3 additions & 0 deletions src/multihead/init_wizard/hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import subprocess
from pathlib import Path

from multihead.subprocess_utils import no_window_flags
from .models import AdapterStatus, HardwareProfile

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -54,6 +55,7 @@ def detect_hardware() -> HardwareProfile:
result = subprocess.run(
["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader,nounits"],
capture_output=True, text=True, timeout=10,
creationflags=no_window_flags(),
)
if result.returncode == 0:
parts = result.stdout.strip().split(", ")
Expand All @@ -75,6 +77,7 @@ def check_adapters() -> AdapterStatus:
try:
result = subprocess.run(
["ollama", "list"], capture_output=True, text=True, timeout=10,
creationflags=no_window_flags(),
)
if result.returncode == 0:
status.ollama_available = True
Expand Down
2 changes: 2 additions & 0 deletions src/multihead/mcp_server/_tools_decompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import httpx

from multihead.subprocess_utils import no_window_flags
from ._core import _get_ks, _request, logger


Expand Down Expand Up @@ -181,6 +182,7 @@ async def _decompose_claude_p(goal: str, context: str = "") -> str:
lambda: subprocess.run(
cmd, capture_output=True, text=True,
cwd=work_dir, timeout=300, env=env,
creationflags=no_window_flags(),
),
)
except subprocess.TimeoutExpired:
Expand Down
Loading
Loading