Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions tests/test_plan_mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Unit tests for the plan-mode tools.

Exercise `tools.plan_mode._enter_plan_mode` and `_exit_plan_mode` in
isolation: the permission-mode transitions, plan-file lifecycle and the
"empty plan" guard. E2E coverage (through agent.run + a mocked LLM stream
+ the Write tool) lives in test_plan_mode_e2e.py.
"""
from __future__ import annotations

from pathlib import Path

import pytest

import runtime
import tools as _tools_init # noqa: F401 — register tools including plan_mode
from tools.plan_mode import _enter_plan_mode, _exit_plan_mode


@pytest.fixture(autouse=True)
def _isolated_ctx():
"""Ensure plan-mode state is not leaked between tests (same session_id)."""
yield
for sid in ("default", "unit_sess"):
ctx = runtime.get_session_ctx(sid)
ctx.plan_file = None
ctx.prev_permission_mode = None


def _mk_config(cwd):
return {
"_session_id": "unit_sess",
"_worktree_cwd": str(cwd),
"permission_mode": "auto",
}


class TestEnterPlanMode:
def test_creates_plan_file_with_header(self, tmp_path):
config = _mk_config(tmp_path)
result = _enter_plan_mode({"task_description": "Refactor X"}, config)

plan_path = tmp_path / ".nano_claude" / "plans" / "unit_sess.md"
assert plan_path.exists()
assert plan_path.read_text(encoding="utf-8").startswith("# Plan: Refactor X")
assert "Plan mode activated" in result

def test_flips_permission_mode_to_plan(self, tmp_path):
config = _mk_config(tmp_path)
_enter_plan_mode({}, config)
assert config["permission_mode"] == "plan"

def test_is_idempotent_if_already_in_plan_mode(self, tmp_path):
config = _mk_config(tmp_path)
_enter_plan_mode({}, config)
second = _enter_plan_mode({}, config)
assert "Already in plan mode" in second


class TestExitPlanMode:
def test_rejects_empty_plan(self, tmp_path):
config = _mk_config(tmp_path)
_enter_plan_mode({}, config) # writes only the "# Plan" header
result = _exit_plan_mode({}, config)
assert "empty" in result.lower()
# Still in plan mode, since exit was refused.
assert config["permission_mode"] == "plan"

def test_accepts_plan_with_real_content_and_restores_permission(self, tmp_path):
config = _mk_config(tmp_path)
_enter_plan_mode({}, config)
plan_path = tmp_path / ".nano_claude" / "plans" / "unit_sess.md"
plan_path.write_text("# Plan\n\n## Steps\n1. read\n2. write\n", encoding="utf-8")

result = _exit_plan_mode({}, config)
assert "Plan mode exited" in result
assert "## Steps" in result
assert config["permission_mode"] == "auto"

def test_noop_when_not_in_plan_mode(self, tmp_path):
config = _mk_config(tmp_path) # permission_mode = "auto"
result = _exit_plan_mode({}, config)
assert "Not in plan mode" in result
113 changes: 113 additions & 0 deletions tests/test_plan_mode_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""End-to-end: LLM drives the plan-mode workflow via agent.run + mocked stream.

The plan file is written using the regular `Write` tool, whose permission
check only allows writes to the current plan_file while in plan mode -- so
this test also exercises the agent._check_permission plan-mode branch.

Only `providers.stream` is mocked. Plan tools, registry dispatch, Write tool
and the per-session RuntimeContext all run for real against tmp_path.
"""
from __future__ import annotations

from pathlib import Path

import pytest

import tools as _tools_init # noqa: F401 - register built-ins + plan_mode
import runtime
from agent import AgentState, run
from providers import AssistantTurn


def _scripted_stream(turns):
cursor = iter(turns)

def fake_stream(**_kwargs):
spec = next(cursor)
yield AssistantTurn(
text=spec.get("text", ""),
tool_calls=spec.get("tool_calls") or [],
in_tokens=1, out_tokens=1,
)

return fake_stream


@pytest.fixture(autouse=True)
def _reset_plan_ctx():
yield
for sid in ("default", "plan_e2e", "plan_rogue"):
ctx = runtime.get_session_ctx(sid)
ctx.plan_file = None
ctx.prev_permission_mode = None


def test_full_plan_mode_flow_through_agent_loop(monkeypatch, tmp_path):
"""EnterPlanMode → Write(plan_file) → ExitPlanMode, all via the real agent loop."""
plan_file = str(tmp_path / ".nano_claude" / "plans" / "plan_e2e.md")
plan_body = "# Plan: Refactor X\n\n## Steps\n1. explore\n2. implement\n"
turns = [
{"tool_calls": [{
"id": "t1", "name": "EnterPlanMode",
"input": {"task_description": "Refactor X"},
}]},
{"tool_calls": [{
"id": "t2", "name": "Write",
"input": {"file_path": plan_file, "content": plan_body},
}]},
{"tool_calls": [{
"id": "t3", "name": "ExitPlanMode", "input": {},
}]},
{"text": "all done"},
]
monkeypatch.setattr("agent.stream", _scripted_stream(turns))

state = AgentState()
config = {
"model": "test",
"permission_mode": "auto", # plan mode will flip it to "plan"
"_session_id": "plan_e2e",
"_worktree_cwd": str(tmp_path),
}
list(run("plan a refactor", state, config, "sys"))

# Plan file ended up on disk with the Write-tool content.
assert Path(plan_file).read_text(encoding="utf-8") == plan_body

# ExitPlanMode restored the previous permission mode.
assert config["permission_mode"] == "auto"


def test_write_outside_plan_file_is_rejected_in_plan_mode(monkeypatch, tmp_path):
"""The permission-mode 'plan' branch must deny Writes to any file != plan_file."""
plan_file = str(tmp_path / ".nano_claude" / "plans" / "plan_rogue.md")
unrelated = str(tmp_path / "src" / "config.py")
turns = [
{"tool_calls": [{
"id": "t1", "name": "EnterPlanMode",
"input": {"task_description": "secure"},
}]},
{"tool_calls": [{
"id": "t2", "name": "Write",
"input": {"file_path": unrelated, "content": "print('pwned')"},
}]},
{"text": "stopped"},
]
monkeypatch.setattr("agent.stream", _scripted_stream(turns))

state = AgentState()
config = {
"model": "test",
"permission_mode": "auto",
"_session_id": "plan_rogue",
"_worktree_cwd": str(tmp_path),
}
list(run("try a rogue write", state, config, "sys"))

# The unrelated file was NEVER created.
assert not Path(unrelated).exists()

# The Write tool_result for t2 carries the rejection message.
t2_result = next(m for m in state.messages
if m.get("role") == "tool" and m.get("tool_call_id") == "t2")
assert "Denied" in t2_result["content"] or "plan" in t2_result["content"].lower()
107 changes: 13 additions & 94 deletions tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,13 +495,21 @@ def _register_builtins() -> None:
"skill.tools",
"cc_mcp.tools",
"task.tools",
"tools.plan_mode",
]

for _mod_name in _EXTENSION_MODULES:
try:
__import__(_mod_name)
except Exception:
pass # Extension loading is best-effort; never crash startup
except Exception as _ext_err:
# Best-effort loading — a missing optional extension must not crash startup,
# but the cause should still be visible on stderr for diagnosis.
import sys as _sys
print(
f"[tools] extension {_mod_name!r} failed to load: "
f"{type(_ext_err).__name__}: {_ext_err}",
file=_sys.stderr,
)

from multi_agent.tools import get_agent_manager as _get_agent_manager # noqa: F401

Expand All @@ -525,95 +533,6 @@ def _register_builtins() -> None:
except Exception:
pass

# ── Plan mode tools (EnterPlanMode / ExitPlanMode) ────────────────────────

from pathlib import Path as _Path


def _enter_plan_mode(params: dict, config: dict) -> str:
if config.get("permission_mode") == "plan":
return "Already in plan mode. Write your plan to the plan file, then call ExitPlanMode."

session_id = config.get("_session_id", "default")
plans_dir = _Path(config.get("_worktree_cwd") or _Path.cwd()) / ".nano_claude" / "plans"
plans_dir.mkdir(parents=True, exist_ok=True)
plan_path = plans_dir / f"{session_id}.md"

task_desc = params.get("task_description", "")
if not plan_path.exists() or plan_path.stat().st_size == 0:
header = f"# Plan: {task_desc}\n\n" if task_desc else "# Plan\n\n"
plan_path.write_text(header, encoding="utf-8")

import runtime
sctx = runtime.get_ctx(config)
sctx.prev_permission_mode = config.get("permission_mode", "auto")
config["permission_mode"] = "plan"
sctx.plan_file = str(plan_path)
return (
f"Plan mode activated. Plan file: {plan_path}\n"
"Write your step-by-step plan to the plan file, then call ExitPlanMode when ready to implement."
)


def _exit_plan_mode(params: dict, config: dict) -> str:
if config.get("permission_mode") != "plan":
return "Not in plan mode."
import runtime
sctx = runtime.get_ctx(config)
plan_file = sctx.plan_file or ""
plan_content = ""
if plan_file:
try:
plan_content = _Path(plan_file).read_text(encoding="utf-8").strip()
except Exception:
plan_content = ""

# Reject if plan file is effectively empty (only whitespace / top-level title)
# A top-level title is exactly "# ..." (single #). ## sections count as content.
non_trivial_lines = [
l for l in plan_content.splitlines()
if l.strip() and not (l.strip().startswith("# ") and not l.strip().startswith("## "))
]
if not non_trivial_lines:
return (
"Plan is empty — please write your step-by-step plan to the plan file "
f"({plan_file}) before exiting plan mode."
)

config["permission_mode"] = sctx.prev_permission_mode or "auto"
sctx.prev_permission_mode = None
sctx.plan_file = None
return (
f"Plan mode exited. Resuming normal permissions.\n\n"
f"Plan content:\n{plan_content}\n\n"
"Wait for the user to approve the plan before executing any steps."
)


_plan_schema_enter = {
"name": "EnterPlanMode",
"description": (
"Switch to plan mode: read-only except for writing the plan file. "
"Use this to analyze a task and write a step-by-step plan before executing."
),
"input_schema": {
"type": "object",
"properties": {
"task_description": {
"type": "string",
"description": "Brief description of what you plan to do",
},
},
"required": [],
},
}
_plan_schema_exit = {
"name": "ExitPlanMode",
"description": "Exit plan mode and return to normal permissions to begin executing the plan.",
"input_schema": {"type": "object", "properties": {}, "required": []},
}

register_tool(ToolDef("EnterPlanMode", _plan_schema_enter, _enter_plan_mode,
read_only=True, concurrent_safe=False))
register_tool(ToolDef("ExitPlanMode", _plan_schema_exit, _exit_plan_mode,
read_only=False, concurrent_safe=False))
# Plan mode tools (EnterPlanMode / ExitPlanMode) are registered by
# tools/plan_mode.py via the extension loader above; the old inline block
# that used to live here is removed so there is a single source of truth.
Loading
Loading