From af78eb3dd0ba1c6881715d7b9f86fdca74550376 Mon Sep 17 00:00:00 2001 From: Alan Kharebov Date: Mon, 23 Mar 2026 12:29:42 -0700 Subject: [PATCH 1/5] fix: suppress subprocess popups on Windows + use CLAUDE_MODEL env var - Create subprocess_utils.py with no_window_flags() helper - Apply CREATE_NO_WINDOW to all subprocess.run/Popen calls across 17 files - Monkey-patch anyio.open_process in Claude SDK adapter for headless ops - Fix claude_agent_sdk.py default model: use CLAUDE_MODEL env var instead of hardcoded claude-opus-4-6 --- scripts/claude_worker.py | 5 ++++ src/multihead/adapters/claude_agent_sdk.py | 25 ++++++++++++++++++- src/multihead/api/routes_system.py | 4 ++- .../autonomous_executor/strategies.py | 2 ++ src/multihead/claim_corroboration.py | 4 +++ src/multihead/diagnostics.py | 3 +++ src/multihead/extractors/ci_extractor.py | 4 +++ .../extractors/test_results_extractor.py | 5 +++- src/multihead/github_integration.py | 4 +++ src/multihead/init_wizard/hardware.py | 3 +++ src/multihead/mcp_server/_tools_decompose.py | 2 ++ .../source_extractors/git_extractor.py | 4 +++ src/multihead/night_shift/stages_late.py | 2 ++ src/multihead/resilience.py | 3 +++ src/multihead/resource_monitor.py | 3 +++ src/multihead/shell/context.py | 6 +++++ src/multihead/shell/core.py | 2 ++ src/multihead/subprocess_utils.py | 10 ++++++++ 18 files changed, 88 insertions(+), 3 deletions(-) create mode 100644 src/multihead/subprocess_utils.py diff --git a/scripts/claude_worker.py b/scripts/claude_worker.py index 01cf905..bb16626 100644 --- a/scripts/claude_worker.py +++ b/scripts/claude_worker.py @@ -32,6 +32,8 @@ import httpx +from multihead.subprocess_utils import no_window_flags + logging.basicConfig( level=logging.INFO, format="[worker] %(asctime)s %(levelname)s %(message)s", @@ -376,6 +378,7 @@ def _invoke_claude(self, prompt: str, session_id: str | None = None) -> dict: cwd=CLAUDE_WORK_DIR, timeout=SUBPROCESS_TIMEOUT, env=env, + creationflags=no_window_flags(), ) except subprocess.TimeoutExpired: elapsed = time.monotonic() - start @@ -458,6 +461,7 @@ def _wake_interactive(self, message: str) -> dict: check = subprocess.run( ["tmux", "has-session", "-t", self.tmux_target], capture_output=True, timeout=5, + creationflags=no_window_flags(), ) if check.returncode != 0: return {"error": f"tmux session '{self.tmux_target}' not found. " @@ -467,6 +471,7 @@ def _wake_interactive(self, message: str) -> dict: subprocess.run( ["tmux", "send-keys", "-t", self.tmux_target, message, "Enter"], check=True, timeout=5, + creationflags=no_window_flags(), ) return {"result": f"Sent to tmux session '{self.tmux_target}'", "mode": "interactive"} diff --git a/src/multihead/adapters/claude_agent_sdk.py b/src/multihead/adapters/claude_agent_sdk.py index d2e51ea..a57b400 100644 --- a/src/multihead/adapters/claude_agent_sdk.py +++ b/src/multihead/adapters/claude_agent_sdk.py @@ -12,6 +12,7 @@ import logging import os +import sys from collections.abc import AsyncIterator from typing import Any @@ -26,6 +27,28 @@ os.environ.pop("CLAUDECODE", None) os.environ.pop("ANTHROPIC_API_KEY", None) +# --------------------------------------------------------------------------- +# Monkey-patch anyio.open_process to suppress console windows on Windows. +# The Claude Agent SDK spawns subprocesses via anyio, which does not pass +# CREATE_NO_WINDOW by default. This causes popup cmd windows during Night +# Shift and other headless operations. +# --------------------------------------------------------------------------- +if sys.platform == "win32": + import subprocess as _subprocess + + try: + import anyio as _anyio + + _original_open_process = _anyio.open_process + + async def _patched_open_process(*args: Any, **kwargs: Any) -> Any: + kwargs.setdefault("creationflags", _subprocess.CREATE_NO_WINDOW) + return await _original_open_process(*args, **kwargs) + + _anyio.open_process = _patched_open_process # type: ignore[assignment] + except ImportError: + pass # anyio not installed — SDK won't work either + logger = logging.getLogger(__name__) # Default timeouts / limits @@ -77,7 +100,7 @@ def __init__(self, manifest: HeadManifest) -> None: extra = manifest.extra or {} self._model = extra.get("model") or manifest.model or os.environ.get( - "CLAUDE_MODEL", "claude-opus-4-6" + "CLAUDE_MODEL", "claude-sonnet-4-6" ) self._max_turns = extra.get("max_turns", _DEFAULT_MAX_TURNS) self._max_budget = extra.get("max_budget_usd", _DEFAULT_MAX_BUDGET_USD) diff --git a/src/multihead/api/routes_system.py b/src/multihead/api/routes_system.py index 2da36b9..f771ee7 100644 --- a/src/multihead/api/routes_system.py +++ b/src/multihead/api/routes_system.py @@ -12,6 +12,8 @@ from fastapi import APIRouter, Query, Request +from multihead.subprocess_utils import no_window_flags + logger = logging.getLogger(__name__) router = APIRouter() @@ -62,7 +64,7 @@ async def restart(): project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) # Spawn a new server process directly (bypasses mh.cmd issues with nested shells) flags = ( - subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP + subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP | no_window_flags() if sys.platform == "win32" else 0 ) diff --git a/src/multihead/autonomous_executor/strategies.py b/src/multihead/autonomous_executor/strategies.py index 281d42a..e6e6086 100644 --- a/src/multihead/autonomous_executor/strategies.py +++ b/src/multihead/autonomous_executor/strategies.py @@ -12,6 +12,7 @@ from abc import ABC, abstractmethod from typing import Any +from multihead.subprocess_utils import no_window_flags from .models import StepExecutionResult logger = logging.getLogger(__name__) @@ -213,6 +214,7 @@ async def execute_step( cwd=self.work_dir, timeout=timeout or self.subprocess_timeout, env=env, + creationflags=no_window_flags(), ), ) except subprocess.TimeoutExpired: diff --git a/src/multihead/claim_corroboration.py b/src/multihead/claim_corroboration.py index 77e3f95..4651f42 100644 --- a/src/multihead/claim_corroboration.py +++ b/src/multihead/claim_corroboration.py @@ -17,6 +17,8 @@ import subprocess from pathlib import Path +from multihead.subprocess_utils import no_window_flags + # Patterns to extract file paths from text _FILE_PATH_RE = re.compile( r'(?:^|[\s\'"`(])(/(?:mnt|home|tmp|usr|opt|var)[/\w._-]+(?:\.\w{1,10})?)', @@ -71,6 +73,7 @@ def get_git_head_sha(repo_path: str = ".") -> str | None: result = subprocess.run( ["git", "-C", repo_path, "rev-parse", "HEAD"], capture_output=True, text=True, timeout=5, + creationflags=no_window_flags(), ) if result.returncode == 0: return result.stdout.strip() @@ -103,6 +106,7 @@ def check_sha_staleness( result = subprocess.run( ["git", "-C", repo_path, "diff", source_sha, "HEAD", "--", file_path], capture_output=True, text=True, timeout=10, + creationflags=no_window_flags(), ) if result.returncode != 0: return {"stale": False, "penalty": 0.0, "reason": "git_error"} diff --git a/src/multihead/diagnostics.py b/src/multihead/diagnostics.py index 908e3e4..2e7d95a 100644 --- a/src/multihead/diagnostics.py +++ b/src/multihead/diagnostics.py @@ -7,6 +7,8 @@ from pathlib import Path from typing import Any +from multihead.subprocess_utils import no_window_flags + @dataclass class DiagnosticResult: @@ -115,6 +117,7 @@ def _check_ollama(self) -> DiagnosticResult: import subprocess result = subprocess.run( ["ollama", "list"], capture_output=True, text=True, timeout=10, + creationflags=no_window_flags(), ) if result.returncode == 0: models = [l.split()[0] for l in result.stdout.strip().split("\n")[1:] if l.strip()] diff --git a/src/multihead/extractors/ci_extractor.py b/src/multihead/extractors/ci_extractor.py index c0e8c45..4ecaba0 100644 --- a/src/multihead/extractors/ci_extractor.py +++ b/src/multihead/extractors/ci_extractor.py @@ -17,6 +17,8 @@ from pathlib import Path from typing import Any +from multihead.subprocess_utils import no_window_flags + logger = logging.getLogger(__name__) @@ -38,6 +40,7 @@ def extract_ci_runs( ["gh", "api", f"repos/{owner}/{repo}/actions/runs", "--jq", f".workflow_runs[:{limit}]"], capture_output=True, text=True, timeout=30, + creationflags=no_window_flags(), ) if result.returncode != 0: logger.error("gh api failed: %s", result.stderr) @@ -124,6 +127,7 @@ def _extract_job_results( ["gh", "api", f"repos/{owner}/{repo}/actions/runs/{run_id}/jobs", "--jq", ".jobs"], capture_output=True, text=True, timeout=15, + creationflags=no_window_flags(), ) if result.returncode != 0: return [] diff --git a/src/multihead/extractors/test_results_extractor.py b/src/multihead/extractors/test_results_extractor.py index ba19b17..24f3635 100644 --- a/src/multihead/extractors/test_results_extractor.py +++ b/src/multihead/extractors/test_results_extractor.py @@ -22,6 +22,8 @@ from pathlib import Path from typing import Any +from multihead.subprocess_utils import no_window_flags + logger = logging.getLogger(__name__) @@ -243,7 +245,8 @@ def run_and_extract( args.extend(extra_args) try: - result = subprocess.run(args, capture_output=True, text=True, timeout=600) + result = subprocess.run(args, capture_output=True, text=True, timeout=600, + creationflags=no_window_flags()) claims = extract_from_junit_xml(xml_path) # Also parse stdout for any extra info diff --git a/src/multihead/github_integration.py b/src/multihead/github_integration.py index 35a40d6..fdb8fbd 100644 --- a/src/multihead/github_integration.py +++ b/src/multihead/github_integration.py @@ -12,6 +12,8 @@ import subprocess from dataclasses import dataclass +from multihead.subprocess_utils import no_window_flags + logger = logging.getLogger(__name__) _GH_TIMEOUT = 30 # seconds @@ -84,6 +86,7 @@ def infer_repo() -> str | None: result = subprocess.run( ["git", "remote", "get-url", "origin"], capture_output=True, text=True, timeout=10, + creationflags=no_window_flags(), ) if result.returncode != 0: return None @@ -120,6 +123,7 @@ def _run_gh(args: list[str], timeout: int = _GH_TIMEOUT) -> subprocess.Completed try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, + creationflags=no_window_flags(), ) except FileNotFoundError: raise RuntimeError( diff --git a/src/multihead/init_wizard/hardware.py b/src/multihead/init_wizard/hardware.py index fae26a1..97eb9b4 100644 --- a/src/multihead/init_wizard/hardware.py +++ b/src/multihead/init_wizard/hardware.py @@ -9,6 +9,7 @@ import subprocess from pathlib import Path +from multihead.subprocess_utils import no_window_flags from .models import AdapterStatus, HardwareProfile logger = logging.getLogger(__name__) @@ -54,6 +55,7 @@ def detect_hardware() -> HardwareProfile: result = subprocess.run( ["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader,nounits"], capture_output=True, text=True, timeout=10, + creationflags=no_window_flags(), ) if result.returncode == 0: parts = result.stdout.strip().split(", ") @@ -75,6 +77,7 @@ def check_adapters() -> AdapterStatus: try: result = subprocess.run( ["ollama", "list"], capture_output=True, text=True, timeout=10, + creationflags=no_window_flags(), ) if result.returncode == 0: status.ollama_available = True diff --git a/src/multihead/mcp_server/_tools_decompose.py b/src/multihead/mcp_server/_tools_decompose.py index 1282d98..0f6e4c3 100644 --- a/src/multihead/mcp_server/_tools_decompose.py +++ b/src/multihead/mcp_server/_tools_decompose.py @@ -6,6 +6,7 @@ import httpx +from multihead.subprocess_utils import no_window_flags from ._core import _get_ks, _request, logger @@ -181,6 +182,7 @@ async def _decompose_claude_p(goal: str, context: str = "") -> str: lambda: subprocess.run( cmd, capture_output=True, text=True, cwd=work_dir, timeout=300, env=env, + creationflags=no_window_flags(), ), ) except subprocess.TimeoutExpired: diff --git a/src/multihead/narrative/source_extractors/git_extractor.py b/src/multihead/narrative/source_extractors/git_extractor.py index 6604c68..11be46e 100644 --- a/src/multihead/narrative/source_extractors/git_extractor.py +++ b/src/multihead/narrative/source_extractors/git_extractor.py @@ -33,6 +33,7 @@ ) from multihead.models import new_id from multihead.narrative.confidence import ConfidenceCalibrator, SourcePriority +from multihead.subprocess_utils import no_window_flags from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -98,6 +99,7 @@ def extract_commits( try: result = subprocess.run( log_args, capture_output=True, text=True, timeout=30, + creationflags=no_window_flags(), ) if result.returncode != 0: logger.error("git log failed: %s", result.stderr) @@ -340,6 +342,7 @@ def _get_diff_stats(self, commit_hash: str) -> list[dict[str, Any]]: result = subprocess.run( ["git", "-C", str(self.repo_path), "diff", "--numstat", f"{commit_hash}~1", commit_hash], capture_output=True, text=True, timeout=15, + creationflags=no_window_flags(), ) if result.returncode != 0: return [] @@ -370,6 +373,7 @@ def _get_diff_content(self, commit_hash: str, file_path: str, max_lines: int = 1 result = subprocess.run( ["git", "-C", str(self.repo_path), "diff", f"{commit_hash}~1", commit_hash, "--", file_path], capture_output=True, text=True, timeout=15, + creationflags=no_window_flags(), ) if result.returncode != 0: return "" diff --git a/src/multihead/night_shift/stages_late.py b/src/multihead/night_shift/stages_late.py index cca395c..cd8dca9 100644 --- a/src/multihead/night_shift/stages_late.py +++ b/src/multihead/night_shift/stages_late.py @@ -15,6 +15,7 @@ Link, ) +from multihead.subprocess_utils import no_window_flags from .models import _prov logger = logging.getLogger(__name__) @@ -671,6 +672,7 @@ def _find_repo(file_path: str) -> str: ["git", "diff", "--name-only", sha, "HEAD"], capture_output=True, text=True, timeout=10, cwd=repo_dir, + creationflags=no_window_flags(), ) if result.returncode != 0: sha_repo_cache[cache_key] = set() diff --git a/src/multihead/resilience.py b/src/multihead/resilience.py index f79e3b8..35e6069 100644 --- a/src/multihead/resilience.py +++ b/src/multihead/resilience.py @@ -6,6 +6,8 @@ import time from typing import Any +from multihead.subprocess_utils import no_window_flags + # --------------------------------------------------------------------------- # Circuit Breaker @@ -134,6 +136,7 @@ def check(self, data_dir: str = ".") -> ResourceStatus: capture_output=True, text=True, timeout=5, + creationflags=no_window_flags(), ) if result.returncode == 0: parts = result.stdout.strip().split(",") diff --git a/src/multihead/resource_monitor.py b/src/multihead/resource_monitor.py index 0afc3b6..ae3aac7 100644 --- a/src/multihead/resource_monitor.py +++ b/src/multihead/resource_monitor.py @@ -15,6 +15,8 @@ from pathlib import Path from typing import Any +from multihead.subprocess_utils import no_window_flags + logger = logging.getLogger(__name__) # Default history length (e.g., 60 samples at 10s interval = 10 min window) @@ -111,6 +113,7 @@ def sample(self) -> ResourceSnapshot: ["nvidia-smi", "--query-gpu=memory.used,memory.free,memory.total", "--format=csv,noheader,nounits"], capture_output=True, text=True, timeout=5, + creationflags=no_window_flags(), ) if result.returncode == 0: parts = result.stdout.strip().split(",") diff --git a/src/multihead/shell/context.py b/src/multihead/shell/context.py index 9760809..39344c1 100644 --- a/src/multihead/shell/context.py +++ b/src/multihead/shell/context.py @@ -6,6 +6,7 @@ from pathlib import Path from typing import Any +from multihead.subprocess_utils import no_window_flags from .prompts import SHELL_SYSTEM_PROMPT logger = logging.getLogger(__name__) @@ -115,6 +116,7 @@ def _gather_codebase_context(self) -> str: repo_root = subprocess.run( ["git", "rev-parse", "--show-toplevel"], capture_output=True, text=True, timeout=5, + creationflags=no_window_flags(), ).stdout.strip() except Exception: repo_root = "" @@ -127,6 +129,7 @@ def _gather_codebase_context(self) -> str: ["git", "branch", "--show-current"], capture_output=True, text=True, timeout=5, cwd=repo_root, + creationflags=no_window_flags(), ).stdout.strip() except Exception: branch = "unknown" @@ -138,6 +141,7 @@ def _gather_codebase_context(self) -> str: top_items = subprocess.run( ["ls", "-1", repo_root], capture_output=True, text=True, timeout=5, + creationflags=no_window_flags(), ).stdout.strip() if top_items: parts.append(f"\n**Top-level**:\n```\n{top_items}\n```") @@ -151,6 +155,7 @@ def _gather_codebase_context(self) -> str: result = subprocess.run( ["find", str(src_dir), "-type", "f", "-name", "*.py"], capture_output=True, text=True, timeout=5, + creationflags=no_window_flags(), ) files = result.stdout.strip().splitlines() # Show relative paths, limit to 40 files @@ -169,6 +174,7 @@ def _gather_codebase_context(self) -> str: ["git", "log", "--oneline", "-10"], capture_output=True, text=True, timeout=5, cwd=repo_root, + creationflags=no_window_flags(), ).stdout.strip() if log: parts.append(f"\n**Recent commits**:\n```\n{log}\n```") diff --git a/src/multihead/shell/core.py b/src/multihead/shell/core.py index af8dde1..2928bcf 100644 --- a/src/multihead/shell/core.py +++ b/src/multihead/shell/core.py @@ -16,6 +16,7 @@ from ..event_watcher import EventWatcher from ..service_manager import ServiceManager from ..shell_pipeline import AGENT_ID, SELF_IDENTITIES, ShellPipeline +from ..subprocess_utils import no_window_flags from .brain import BrainMixin from .context import ContextMixin @@ -161,6 +162,7 @@ def _compute_context_hash() -> tuple[str, dict[str, str]]: try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=5, + creationflags=no_window_flags(), ) meta[key] = result.stdout.strip() if result.returncode == 0 else "" except Exception: diff --git a/src/multihead/subprocess_utils.py b/src/multihead/subprocess_utils.py new file mode 100644 index 0000000..1287a43 --- /dev/null +++ b/src/multihead/subprocess_utils.py @@ -0,0 +1,10 @@ +"""Subprocess utilities for cross-platform window suppression.""" +import sys +import subprocess + + +def no_window_flags() -> int: + """Return CREATE_NO_WINDOW on Windows, 0 elsewhere.""" + if sys.platform == "win32": + return subprocess.CREATE_NO_WINDOW + return 0 From fa678addfad0ad3adc3f1efede8e2e68ba722452 Mon Sep 17 00:00:00 2001 From: Alan Kharebov Date: Mon, 23 Mar 2026 17:02:39 -0700 Subject: [PATCH 2/5] =?UTF-8?q?fix:=20guard=20Night=20Shift=20trigger=20?= =?UTF-8?q?=E2=80=94=20require=20active=20head=20+=20increase=20progress?= =?UTF-8?q?=20buffer=20to=205000?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Return 400 if no heads are active when triggering Night Shift - Increase progress deque from 100 to 5000 to prevent event loss on large runs --- src/multihead/api/routes_nightshift.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/multihead/api/routes_nightshift.py b/src/multihead/api/routes_nightshift.py index 5abeb0b..79b082f 100644 --- a/src/multihead/api/routes_nightshift.py +++ b/src/multihead/api/routes_nightshift.py @@ -7,6 +7,9 @@ from typing import Any from fastapi import APIRouter, BackgroundTasks, Query, Request +from fastapi.responses import JSONResponse + +from multihead.models import HeadState router = APIRouter() @@ -16,7 +19,7 @@ "running": False, "last_report": None, "current_stage": None, - "progress": deque(maxlen=100), + "progress": deque(maxlen=5000), } @@ -28,12 +31,25 @@ async def trigger_nightshift( concurrency: int = Query(1, description="Parallel LLM calls per stage (1=sequential)"), ) -> dict[str, Any]: """Trigger a Night Shift run in the background.""" + # Check for at least one active head before starting + head_manager = request.app.state.head_manager + states = head_manager.get_states() + has_active = any( + info.get("state") == HeadState.ACTIVE.value + for info in states.values() + ) + if not has_active: + return JSONResponse( + status_code=400, + content={"error": "No active heads — wake a head before running Night Shift"}, + ) + async with _nightshift_lock: if _nightshift_status["running"]: return {"status": "already_running"} _nightshift_status["running"] = True _nightshift_status["current_stage"] = None - _nightshift_status["progress"] = deque(maxlen=100) + _nightshift_status["progress"] = deque(maxlen=5000) night_shift = request.app.state.night_shift night_shift.config.concurrency = max(1, concurrency) From e27c97b45757bfa79b0d5e556a6f9d55256aeb35 Mon Sep 17 00:00:00 2001 From: Alan Kharebov Date: Mon, 23 Mar 2026 17:02:46 -0700 Subject: [PATCH 3/5] feat: emit project_start/project_done progress events during session harvest - Add on_progress callback to harvest_all() - Emit per-project events with project name, index, total, and file count --- src/multihead/session_harvester/harvester.py | 29 +++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/multihead/session_harvester/harvester.py b/src/multihead/session_harvester/harvester.py index e7a4226..3017b84 100644 --- a/src/multihead/session_harvester/harvester.py +++ b/src/multihead/session_harvester/harvester.py @@ -94,16 +94,29 @@ def get_manifest(self) -> dict[str, Any]: # Harvesting # ------------------------------------------------------------------ - def harvest_all(self) -> HarvestResult: - """Full scan + harvest cycle.""" + def harvest_all(self, on_progress=None) -> HarvestResult: + """Full scan + harvest cycle. + + Args: + on_progress: Optional callback ``(event_dict) -> None`` for progress reporting. + """ t0 = time.monotonic() result = HarvestResult() manifest = self.get_manifest() projects = self.scan_projects() result.projects_scanned = len(projects) - - for project in projects: + total = len(projects) + + for i, project in enumerate(projects): + if on_progress: + on_progress({ + "event": "project_start", + "project": project.name, + "project_index": i, + "project_total": total, + }) + files_in_project = len(project.memory_files) try: harvested, evolutions = self._harvest_project(project, manifest) if harvested > 0: @@ -116,6 +129,14 @@ def harvest_all(self) -> HarvestResult: msg = f"{project.name}: {e}" result.errors.append(msg) logger.warning("Harvest error for %s: %s", project.name, e) + if on_progress: + on_progress({ + "event": "project_done", + "project": project.name, + "project_index": i, + "project_total": total, + "file_count": files_in_project, + }) # Update manifest timestamp manifest["last_full_scan"] = datetime.now(timezone.utc).isoformat() From a2c421b136fdb9a0b1cfeee40db1d4b66060f1df Mon Sep 17 00:00:00 2001 From: Alan Kharebov Date: Mon, 23 Mar 2026 17:02:55 -0700 Subject: [PATCH 4/5] feat: emit file_start/file_done/chunk_progress events during extraction stages - Add on_chunk_progress callback to base extractor map_generate() - All 4 extractors pass chunk progress callback through - stages_early emits file_start/file_done for each extraction + passes on_progress to harvester --- src/multihead/extractors/base.py | 3 ++ src/multihead/extractors/claim_extractor.py | 3 +- src/multihead/extractors/entity_extractor.py | 3 +- src/multihead/extractors/event_extractor.py | 3 +- src/multihead/extractors/topic_assigner.py | 1 + src/multihead/night_shift/stages_early.py | 52 +++++++++++++++++++- 6 files changed, 61 insertions(+), 4 deletions(-) diff --git a/src/multihead/extractors/base.py b/src/multihead/extractors/base.py index 1fcd146..8110600 100644 --- a/src/multihead/extractors/base.py +++ b/src/multihead/extractors/base.py @@ -83,6 +83,7 @@ async def map_generate( stage_name: str = "", batch_mode: bool = False, no_wait: bool = False, + on_chunk_progress: Callable[[int, int], None] | None = None, ) -> list[dict[str, Any] | Exception]: """Call generate for each prompt, optionally in parallel or batch. @@ -209,6 +210,8 @@ def _on_progress(completed: int, total: int) -> None: raise InterruptedError( f"Shutdown at {idx}/{len(prompts)} in {stage_name or 'map_generate'}" ) + if on_chunk_progress: + on_chunk_progress(idx, len(prompts)) try: result = await BaseExtractor.call_generate(adapter, prompt) results.append(result) diff --git a/src/multihead/extractors/claim_extractor.py b/src/multihead/extractors/claim_extractor.py index 4f74574..d882e41 100644 --- a/src/multihead/extractors/claim_extractor.py +++ b/src/multihead/extractors/claim_extractor.py @@ -78,8 +78,9 @@ async def extract( adapter, prompts, concurrency=concurrency, checkpoint_dir=kwargs.get("checkpoint_dir"), stage_name=kwargs.get("stage_name", ""), - batch_mode=kwargs.get("batch_mode", False), + batch_mode=kwargs.get("batch_mode", False), no_wait=kwargs.get("no_wait", False), + on_chunk_progress=kwargs.get("on_chunk_progress"), ) import logging as _log diff --git a/src/multihead/extractors/entity_extractor.py b/src/multihead/extractors/entity_extractor.py index 5a7a242..8464c78 100644 --- a/src/multihead/extractors/entity_extractor.py +++ b/src/multihead/extractors/entity_extractor.py @@ -40,8 +40,9 @@ async def extract( adapter, prompts, concurrency=concurrency, checkpoint_dir=kwargs.get("checkpoint_dir"), stage_name=kwargs.get("stage_name", ""), - batch_mode=kwargs.get("batch_mode", False), + batch_mode=kwargs.get("batch_mode", False), no_wait=kwargs.get("no_wait", False), + on_chunk_progress=kwargs.get("on_chunk_progress"), ) for chunk, resp in zip(chunks, responses): diff --git a/src/multihead/extractors/event_extractor.py b/src/multihead/extractors/event_extractor.py index 669bf33..77e8ba1 100644 --- a/src/multihead/extractors/event_extractor.py +++ b/src/multihead/extractors/event_extractor.py @@ -38,8 +38,9 @@ async def extract( adapter, prompts, concurrency=concurrency, checkpoint_dir=kwargs.get("checkpoint_dir"), stage_name=kwargs.get("stage_name", ""), - batch_mode=kwargs.get("batch_mode", False), + batch_mode=kwargs.get("batch_mode", False), no_wait=kwargs.get("no_wait", False), + on_chunk_progress=kwargs.get("on_chunk_progress"), ) for chunk, resp in zip(chunks, responses): diff --git a/src/multihead/extractors/topic_assigner.py b/src/multihead/extractors/topic_assigner.py index 1cec59e..dc9b237 100644 --- a/src/multihead/extractors/topic_assigner.py +++ b/src/multihead/extractors/topic_assigner.py @@ -53,6 +53,7 @@ async def extract( stage_name=kwargs.get("stage_name", ""), batch_mode=kwargs.get("batch_mode", False), no_wait=kwargs.get("no_wait", False), + on_chunk_progress=kwargs.get("on_chunk_progress"), ) for idx, resp in zip(batch_indices, responses): diff --git a/src/multihead/night_shift/stages_early.py b/src/multihead/night_shift/stages_early.py index 304339c..34416dc 100644 --- a/src/multihead/night_shift/stages_early.py +++ b/src/multihead/night_shift/stages_early.py @@ -146,7 +146,7 @@ async def _stage_session_harvest(self, context: dict) -> dict: knowledge_store=self.knowledge, data_dir=data_dir, ) - result = harvester.harvest_all() + result = harvester.harvest_all(on_progress=self._emit) # 2. Conversation transcript harvest (new) conv_harvester = ConversationHarvester( @@ -270,6 +270,27 @@ async def _stage_hot_signals(self, context: dict) -> dict: "metrics": {}, } + def _make_chunk_progress_cb(self, stage_name: str, chunks: list): + """Build a callback that emits file_start/file_done/chunk_progress events.""" + total = len(chunks) + + def _cb(chunk_index: int, chunk_total: int) -> None: + chunk = chunks[chunk_index] if chunk_index < len(chunks) else None + file_name = "" + source_project = "" + if chunk: + file_name = getattr(chunk, "record_id", "") or "" + source_project = getattr(chunk, "chunk_id", "").split("/")[0] if "/" in getattr(chunk, "chunk_id", "") else "" + self._emit({ + "event": "chunk_progress", + "stage": stage_name, + "file_name": file_name, + "chunk_index": chunk_index, + "chunk_total": chunk_total, + }) + + return _cb + async def _adapter_or_fn(self): """Return raw adapter for batch mode, generate closure otherwise.""" if self.config.batch_mode: @@ -278,11 +299,18 @@ async def _adapter_or_fn(self): async def _stage_entity_extraction(self, context: dict) -> dict: chunks = context.get("chunks", []) + self._emit({"event": "file_start", "stage": "entity_extraction", + "file_name": "chunks", "file_index": 0, "file_total": 1, + "source_project": "", "chunk_total": len(chunks)}) result = await self.entity_extractor.extract( chunks, await self._adapter_or_fn(), concurrency=self.config.concurrency, checkpoint_dir=self.output_dir, stage_name="entity_extraction", batch_mode=self.config.batch_mode, no_wait=self.config.no_wait, + on_chunk_progress=self._make_chunk_progress_cb("entity_extraction", chunks), ) + self._emit({"event": "file_done", "stage": "entity_extraction", + "file_name": "chunks", "file_index": 0, "file_total": 1, + "chunk_count": len(chunks), "results_count": len(result.items)}) return {"entities": result.items, "metrics": result.metrics} async def _stage_topic_assignment(self, context: dict) -> dict: @@ -309,19 +337,30 @@ async def _stage_topic_assignment(self, context: dict) -> dict: span_end=0, )) + self._emit({"event": "file_start", "stage": "topic_assignment", + "file_name": "chunks", "file_index": 0, "file_total": 1, + "source_project": "", "chunk_total": len(chunks)}) result = await self.topic_assigner.extract( chunks, await self._adapter_or_fn(), concurrency=self.config.concurrency, checkpoint_dir=self.output_dir, stage_name="topic_assignment", batch_mode=self.config.batch_mode, no_wait=self.config.no_wait, + on_chunk_progress=self._make_chunk_progress_cb("topic_assignment", chunks), ) + self._emit({"event": "file_done", "stage": "topic_assignment", + "file_name": "chunks", "file_index": 0, "file_total": 1, + "chunk_count": len(chunks), "results_count": len(result.items)}) return {"topics": result.items, "metrics": result.metrics} async def _stage_event_extraction(self, context: dict) -> dict: chunks = context.get("chunks", []) + self._emit({"event": "file_start", "stage": "event_extraction", + "file_name": "chunks", "file_index": 0, "file_total": 1, + "source_project": "", "chunk_total": len(chunks)}) result = await self.event_extractor.extract( chunks, await self._adapter_or_fn(), concurrency=self.config.concurrency, checkpoint_dir=self.output_dir, stage_name="event_extraction", batch_mode=self.config.batch_mode, no_wait=self.config.no_wait, + on_chunk_progress=self._make_chunk_progress_cb("event_extraction", chunks), ) # Build chunk_id → agent_folder lookup @@ -369,6 +408,10 @@ async def _stage_event_extraction(self, context: dict) -> dict: except (ValueError, KeyError) as e: logger.warning(f"Skipping malformed event: {e}") + self._emit({"event": "file_done", "stage": "event_extraction", + "file_name": "chunks", "file_index": 0, "file_total": 1, + "chunk_count": len(chunks), "results_count": len(result.items)}) + return { "extracted_events": result.items, "events_created": events_created, @@ -377,11 +420,18 @@ async def _stage_event_extraction(self, context: dict) -> dict: async def _stage_claim_extraction(self, context: dict) -> dict: chunks = context.get("chunks", []) + self._emit({"event": "file_start", "stage": "claim_extraction", + "file_name": "chunks", "file_index": 0, "file_total": 1, + "source_project": "", "chunk_total": len(chunks)}) result = await self.claim_extractor.extract( chunks, await self._adapter_or_fn(), concurrency=self.config.concurrency, checkpoint_dir=self.output_dir, stage_name="claim_extraction", batch_mode=self.config.batch_mode, no_wait=self.config.no_wait, + on_chunk_progress=self._make_chunk_progress_cb("claim_extraction", chunks), ) + self._emit({"event": "file_done", "stage": "claim_extraction", + "file_name": "chunks", "file_index": 0, "file_total": 1, + "chunk_count": len(chunks), "results_count": len(result.items)}) # Build chunk_id → (session_id, session_date) lookup for session dating chunk_session: dict[str, tuple[str, str]] = {} From af3d99e2e8428cf096a3d63e72ea7c645cf76466 Mon Sep 17 00:00:00 2001 From: Alan Kharebov Date: Tue, 24 Mar 2026 13:25:20 -0700 Subject: [PATCH 5/5] =?UTF-8?q?fix:=20address=20PR=20review=20=E2=80=94=20?= =?UTF-8?q?move=20anyio=20patch=20to=20adapter=20init,=20fix=20docstring,?= =?UTF-8?q?=20add=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/multihead/adapters/claude_agent_sdk.py | 52 ++++++++++++---------- tests/0-nano/test_subprocess_utils.py | 16 +++++++ 2 files changed, 45 insertions(+), 23 deletions(-) create mode 100644 tests/0-nano/test_subprocess_utils.py diff --git a/src/multihead/adapters/claude_agent_sdk.py b/src/multihead/adapters/claude_agent_sdk.py index a57b400..daa6f5f 100644 --- a/src/multihead/adapters/claude_agent_sdk.py +++ b/src/multihead/adapters/claude_agent_sdk.py @@ -27,28 +27,6 @@ os.environ.pop("CLAUDECODE", None) os.environ.pop("ANTHROPIC_API_KEY", None) -# --------------------------------------------------------------------------- -# Monkey-patch anyio.open_process to suppress console windows on Windows. -# The Claude Agent SDK spawns subprocesses via anyio, which does not pass -# CREATE_NO_WINDOW by default. This causes popup cmd windows during Night -# Shift and other headless operations. -# --------------------------------------------------------------------------- -if sys.platform == "win32": - import subprocess as _subprocess - - try: - import anyio as _anyio - - _original_open_process = _anyio.open_process - - async def _patched_open_process(*args: Any, **kwargs: Any) -> Any: - kwargs.setdefault("creationflags", _subprocess.CREATE_NO_WINDOW) - return await _original_open_process(*args, **kwargs) - - _anyio.open_process = _patched_open_process # type: ignore[assignment] - except ImportError: - pass # anyio not installed — SDK won't work either - logger = logging.getLogger(__name__) # Default timeouts / limits @@ -85,7 +63,7 @@ class ClaudeAgentSDKAdapter(HeadAdapter): for typed messages, streaming, session management, and hooks. Configuration via manifest.extra: - model: str — Claude model (default: from env or "claude-sonnet-4-20250514") + model: str — Claude model (default: from env or "claude-sonnet-4-6") max_turns: int — Max agent turns per call (default: 25) max_budget_usd: float — Budget cap per call (default: 5.0) permission_mode: str — "bypassPermissions" | "acceptEdits" | "default" @@ -95,8 +73,36 @@ class ClaudeAgentSDKAdapter(HeadAdapter): effort: str — "low" | "medium" | "high" | "max" """ + _anyio_patched = False + + @classmethod + def _patch_anyio(cls) -> None: + """Monkey-patch anyio.open_process to suppress console windows on Windows. + + The Claude Agent SDK spawns subprocesses via anyio, which does not pass + CREATE_NO_WINDOW by default. This causes popup cmd windows during Night + Shift and other headless operations. Only patches once per process. + """ + if cls._anyio_patched or sys.platform != "win32": + return + try: + import anyio as _anyio + import subprocess as _subprocess + + _original = _anyio.open_process + + async def _patched(*args: Any, **kwargs: Any) -> Any: + kwargs.setdefault("creationflags", _subprocess.CREATE_NO_WINDOW) + return await _original(*args, **kwargs) + + _anyio.open_process = _patched # type: ignore[assignment] + cls._anyio_patched = True + except ImportError: + pass # anyio not installed — SDK won't work either + def __init__(self, manifest: HeadManifest) -> None: super().__init__(manifest) + self._patch_anyio() extra = manifest.extra or {} self._model = extra.get("model") or manifest.model or os.environ.get( diff --git a/tests/0-nano/test_subprocess_utils.py b/tests/0-nano/test_subprocess_utils.py new file mode 100644 index 0000000..e25402c --- /dev/null +++ b/tests/0-nano/test_subprocess_utils.py @@ -0,0 +1,16 @@ +"""Tests for subprocess_utils.no_window_flags().""" +import subprocess +import sys +from unittest.mock import patch + +from multihead.subprocess_utils import no_window_flags + + +def test_no_window_flags_windows(): + with patch.object(sys, "platform", "win32"): + assert no_window_flags() == subprocess.CREATE_NO_WINDOW + + +def test_no_window_flags_non_windows(): + with patch.object(sys, "platform", "linux"): + assert no_window_flags() == 0