diff --git a/folder_desc/__init__.py b/folder_desc/__init__.py
new file mode 100644
index 0000000..91aaa30
--- /dev/null
+++ b/folder_desc/__init__.py
@@ -0,0 +1,4 @@
+"""Folder description tool: recursive file/folder descriptions with LLM-generated annotations."""
+from folder_desc.tree import get_folder_description
+
+__all__ = ["get_folder_description"]
diff --git a/folder_desc/cache.py b/folder_desc/cache.py
new file mode 100644
index 0000000..b53bb26
--- /dev/null
+++ b/folder_desc/cache.py
@@ -0,0 +1,54 @@
+"""JSON-based cache for file descriptions, keyed by path hash and invalidated on mtime/size change."""
+from __future__ import annotations
+
+import hashlib
+import json
+import os
+from pathlib import Path
+
+CACHE_DIR = Path.home() / ".cheetahclaws" / "folder_desc_cache"
+
+
+def _cache_key(file_path: str) -> str:
+    return hashlib.sha256(file_path.encode()).hexdigest()[:16]
+
+
+def _cache_path(file_path: str) -> Path:
+    return CACHE_DIR / f"{_cache_key(file_path)}.json"
+
+
+def get_cached_desc(file_path: str) -> str | None:
+    cp = _cache_path(file_path)
+    if not cp.exists():
+        return None
+    try:
+        data = json.loads(cp.read_text(encoding="utf-8"))
+    except (json.JSONDecodeError, OSError):
+        return None
+    try:
+        stat = os.stat(file_path)
+    except OSError:
+        return None
+    if data.get("mtime") != stat.st_mtime or data.get("size") != stat.st_size:
+        return None
+    return data.get("desc")
+
+
+def set_cached_desc(file_path: str, desc: str) -> None:
+    CACHE_DIR.mkdir(parents=True, exist_ok=True)
+    try:
+        stat = os.stat(file_path)
+    except OSError:
+        return
+    data = {"desc": desc, "mtime": stat.st_mtime, "size": stat.st_size, "path": file_path}
+    _cache_path(file_path).write_text(json.dumps(data), encoding="utf-8")
+
+
+def clear_cache() -> int:
+    if not CACHE_DIR.exists():
+        return 0
+    count = 0
+    for f in CACHE_DIR.glob("*.json"):
+        f.unlink(missing_ok=True)
+        count += 1
+    return count
diff --git a/folder_desc/describer.py b/folder_desc/describer.py
new file mode 100644
index 0000000..d91487e
--- /dev/null
+++ b/folder_desc/describer.py
@@ -0,0 +1,106 @@
+"""LLM-based file description generator with parallel execution."""
+from __future__ import annotations
+
+import re
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from pathlib import Path
+
+from folder_desc.cache import get_cached_desc, set_cached_desc
+
+_DESC_RE = re.compile(r"#\s*\[desc\]\s*(.+?)\s*\[/desc\]")
+_MAX_PREVIEW_LINES = 100
+_MAX_WORKERS = 8
+
+
+def extract_inline_desc(file_path: str) -> str | None:
+    """Return the `# [desc] ... [/desc]` tag on the first line, or None."""
+    try:
+        with open(file_path, encoding="utf-8", errors="replace") as f:
+            first_line = next(iter(f), "")
+    except OSError:
+        return None  # unreadable file = no inline description
+    m = _DESC_RE.search(first_line)
+    return m.group(1).strip() if m else None
+
+
+def _read_preview(file_path: str) -> str:
+    try:
+        with open(file_path, encoding="utf-8", errors="replace") as f:
+            lines = []
+            for i, line in enumerate(f):
+                if i >= _MAX_PREVIEW_LINES:
+                    break
+                lines.append(line)
+            return "".join(lines)
+    except OSError:
+        return ""
+
+
+def describe_file(file_path: str, config: dict | None = None) -> str:
+    inline = extract_inline_desc(file_path)
+    if inline:
+        set_cached_desc(file_path, inline)
+        return inline
+
+    cached = get_cached_desc(file_path)
+    if cached:
+        return cached
+
+    preview = _read_preview(file_path)
+    if not preview.strip():
+        return "Empty file"
+
+    desc = _call_llm_for_desc(file_path, preview, config)
+    set_cached_desc(file_path, desc)
+    return desc
+
+
+def _call_llm_for_desc(file_path: str, preview: str, config: dict | None) -> str:
+    try:
+        from auxiliary import stream_auxiliary
+        name = Path(file_path).name
+        prompt = (
+            f"Describe what the file '{name}' does in ONE short sentence (max 15 words). "
+            f"No markdown, no quotes, just the description.\n\n```\n{preview[:3000]}\n```"
+        )
+        result = stream_auxiliary(
+            system="You generate concise one-line file descriptions.",
+            messages=[{"role": "user", "content": prompt}],
+            config=config or {},
+        )
+        return result.strip().rstrip(".")
+    except Exception:
+        return f"({Path(file_path).suffix or 'unknown'} file)"
+
+
+def describe_files_parallel(
+    file_paths: list[str], config: dict | None = None,
+) -> dict[str, str]:
+    results: dict[str, str] = {}
+    to_describe: list[str] = []
+
+    for fp in file_paths:
+        inline = extract_inline_desc(fp)
+        if inline:
+            results[fp] = inline
+            set_cached_desc(fp, inline)
+            continue
+        cached = get_cached_desc(fp)
+        if cached:
+            results[fp] = cached
+            continue
+        to_describe.append(fp)
+
+    if not to_describe:
+        return results
+
+    with ThreadPoolExecutor(max_workers=min(_MAX_WORKERS, len(to_describe))) as pool:
+        futures = {pool.submit(describe_file, fp, config): fp for fp in to_describe}
+        for future in as_completed(futures):
+            fp = futures[future]
+            try:
+                results[fp] = future.result()
+            except Exception:
+                results[fp] = "(description unavailable)"
+
+    return results
diff --git a/folder_desc/tools.py b/folder_desc/tools.py
new file mode 100644
index 0000000..f188e59
--- /dev/null
+++ b/folder_desc/tools.py
@@ -0,0 +1,41 @@
+"""Self-registering GetFolderDescription tool."""
+from __future__ import annotations
+
+from tool_registry import ToolDef, register_tool
+from folder_desc.tree import get_folder_description
+
+_SCHEMA = {
+    "name": "GetFolderDescription",
+    "description": (
+        "Return a recursive tree of code files in a folder with their [desc] one-line "
+        "descriptions. If descriptions are missing, they are generated automatically "
+        "(parallel LLM calls) before the tree is returned. Useful for understanding a "
+        "codebase at a glance."
+    ),
+    "input_schema": {
+        "type": "object",
+        "properties": {
+            "folder_path": {
+                "type": "string",
+                "description": "Absolute path to the folder to describe",
+            },
+        },
+        "required": ["folder_path"],
+    },
+}
+
+
+def _get_folder_description(params: dict, config: dict) -> str:
+    folder_path = params.get("folder_path", "")
+    if not folder_path:
+        return "Error: missing required parameter 'folder_path'"
+    return get_folder_description(folder_path, config)
+
+
+register_tool(ToolDef(
+    name="GetFolderDescription",
+    schema=_SCHEMA,
+    func=_get_folder_description,
+    read_only=True,
+    concurrent_safe=True,
+))
diff --git a/folder_desc/tree.py b/folder_desc/tree.py
new file mode 100644
index 0000000..490c22d
--- /dev/null
+++ b/folder_desc/tree.py
@@ -0,0 +1,109 @@
+"""Recursive directory tree builder with file descriptions."""
+from __future__ import annotations
+
+from pathlib import Path
+
+from folder_desc.describer import describe_files_parallel
+
+SKIP_DIRS = {
+    ".git", "__pycache__", ".venv", "venv", "node_modules", ".tox",
+    ".mypy_cache", ".pytest_cache", ".ruff_cache", "dist", "build",
+    ".egg-info", ".eggs", ".nano_claude",
+}
+
+CODE_EXTENSIONS = {
+    ".py", ".js", ".ts", ".tsx", ".jsx", ".java", ".go", ".rs", ".rb",
+    ".c", ".cpp", ".h", ".hpp", ".cs", ".php", ".swift", ".kt",
+    ".sh", ".bash", ".zsh", ".ps1", ".bat", ".cmd",
+    ".yaml", ".yml", ".toml", ".json", ".xml", ".ini", ".cfg",
+    ".md", ".rst", ".txt",
+    ".html", ".css", ".scss", ".less",
+    ".sql", ".r", ".lua", ".zig", ".nim",
+    ".dockerfile",
+}
+
+MAX_FILES = 500
+
+
+def _is_code_file(path: Path) -> bool:
+    if path.suffix.lower() in CODE_EXTENSIONS:
+        return True
+    if path.name in ("Makefile", "Dockerfile", "Jenkinsfile", "Procfile", ".gitignore"):
+        return True
+    return False
+
+
+def _collect_files(folder: Path) -> list[Path]:
+    files: list[Path] = []
+
+    def _walk(current: Path, depth: int = 0) -> None:
+        if depth > 10 or len(files) >= MAX_FILES:
+            return
+        try:
+            entries = sorted(current.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower()))
+        except OSError:
+            return
+        for entry in entries:
+            if entry.is_dir():
+                if entry.name in SKIP_DIRS or entry.name.startswith("."):
+                    continue
+                _walk(entry, depth + 1)
+            elif entry.is_file() and len(files) < MAX_FILES and _is_code_file(entry):
+                files.append(entry)
+
+    _walk(folder)
+    return files
+
+
+def _build_tree_string(folder: Path, descriptions: dict[str, str]) -> str:
+    lines: list[str] = []
+
+    def _walk(current: Path, prefix: str = "", depth: int = 0) -> None:
+        if depth > 10:
+            return
+        try:
+            entries = sorted(current.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower()))
+        except OSError:
+            return
+
+        visible = []
+        for entry in entries:
+            if entry.is_dir():
+                if entry.name in SKIP_DIRS or entry.name.startswith("."):
+                    continue
+                visible.append(entry)
+            elif entry.is_file() and _is_code_file(entry):
+                visible.append(entry)
+
+        for i, entry in enumerate(visible):
+            is_last = i == len(visible) - 1
+            connector = "`-- " if is_last else "|-- "
+            child_prefix = prefix + ("    " if is_last else "|   ")
+
+            if entry.is_dir():
+                lines.append(f"{prefix}{connector}{entry.name}/")
+                _walk(entry, child_prefix, depth + 1)
+            else:
+                desc = descriptions.get(str(entry), "")
+                desc_tag = f" [desc] {desc} [/desc]" if desc else ""
+                lines.append(f"{prefix}{connector}{entry.name}{desc_tag}")
+
+    lines.append(f"{folder.name}/")
+    _walk(folder)
+    return "\n".join(lines)
+
+
+def get_folder_description(folder_path: str, config: dict | None = None) -> str:
+    folder = Path(folder_path)
+    if not folder.is_dir():
+        return f"Error: {folder_path} is not a directory"
+
+    files = _collect_files(folder)
+    if not files:
+        return f"{folder.name}/ (empty or no code files found)"
+
+    file_paths = [str(f) for f in files]
+    descriptions = describe_files_parallel(file_paths, config)
+    tree = _build_tree_string(folder, descriptions)
+
+    return f"{len(files)} code files found.\n\n{tree}"
diff --git a/tests/test_folder_desc.py b/tests/test_folder_desc.py
new file mode 100644
index 0000000..f5ad6f0
--- /dev/null
+++ b/tests/test_folder_desc.py
@@ -0,0 +1,123 @@
+"""Tests for the folder_desc package."""
+from pathlib import Path
+
+from folder_desc.cache import _cache_key, get_cached_desc, set_cached_desc, clear_cache
+from folder_desc.describer import extract_inline_desc, describe_files_parallel
+from folder_desc.tree import (
+    _is_code_file, _collect_files, _build_tree_string, get_folder_description,
+)
+
+
+class TestCache:
+    def test_cache_key_deterministic(self):
+        assert _cache_key("/a/b.py") == _cache_key("/a/b.py")
+
+    def test_cache_key_different_paths(self):
+        assert _cache_key("/a/b.py") != _cache_key("/a/c.py")
+
+    def test_set_and_get(self, tmp_path):
+        f = tmp_path / "test.py"
+        f.write_text("print('hello')")
+        import folder_desc.cache as mod
+        old_dir = mod.CACHE_DIR
+        mod.CACHE_DIR = tmp_path / "cache"
+        try:
+            set_cached_desc(str(f), "prints hello")
+            assert get_cached_desc(str(f)) == "prints hello"
+        finally:
+            mod.CACHE_DIR = old_dir
+
+    def test_cache_invalidates_on_change(self, tmp_path):
+        f = tmp_path / "test.py"
+        f.write_text("v1")
+        import folder_desc.cache as mod
+        old_dir = mod.CACHE_DIR
+        mod.CACHE_DIR = tmp_path / "cache"
+        try:
+            set_cached_desc(str(f), "version 1")
+            f.write_text("v2 now longer")
+            assert get_cached_desc(str(f)) is None
+        finally:
+            mod.CACHE_DIR = old_dir
+
+    def test_get_nonexistent(self):
+        assert get_cached_desc("/nonexistent/file.py") is None
+
+    def test_clear_cache(self, tmp_path):
+        import folder_desc.cache as mod
+        old_dir = mod.CACHE_DIR
+        mod.CACHE_DIR = tmp_path / "cache"
+        try:
+            (tmp_path / "cache").mkdir()
+            (tmp_path / "cache" / "a.json").write_text("{}")
+            (tmp_path / "cache" / "b.json").write_text("{}")
+            assert clear_cache() == 2
+        finally:
+            mod.CACHE_DIR = old_dir
+
+
+class TestDescriber:
+    def test_extract_inline_desc(self, tmp_path):
+        f = tmp_path / "mod.py"
+        f.write_text("# [desc] Handles user authentication [/desc]\nimport os\n")
+        assert extract_inline_desc(str(f)) == "Handles user authentication"
+
+    def test_extract_inline_desc_missing(self, tmp_path):
+        f = tmp_path / "mod.py"
+        f.write_text("import os\n")
+        assert extract_inline_desc(str(f)) is None
+
+    def test_extract_inline_desc_nonexistent(self):
+        assert extract_inline_desc("/nonexistent.py") is None
+
+    def test_describe_files_parallel_inline(self, tmp_path):
+        f1 = tmp_path / "a.py"
+        f1.write_text("# [desc] Module A [/desc]\n")
+        f2 = tmp_path / "b.py"
+        f2.write_text("# [desc] Module B [/desc]\n")
+        results = describe_files_parallel([str(f1), str(f2)])
+        assert results[str(f1)] == "Module A"
+        assert results[str(f2)] == "Module B"
+
+
+class TestTree:
+    def test_is_code_file(self):
+        assert _is_code_file(Path("foo.py"))
+        assert _is_code_file(Path("Makefile"))
+        assert not _is_code_file(Path("image.png"))
+        assert not _is_code_file(Path("data.bin"))
+
+    def test_collect_files_skips_dirs(self, tmp_path):
+        (tmp_path / "__pycache__").mkdir()
+        (tmp_path / "__pycache__" / "mod.pyc").write_text("")
+        (tmp_path / "src").mkdir()
+        (tmp_path / "src" / "main.py").write_text("# [desc] Main entry [/desc]\n")
+        (tmp_path / "readme.md").write_text("# Readme")
+        files = _collect_files(tmp_path)
+        names = [f.name for f in files]
+        assert "main.py" in names
+        assert "readme.md" in names
+        assert "mod.pyc" not in names
+
+    def test_build_tree_string(self, tmp_path):
+        (tmp_path / "a.py").write_text("")
+        descs = {str(tmp_path / "a.py"): "Module A"}
+        tree = _build_tree_string(tmp_path, descs)
+        assert "a.py" in tree
+        assert "[desc] Module A [/desc]" in tree
+
+    def test_get_folder_description_not_dir(self):
+        result = get_folder_description("/nonexistent/path")
+        assert "Error" in result or "not a directory" in result
+
+    def test_get_folder_description_with_inline(self, tmp_path):
+        (tmp_path / "main.py").write_text("# [desc] Entry point [/desc]\nprint('hi')\n")
+        (tmp_path / "utils.py").write_text("# [desc] Utility helpers [/desc]\n")
+        result = get_folder_description(str(tmp_path))
+        assert "2 code files found" in result
+        assert "Entry point" in result
+        assert "Utility helpers" in result
+
+    def test_get_folder_description_empty(self, tmp_path):
+        result = get_folder_description(str(tmp_path))
+        assert "empty" in result.lower() or "no code files" in result.lower()
diff --git a/tests/test_folder_description_e2e.py b/tests/test_folder_description_e2e.py
new file mode 100644
index 0000000..3b7b679
--- /dev/null
+++ b/tests/test_folder_description_e2e.py
@@ -0,0 +1,72 @@
+"""End-to-end: LLM calls GetFolderDescription on a real tmp_path layout.
+
+Files with an inline `# [desc] ... [/desc]` tag return that tag verbatim
+without any LLM call. Files without a tag would normally trigger a
+describer LLM call -- we put the tag on every fixture file so the test
+stays provider-independent and fast. Only `providers.stream` is mocked.
+"""
+from __future__ import annotations
+
+import pytest
+
+import tools as _tools_init  # noqa: F401 - registers GetFolderDescription
+import folder_desc.cache as cache_mod
+from agent import AgentState, run
+from providers import AssistantTurn
+
+
+def _scripted_stream(turns):
+    cursor = iter(turns)
+
+    def fake_stream(**_kwargs):
+        spec = next(cursor)
+        yield AssistantTurn(
+            text=spec.get("text", ""),
+            tool_calls=spec.get("tool_calls") or [],
+            in_tokens=1, out_tokens=1,
+        )
+
+    return fake_stream
+
+
+@pytest.fixture
+def codebase(tmp_path, monkeypatch):
+    """Build a small code tree with inline [desc] tags and redirect the cache."""
+    monkeypatch.setattr(cache_mod, "CACHE_DIR", tmp_path / "_cache")
+
+    (tmp_path / "pkg").mkdir()
+    (tmp_path / "pkg" / "a.py").write_text(
+        "# [desc] public API surface [/desc]\n\ndef hello(): ...\n",
+        encoding="utf-8",
+    )
+    (tmp_path / "pkg" / "b.py").write_text(
+        "# [desc] internal helpers [/desc]\n\n_x = 1\n",
+        encoding="utf-8",
+    )
+    return tmp_path
+
+
+def test_llm_sees_folder_tree_with_descriptions(monkeypatch, codebase):
+    """Drive agent.run: the LLM calls GetFolderDescription and the tool_result
+    carries a tree containing both files' inline descriptions."""
+    turns = [
+        {"tool_calls": [{
+            "id": "fd1",
+            "name": "GetFolderDescription",
+            "input": {"folder_path": str(codebase)},
+        }]},
+        {"text": "got it"},
+    ]
+    monkeypatch.setattr("agent.stream", _scripted_stream(turns))
+
+    state = AgentState()
+    config = {"model": "test", "permission_mode": "accept-all",
+              "_session_id": "fd_e2e", "disabled_tools": ["Agent"]}
+    list(run("describe the folder", state, config, "sys"))
+
+    tool_result = next(m for m in state.messages
+                       if m.get("role") == "tool" and m.get("tool_call_id") == "fd1")
+    content = tool_result["content"]
+    assert "a.py" in content and "b.py" in content
+    assert "public API surface" in content
+    assert "internal helpers" in content
diff --git a/tools/__init__.py b/tools/__init__.py
index 8731a8c..575208d 100644
--- a/tools/__init__.py
+++ b/tools/__init__.py
@@ -495,6 +495,7 @@ def _register_builtins() -> None:
         "skill.tools",
         "cc_mcp.tools",
         "task.tools",
+        "folder_desc.tools",
     ]
     for _mod_name in _EXTENSION_MODULES: