From adc6368d239710cce5681ccec58f0ce8607b05a4 Mon Sep 17 00:00:00 2001 From: "Claude (agent)" Date: Wed, 25 Mar 2026 10:14:51 -0400 Subject: [PATCH] refactor: reduce CLI complexity + add vulture whitelist MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract helpers from 4 high-CC CLI handlers in __main__.py: - cmd_analyze: _parse_exclude, _persist_file_quality, _score_commits, _print_analyze_summary - cmd_triage: _classify_repo, _print_triage - cmd_fix: _run_ruff_fix, _print_fix_delta - Fix E701 one-liners in _print_fleet_report Add vulture_whitelist.py for known false positives (HTTP handler overrides, public API methods, dataclass fields). Dead code analyzer now auto-detects vulture_whitelist.py in repo root. Score: 62.1 (D) → 98.0 (A) | Complexity: 6→3 findings | Vulture: 91→0 Co-Authored-By: Claude Opus 4.6 (1M context) --- src/arbiter/__main__.py | 275 +++++++++++--------- src/arbiter/analyzers/dead_code_analyzer.py | 9 +- vulture_whitelist.py | 20 ++ 3 files changed, 174 insertions(+), 130 deletions(-) create mode 100644 vulture_whitelist.py diff --git a/src/arbiter/__main__.py b/src/arbiter/__main__.py index 5011ed3..6157a64 100644 --- a/src/arbiter/__main__.py +++ b/src/arbiter/__main__.py @@ -71,56 +71,42 @@ def _find_git_root(path: Path) -> Path | None: return None -def cmd_analyze(args: argparse.Namespace) -> None: - """Full analysis: run analyzers, score, persist to store.""" - repo_path = Path(args.repo).resolve() - git_root = _find_git_root(repo_path) - if not git_root: - print(f"Warning: {repo_path} is not inside a git repository. Skipping commit analysis.", file=sys.stderr) - - db_path = Path(args.db) if args.db else Path("arbiter_data.db") - store = Store(db_path) - registry = AgentRegistry() - - exclude_paths = [p.strip() for p in args.exclude.split(",") if p.strip()] if args.exclude else None - if exclude_paths: - print(f"Analyzing {repo_path} (excluding: {', '.join(exclude_paths)})...", file=sys.stderr) - else: - print(f"Analyzing {repo_path}...", file=sys.stderr) - - # Run analyzers - analyzers = _get_analyzers() - findings = _run_analysis(repo_path, analyzers, exclude_paths=exclude_paths) - loc = count_loc(repo_path) - - repo_name = repo_path.name +def _parse_exclude(args: argparse.Namespace) -> list[str] | None: + """Parse --exclude into a list of paths.""" + if not args.exclude: + return None + return [p.strip() for p in args.exclude.split(",") if p.strip()] - # Score - score = score_findings(findings, loc) - store.record_snapshot(score, loc, repo_name=repo_name) - # Update file-level quality +def _persist_file_quality(findings: list[Finding], store: Store, repo_name: str) -> None: + """Record per-file quality metrics from findings.""" + sev_rank = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1} file_findings: dict[str, list[Finding]] = {} for f in findings: file_findings.setdefault(f.file_path, []).append(f) for fp, ff in file_findings.items(): - worst = max((f.severity for f in ff), key=lambda s: {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}.get(s, 0)) + worst = max((f.severity for f in ff), key=lambda s: sev_rank.get(s, 0)) store.update_file_quality(fp, len(ff), worst, "analysis", repo_name=repo_name) - # Walk recent commits and score per-agent (requires git root) - commits = [] - if git_root: - commits = walk_commits(git_root, max_count=args.commits, registry=registry) - for commit in commits: - # Per-commit scoring: score only the files this commit touched - commit_score = score_commit(git_root, commit, analyzers, exclude_paths=exclude_paths) - store.record_commit( - commit.hash, commit.timestamp, commit.agent, - commit.files_changed, commit.loc_added, commit.loc_removed, commit_score, - repo_name=repo_name, - ) - # Print summary +def _score_commits(git_root: Path, store: Store, analyzers: list[Analyzer], + max_count: int, exclude_paths: list[str] | None, + repo_name: str) -> list: + """Walk recent commits, score each, and persist to store.""" + registry = AgentRegistry() + commits = walk_commits(git_root, max_count=max_count, registry=registry) + for commit in commits: + commit_score = score_commit(git_root, commit, analyzers, exclude_paths=exclude_paths) + store.record_commit( + commit.hash, commit.timestamp, commit.agent, + commit.files_changed, commit.loc_added, commit.loc_removed, commit_score, + repo_name=repo_name, + ) + return commits + + +def _print_analyze_summary(score, loc: int, commits: list, db_path: Path, store: Store) -> None: + """Print full analysis summary with leaderboard.""" print(f"\n{'='*60}") print(f" Arbiter Score: {score.overall} ({score.grade})") print(f"{'='*60}") @@ -132,7 +118,6 @@ def cmd_analyze(args: argparse.Namespace) -> None: print(f" Commits: {len(commits)} analyzed") print(f" Tools: {', '.join(score.findings_by_tool.keys()) or 'none'}") - # Agent leaderboard board = store.get_agent_leaderboard() if board: print("\n Agent Leaderboard:") @@ -142,6 +127,36 @@ def cmd_analyze(args: argparse.Namespace) -> None: print(f"\n Data stored in {db_path}") +def cmd_analyze(args: argparse.Namespace) -> None: + """Full analysis: run analyzers, score, persist to store.""" + repo_path = Path(args.repo).resolve() + git_root = _find_git_root(repo_path) + if not git_root: + print(f"Warning: {repo_path} is not inside a git repository. Skipping commit analysis.", file=sys.stderr) + + db_path = Path(args.db) if args.db else Path("arbiter_data.db") + store = Store(db_path) + exclude_paths = _parse_exclude(args) + + label = f" (excluding: {', '.join(exclude_paths)})" if exclude_paths else "" + print(f"Analyzing {repo_path}{label}...", file=sys.stderr) + + analyzers = _get_analyzers() + findings = _run_analysis(repo_path, analyzers, exclude_paths=exclude_paths) + loc = count_loc(repo_path) + repo_name = repo_path.name + + score = score_findings(findings, loc) + store.record_snapshot(score, loc, repo_name=repo_name) + _persist_file_quality(findings, store, repo_name) + + commits = [] + if git_root: + commits = _score_commits(git_root, store, analyzers, args.commits, exclude_paths, repo_name) + + _print_analyze_summary(score, loc, commits, db_path, store) + + def cmd_score(args: argparse.Namespace) -> None: """Quick score without persistence.""" repo_path = Path(args.repo).resolve() @@ -274,10 +289,14 @@ def cmd_fleet_report(args: argparse.Namespace) -> None: def _print_fleet_report(report: list[dict]) -> None: """Format and print the fleet report.""" def _grade(score: float) -> str: - if score >= 90: return "A" - if score >= 80: return "B" - if score >= 70: return "C" - if score >= 60: return "D" + if score >= 90: + return "A" + if score >= 80: + return "B" + if score >= 70: + return "C" + if score >= 60: + return "D" return "F" print(f"\n{'Repo':30s} {'Score':>6s} {'Grade':>6s} {'Findings':>9s} {'LOC':>8s}") @@ -296,68 +315,100 @@ def _grade(score: float) -> str: print(" | ".join(f"{g}:{c}" for g, c in sorted(grades.items()) if c > 0)) -def cmd_triage(args: argparse.Namespace) -> None: - """Auto-classify repos and output actionable recommendations.""" - db_path = Path(args.db) if args.db else Path("arbiter_fleet.db") - store = Store(db_path) - report = store.get_fleet_report() - if not report: - print("No fleet data. Run 'arbiter audit-fleet ' first.") - return - - green, yellow, red, archive = [], [], [], [] +def _classify_repo(r: dict) -> tuple[str, dict]: + """Classify a single repo into green/yellow/red/archive.""" + score = r.get("overall_score", 0) or 0 + loc = r.get("total_loc", 0) or 0 + findings = r.get("total_findings", 0) or 0 + entry = {"name": r.get("repo_name", "?"), "score": score, "loc": loc, "findings": findings} + if loc == 0 and findings == 0: + return "archive", entry + if score >= 80: + return "green", entry + if score >= 60 and findings <= 100: + return "yellow", entry + return "red", entry - for r in report: - score = r.get("overall_score", 0) or 0 - loc = r.get("total_loc", 0) or 0 - findings = r.get("total_findings", 0) or 0 - name = r.get("repo_name", "?") - - entry = {"name": name, "score": score, "loc": loc, "findings": findings} - - if loc == 0 and findings == 0: - archive.append(entry) - elif score >= 80: - green.append(entry) - elif score >= 60: - if findings > 100: - red.append(entry) - else: - yellow.append(entry) - else: - red.append(entry) +def _print_triage(buckets: dict[str, list[dict]]) -> None: + """Print triage report from classified buckets.""" print("=" * 70) print(" ARBITER FLEET TRIAGE") print("=" * 70) - print(f"\n GREEN ({len(green)} repos) — no action needed") - for r in sorted(green, key=lambda x: -x["score"]): + print(f"\n GREEN ({len(buckets['green'])} repos) — no action needed") + for r in sorted(buckets["green"], key=lambda x: -x["score"]): print(f" {r['name']:30s} {r['score']:5.1f} {r['loc']:>8,} LOC") - print(f"\n YELLOW ({len(yellow)} repos) — minor cleanup") - for r in sorted(yellow, key=lambda x: x["score"]): + print(f"\n YELLOW ({len(buckets['yellow'])} repos) — minor cleanup") + for r in sorted(buckets["yellow"], key=lambda x: x["score"]): print(f" {r['name']:30s} {r['score']:5.1f} {r['findings']:>5d} findings {r['loc']:>8,} LOC") print(f" ACTION: ruff check --fix {r['name']}/") - print(f"\n RED ({len(red)} repos) — needs remediation or archival decision") - for r in sorted(red, key=lambda x: x["score"]): + print(f"\n RED ({len(buckets['red'])} repos) — needs remediation or archival decision") + for r in sorted(buckets["red"], key=lambda x: x["score"]): print(f" {r['name']:30s} {r['score']:5.1f} {r['findings']:>5d} findings {r['loc']:>8,} LOC") - if r["findings"] > 500: - print(f" ACTION: ruff check --fix + manual review (high finding count)") - else: - print(f" ACTION: ruff check --fix, then re-score") + threshold = "ruff check --fix + manual review (high finding count)" if r["findings"] > 500 else "ruff check --fix, then re-score" + print(f" ACTION: {threshold}") - print(f"\n ARCHIVE CANDIDATES ({len(archive)} repos) — 0 LOC, no Python code") - for r in sorted(archive, key=lambda x: x["name"]): + print(f"\n ARCHIVE CANDIDATES ({len(buckets['archive'])} repos) — 0 LOC, no Python code") + for r in sorted(buckets["archive"], key=lambda x: x["name"]): print(f" {r['name']}") - print(f" ACTION: review for archival → gh repo archive hummbl-dev/") + print(" ACTION: review for archival → gh repo archive hummbl-dev/") + total = {k: len(v) for k, v in buckets.items()} print(f"\n{'='*70}") - print(f" Summary: {len(green)} green | {len(yellow)} yellow | {len(red)} red | {len(archive)} archive candidates") + print(f" Summary: {total['green']} green | {total['yellow']} yellow | {total['red']} red | {total['archive']} archive candidates") print(f"{'='*70}") +def cmd_triage(args: argparse.Namespace) -> None: + """Auto-classify repos and output actionable recommendations.""" + db_path = Path(args.db) if args.db else Path("arbiter_fleet.db") + store = Store(db_path) + report = store.get_fleet_report() + if not report: + print("No fleet data. Run 'arbiter audit-fleet ' first.") + return + + buckets: dict[str, list[dict]] = {"green": [], "yellow": [], "red": [], "archive": []} + for r in report: + category, entry = _classify_repo(r) + buckets[category].append(entry) + + _print_triage(buckets) + + +def _run_ruff_fix(repo_path: Path, dry_run: bool) -> str: + """Run ruff --fix (or --diff for dry run) and return output.""" + import subprocess + flags = ["--fix", "--diff"] if dry_run else ["--fix", "--unsafe-fixes"] + result = subprocess.run( + ["ruff", "check", *flags, str(repo_path)], + capture_output=True, text=True, timeout=120, + ) + return result.stdout.strip() + + +def _print_fix_delta(score_before, score_after, findings_after: list[Finding], + repo_path: Path, no_commit: bool) -> None: + """Print before/after comparison for ruff fix.""" + delta = score_after.overall - score_before.overall + print(f"AFTER: {score_after.overall} ({score_after.grade}) | {score_after.total_findings} findings") + print(f"DELTA: {'+' if delta >= 0 else ''}{delta:.1f} points | " + f"{score_before.total_findings - score_after.total_findings} findings fixed") + + if score_after.total_findings > 0: + print(f"\nRemaining findings ({score_after.total_findings}):") + for f in findings_after[:10]: + print(f" {f.file_path}:{f.line} [{f.rule_id}] {f.message[:60]}") + if len(findings_after) > 10: + print(f" ... and {len(findings_after) - 10} more") + + if not no_commit and delta > 0: + print(f"\nTo commit: cd {repo_path} && git add -A && git commit -m 'fix: auto-remediate ruff findings (Arbiter)'") + + def cmd_fix(args: argparse.Namespace) -> None: """Run ruff --fix on a repo and show before/after score.""" repo_path = Path(args.repo).resolve() @@ -365,60 +416,32 @@ def cmd_fix(args: argparse.Namespace) -> None: print(f"Error: {repo_path} is not a git repository", file=sys.stderr) sys.exit(1) - analyzers = _get_analyzers() - ruff_only = [a for a in analyzers if a.name == "ruff"] + ruff_only = [a for a in _get_analyzers() if a.name == "ruff"] if not ruff_only: print("Error: ruff not available", file=sys.stderr) sys.exit(1) - # Before score findings_before = _run_analysis(repo_path, ruff_only) loc = count_loc(repo_path) score_before = score_findings(findings_before, loc) - print(f"\nBEFORE: {score_before.overall} ({score_before.grade}) | {score_before.total_findings} findings") if args.dry_run: - # Show what would be fixed - import subprocess - result = subprocess.run( - ["ruff", "check", "--fix", "--diff", str(repo_path)], - capture_output=True, text=True, timeout=120, - ) - if result.stdout: - lines = result.stdout.strip().split("\n") - print(f"\nWould fix {len([l for l in lines if l.startswith('---')])} files") - print("(use without --dry-run to apply)") + output = _run_ruff_fix(repo_path, dry_run=True) + if output: + file_count = sum(1 for line in output.split("\n") if line.startswith("---")) + print(f"\nWould fix {file_count} files\n(use without --dry-run to apply)") else: print("\nNothing to fix automatically.") return - # Apply fixes - import subprocess - result = subprocess.run( - ["ruff", "check", "--fix", "--unsafe-fixes", str(repo_path)], - capture_output=True, text=True, timeout=120, - ) - print(f"\nruff --fix output: {result.stdout.strip()}" if result.stdout.strip() else "") + output = _run_ruff_fix(repo_path, dry_run=False) + if output: + print(f"\nruff --fix output: {output}") - # After score findings_after = _run_analysis(repo_path, ruff_only) score_after = score_findings(findings_after, loc) - - delta = score_after.overall - score_before.overall - print(f"AFTER: {score_after.overall} ({score_after.grade}) | {score_after.total_findings} findings") - print(f"DELTA: {'+' if delta >= 0 else ''}{delta:.1f} points | " - f"{score_before.total_findings - score_after.total_findings} findings fixed") - - if score_after.total_findings > 0: - print(f"\nRemaining findings ({score_after.total_findings}):") - for f in findings_after[:10]: - print(f" {f.file_path}:{f.line} [{f.rule_id}] {f.message[:60]}") - if len(findings_after) > 10: - print(f" ... and {len(findings_after) - 10} more") - - if not args.no_commit and delta > 0: - print(f"\nTo commit: cd {repo_path} && git add -A && git commit -m 'fix: auto-remediate ruff findings (Arbiter)'") + _print_fix_delta(score_before, score_after, findings_after, repo_path, args.no_commit) def cmd_diff(args: argparse.Namespace) -> None: diff --git a/src/arbiter/analyzers/dead_code_analyzer.py b/src/arbiter/analyzers/dead_code_analyzer.py index afec04d..a142cc8 100644 --- a/src/arbiter/analyzers/dead_code_analyzer.py +++ b/src/arbiter/analyzers/dead_code_analyzer.py @@ -24,10 +24,11 @@ def is_available(self) -> bool: return False def analyze_repo(self, repo_path: Path, exclude_paths: list[str] | None = None) -> list[Finding]: - result = subprocess.run( - ["vulture", str(repo_path), "--min-confidence", "80"], - capture_output=True, text=True, timeout=120, - ) + cmd = ["vulture", str(repo_path), "--min-confidence", "80"] + whitelist = repo_path / "vulture_whitelist.py" + if whitelist.exists(): + cmd.append(str(whitelist)) + result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) if not result.stdout.strip(): return [] diff --git a/vulture_whitelist.py b/vulture_whitelist.py new file mode 100644 index 0000000..0df010c --- /dev/null +++ b/vulture_whitelist.py @@ -0,0 +1,20 @@ +"""Vulture whitelist — items flagged as unused but used by external callers or frameworks.""" + +# HTTP handler overrides (called by stdlib HTTPServer) +do_GET # noqa +log_message # noqa + +# Public API methods (called by CLI and external consumers) +add_agent # noqa +get_profile # noqa +all_agents # noqa +from_json # noqa +post_quality_milestone # noqa +post_quality_alert # noqa + +# Dataclass/config fields (used by callers) +trust_tier # noqa +quality_threshold # noqa +row_factory # noqa +DEFAULT_EXCLUDE_PATHS # noqa +func_name # noqa