diff --git a/src/arbiter/__main__.py b/src/arbiter/__main__.py
index 5011ed3..6157a64 100644
--- a/src/arbiter/__main__.py
+++ b/src/arbiter/__main__.py
@@ -71,56 +71,42 @@ def _find_git_root(path: Path) -> Path | None:
return None
-def cmd_analyze(args: argparse.Namespace) -> None:
- """Full analysis: run analyzers, score, persist to store."""
- repo_path = Path(args.repo).resolve()
- git_root = _find_git_root(repo_path)
- if not git_root:
- print(f"Warning: {repo_path} is not inside a git repository. Skipping commit analysis.", file=sys.stderr)
-
- db_path = Path(args.db) if args.db else Path("arbiter_data.db")
- store = Store(db_path)
- registry = AgentRegistry()
-
- exclude_paths = [p.strip() for p in args.exclude.split(",") if p.strip()] if args.exclude else None
- if exclude_paths:
- print(f"Analyzing {repo_path} (excluding: {', '.join(exclude_paths)})...", file=sys.stderr)
- else:
- print(f"Analyzing {repo_path}...", file=sys.stderr)
-
- # Run analyzers
- analyzers = _get_analyzers()
- findings = _run_analysis(repo_path, analyzers, exclude_paths=exclude_paths)
- loc = count_loc(repo_path)
-
- repo_name = repo_path.name
+def _parse_exclude(args: argparse.Namespace) -> list[str] | None:
+ """Parse --exclude into a list of paths."""
+ if not args.exclude:
+ return None
+ return [p.strip() for p in args.exclude.split(",") if p.strip()]
- # Score
- score = score_findings(findings, loc)
- store.record_snapshot(score, loc, repo_name=repo_name)
- # Update file-level quality
+def _persist_file_quality(findings: list[Finding], store: Store, repo_name: str) -> None:
+ """Record per-file quality metrics from findings."""
+ sev_rank = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}
file_findings: dict[str, list[Finding]] = {}
for f in findings:
file_findings.setdefault(f.file_path, []).append(f)
for fp, ff in file_findings.items():
- worst = max((f.severity for f in ff), key=lambda s: {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}.get(s, 0))
+ worst = max((f.severity for f in ff), key=lambda s: sev_rank.get(s, 0))
store.update_file_quality(fp, len(ff), worst, "analysis", repo_name=repo_name)
- # Walk recent commits and score per-agent (requires git root)
- commits = []
- if git_root:
- commits = walk_commits(git_root, max_count=args.commits, registry=registry)
- for commit in commits:
- # Per-commit scoring: score only the files this commit touched
- commit_score = score_commit(git_root, commit, analyzers, exclude_paths=exclude_paths)
- store.record_commit(
- commit.hash, commit.timestamp, commit.agent,
- commit.files_changed, commit.loc_added, commit.loc_removed, commit_score,
- repo_name=repo_name,
- )
- # Print summary
+def _score_commits(git_root: Path, store: Store, analyzers: list[Analyzer],
+ max_count: int, exclude_paths: list[str] | None,
+ repo_name: str) -> list:
+ """Walk recent commits, score each, and persist to store."""
+ registry = AgentRegistry()
+ commits = walk_commits(git_root, max_count=max_count, registry=registry)
+ for commit in commits:
+ commit_score = score_commit(git_root, commit, analyzers, exclude_paths=exclude_paths)
+ store.record_commit(
+ commit.hash, commit.timestamp, commit.agent,
+ commit.files_changed, commit.loc_added, commit.loc_removed, commit_score,
+ repo_name=repo_name,
+ )
+ return commits
+
+
+def _print_analyze_summary(score, loc: int, commits: list, db_path: Path, store: Store) -> None:
+ """Print full analysis summary with leaderboard."""
print(f"\n{'='*60}")
print(f" Arbiter Score: {score.overall} ({score.grade})")
print(f"{'='*60}")
@@ -132,7 +118,6 @@ def cmd_analyze(args: argparse.Namespace) -> None:
print(f" Commits: {len(commits)} analyzed")
print(f" Tools: {', '.join(score.findings_by_tool.keys()) or 'none'}")
- # Agent leaderboard
board = store.get_agent_leaderboard()
if board:
print("\n Agent Leaderboard:")
@@ -142,6 +127,36 @@ def cmd_analyze(args: argparse.Namespace) -> None:
print(f"\n Data stored in {db_path}")
+def cmd_analyze(args: argparse.Namespace) -> None:
+ """Full analysis: run analyzers, score, persist to store."""
+ repo_path = Path(args.repo).resolve()
+ git_root = _find_git_root(repo_path)
+ if not git_root:
+ print(f"Warning: {repo_path} is not inside a git repository. Skipping commit analysis.", file=sys.stderr)
+
+ db_path = Path(args.db) if args.db else Path("arbiter_data.db")
+ store = Store(db_path)
+ exclude_paths = _parse_exclude(args)
+
+ label = f" (excluding: {', '.join(exclude_paths)})" if exclude_paths else ""
+ print(f"Analyzing {repo_path}{label}...", file=sys.stderr)
+
+ analyzers = _get_analyzers()
+ findings = _run_analysis(repo_path, analyzers, exclude_paths=exclude_paths)
+ loc = count_loc(repo_path)
+ repo_name = repo_path.name
+
+ score = score_findings(findings, loc)
+ store.record_snapshot(score, loc, repo_name=repo_name)
+ _persist_file_quality(findings, store, repo_name)
+
+ commits = []
+ if git_root:
+ commits = _score_commits(git_root, store, analyzers, args.commits, exclude_paths, repo_name)
+
+ _print_analyze_summary(score, loc, commits, db_path, store)
+
+
def cmd_score(args: argparse.Namespace) -> None:
"""Quick score without persistence."""
repo_path = Path(args.repo).resolve()
@@ -274,10 +289,14 @@ def cmd_fleet_report(args: argparse.Namespace) -> None:
def _print_fleet_report(report: list[dict]) -> None:
"""Format and print the fleet report."""
def _grade(score: float) -> str:
- if score >= 90: return "A"
- if score >= 80: return "B"
- if score >= 70: return "C"
- if score >= 60: return "D"
+ if score >= 90:
+ return "A"
+ if score >= 80:
+ return "B"
+ if score >= 70:
+ return "C"
+ if score >= 60:
+ return "D"
return "F"
print(f"\n{'Repo':30s} {'Score':>6s} {'Grade':>6s} {'Findings':>9s} {'LOC':>8s}")
@@ -296,68 +315,100 @@ def _grade(score: float) -> str:
print(" | ".join(f"{g}:{c}" for g, c in sorted(grades.items()) if c > 0))
-def cmd_triage(args: argparse.Namespace) -> None:
- """Auto-classify repos and output actionable recommendations."""
- db_path = Path(args.db) if args.db else Path("arbiter_fleet.db")
- store = Store(db_path)
- report = store.get_fleet_report()
- if not report:
- print("No fleet data. Run 'arbiter audit-fleet' first.")
- return
-
- green, yellow, red, archive = [], [], [], []
+def _classify_repo(r: dict) -> tuple[str, dict]:
+ """Classify a single repo into green/yellow/red/archive."""
+ score = r.get("overall_score", 0) or 0
+ loc = r.get("total_loc", 0) or 0
+ findings = r.get("total_findings", 0) or 0
+ entry = {"name": r.get("repo_name", "?"), "score": score, "loc": loc, "findings": findings}
+ if loc == 0 and findings == 0:
+ return "archive", entry
+ if score >= 80:
+ return "green", entry
+ if score >= 60 and findings <= 100:
+ return "yellow", entry
+ return "red", entry
- for r in report:
- score = r.get("overall_score", 0) or 0
- loc = r.get("total_loc", 0) or 0
- findings = r.get("total_findings", 0) or 0
- name = r.get("repo_name", "?")
-
- entry = {"name": name, "score": score, "loc": loc, "findings": findings}
-
- if loc == 0 and findings == 0:
- archive.append(entry)
- elif score >= 80:
- green.append(entry)
- elif score >= 60:
- if findings > 100:
- red.append(entry)
- else:
- yellow.append(entry)
- else:
- red.append(entry)
+def _print_triage(buckets: dict[str, list[dict]]) -> None:
+ """Print triage report from classified buckets."""
print("=" * 70)
print(" ARBITER FLEET TRIAGE")
print("=" * 70)
- print(f"\n GREEN ({len(green)} repos) — no action needed")
- for r in sorted(green, key=lambda x: -x["score"]):
+ print(f"\n GREEN ({len(buckets['green'])} repos) — no action needed")
+ for r in sorted(buckets["green"], key=lambda x: -x["score"]):
print(f" {r['name']:30s} {r['score']:5.1f} {r['loc']:>8,} LOC")
- print(f"\n YELLOW ({len(yellow)} repos) — minor cleanup")
- for r in sorted(yellow, key=lambda x: x["score"]):
+ print(f"\n YELLOW ({len(buckets['yellow'])} repos) — minor cleanup")
+ for r in sorted(buckets["yellow"], key=lambda x: x["score"]):
print(f" {r['name']:30s} {r['score']:5.1f} {r['findings']:>5d} findings {r['loc']:>8,} LOC")
print(f" ACTION: ruff check --fix {r['name']}/")
- print(f"\n RED ({len(red)} repos) — needs remediation or archival decision")
- for r in sorted(red, key=lambda x: x["score"]):
+ print(f"\n RED ({len(buckets['red'])} repos) — needs remediation or archival decision")
+ for r in sorted(buckets["red"], key=lambda x: x["score"]):
print(f" {r['name']:30s} {r['score']:5.1f} {r['findings']:>5d} findings {r['loc']:>8,} LOC")
- if r["findings"] > 500:
- print(f" ACTION: ruff check --fix + manual review (high finding count)")
- else:
- print(f" ACTION: ruff check --fix, then re-score")
+ threshold = "ruff check --fix + manual review (high finding count)" if r["findings"] > 500 else "ruff check --fix, then re-score"
+ print(f" ACTION: {threshold}")
- print(f"\n ARCHIVE CANDIDATES ({len(archive)} repos) — 0 LOC, no Python code")
- for r in sorted(archive, key=lambda x: x["name"]):
+ print(f"\n ARCHIVE CANDIDATES ({len(buckets['archive'])} repos) — 0 LOC, no Python code")
+ for r in sorted(buckets["archive"], key=lambda x: x["name"]):
print(f" {r['name']}")
- print(f" ACTION: review for archival → gh repo archive hummbl-dev/")
+ print(" ACTION: review for archival → gh repo archive hummbl-dev/")
+ total = {k: len(v) for k, v in buckets.items()}
print(f"\n{'='*70}")
- print(f" Summary: {len(green)} green | {len(yellow)} yellow | {len(red)} red | {len(archive)} archive candidates")
+ print(f" Summary: {total['green']} green | {total['yellow']} yellow | {total['red']} red | {total['archive']} archive candidates")
print(f"{'='*70}")
+def cmd_triage(args: argparse.Namespace) -> None:
+ """Auto-classify repos and output actionable recommendations."""
+ db_path = Path(args.db) if args.db else Path("arbiter_fleet.db")
+ store = Store(db_path)
+ report = store.get_fleet_report()
+ if not report:
+ print("No fleet data. Run 'arbiter audit-fleet' first.")
+ return
+
+ buckets: dict[str, list[dict]] = {"green": [], "yellow": [], "red": [], "archive": []}
+ for r in report:
+ category, entry = _classify_repo(r)
+ buckets[category].append(entry)
+
+ _print_triage(buckets)
+
+
+def _run_ruff_fix(repo_path: Path, dry_run: bool) -> str:
+ """Run ruff --fix (or --diff for dry run) and return output."""
+ import subprocess
+ flags = ["--fix", "--diff"] if dry_run else ["--fix", "--unsafe-fixes"]
+ result = subprocess.run(
+ ["ruff", "check", *flags, str(repo_path)],
+ capture_output=True, text=True, timeout=120,
+ )
+ return result.stdout.strip()
+
+
+def _print_fix_delta(score_before, score_after, findings_after: list[Finding],
+ repo_path: Path, no_commit: bool) -> None:
+ """Print before/after comparison for ruff fix."""
+ delta = score_after.overall - score_before.overall
+ print(f"AFTER: {score_after.overall} ({score_after.grade}) | {score_after.total_findings} findings")
+ print(f"DELTA: {'+' if delta >= 0 else ''}{delta:.1f} points | "
+ f"{score_before.total_findings - score_after.total_findings} findings fixed")
+
+ if score_after.total_findings > 0:
+ print(f"\nRemaining findings ({score_after.total_findings}):")
+ for f in findings_after[:10]:
+ print(f" {f.file_path}:{f.line} [{f.rule_id}] {f.message[:60]}")
+ if len(findings_after) > 10:
+ print(f" ... and {len(findings_after) - 10} more")
+
+ if not no_commit and delta > 0:
+ print(f"\nTo commit: cd {repo_path} && git add -A && git commit -m 'fix: auto-remediate ruff findings (Arbiter)'")
+
+
def cmd_fix(args: argparse.Namespace) -> None:
"""Run ruff --fix on a repo and show before/after score."""
repo_path = Path(args.repo).resolve()
@@ -365,60 +416,32 @@ def cmd_fix(args: argparse.Namespace) -> None:
print(f"Error: {repo_path} is not a git repository", file=sys.stderr)
sys.exit(1)
- analyzers = _get_analyzers()
- ruff_only = [a for a in analyzers if a.name == "ruff"]
+ ruff_only = [a for a in _get_analyzers() if a.name == "ruff"]
if not ruff_only:
print("Error: ruff not available", file=sys.stderr)
sys.exit(1)
- # Before score
findings_before = _run_analysis(repo_path, ruff_only)
loc = count_loc(repo_path)
score_before = score_findings(findings_before, loc)
-
print(f"\nBEFORE: {score_before.overall} ({score_before.grade}) | {score_before.total_findings} findings")
if args.dry_run:
- # Show what would be fixed
- import subprocess
- result = subprocess.run(
- ["ruff", "check", "--fix", "--diff", str(repo_path)],
- capture_output=True, text=True, timeout=120,
- )
- if result.stdout:
- lines = result.stdout.strip().split("\n")
- print(f"\nWould fix {len([l for l in lines if l.startswith('---')])} files")
- print("(use without --dry-run to apply)")
+ output = _run_ruff_fix(repo_path, dry_run=True)
+ if output:
+ file_count = sum(1 for line in output.split("\n") if line.startswith("---"))
+ print(f"\nWould fix {file_count} files\n(use without --dry-run to apply)")
else:
print("\nNothing to fix automatically.")
return
- # Apply fixes
- import subprocess
- result = subprocess.run(
- ["ruff", "check", "--fix", "--unsafe-fixes", str(repo_path)],
- capture_output=True, text=True, timeout=120,
- )
- print(f"\nruff --fix output: {result.stdout.strip()}" if result.stdout.strip() else "")
+ output = _run_ruff_fix(repo_path, dry_run=False)
+ if output:
+ print(f"\nruff --fix output: {output}")
- # After score
findings_after = _run_analysis(repo_path, ruff_only)
score_after = score_findings(findings_after, loc)
-
- delta = score_after.overall - score_before.overall
- print(f"AFTER: {score_after.overall} ({score_after.grade}) | {score_after.total_findings} findings")
- print(f"DELTA: {'+' if delta >= 0 else ''}{delta:.1f} points | "
- f"{score_before.total_findings - score_after.total_findings} findings fixed")
-
- if score_after.total_findings > 0:
- print(f"\nRemaining findings ({score_after.total_findings}):")
- for f in findings_after[:10]:
- print(f" {f.file_path}:{f.line} [{f.rule_id}] {f.message[:60]}")
- if len(findings_after) > 10:
- print(f" ... and {len(findings_after) - 10} more")
-
- if not args.no_commit and delta > 0:
- print(f"\nTo commit: cd {repo_path} && git add -A && git commit -m 'fix: auto-remediate ruff findings (Arbiter)'")
+ _print_fix_delta(score_before, score_after, findings_after, repo_path, args.no_commit)
def cmd_diff(args: argparse.Namespace) -> None:
diff --git a/src/arbiter/analyzers/dead_code_analyzer.py b/src/arbiter/analyzers/dead_code_analyzer.py
index afec04d..a142cc8 100644
--- a/src/arbiter/analyzers/dead_code_analyzer.py
+++ b/src/arbiter/analyzers/dead_code_analyzer.py
@@ -24,10 +24,11 @@ def is_available(self) -> bool:
return False
def analyze_repo(self, repo_path: Path, exclude_paths: list[str] | None = None) -> list[Finding]:
- result = subprocess.run(
- ["vulture", str(repo_path), "--min-confidence", "80"],
- capture_output=True, text=True, timeout=120,
- )
+ cmd = ["vulture", str(repo_path), "--min-confidence", "80"]
+ whitelist = repo_path / "vulture_whitelist.py"
+ if whitelist.exists():
+ cmd.append(str(whitelist))
+ result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if not result.stdout.strip():
return []
diff --git a/vulture_whitelist.py b/vulture_whitelist.py
new file mode 100644
index 0000000..0df010c
--- /dev/null
+++ b/vulture_whitelist.py
@@ -0,0 +1,20 @@
+"""Vulture whitelist — items flagged as unused but used by external callers or frameworks."""
+
+# HTTP handler overrides (called by stdlib HTTPServer)
+do_GET # noqa
+log_message # noqa
+
+# Public API methods (called by CLI and external consumers)
+add_agent # noqa
+get_profile # noqa
+all_agents # noqa
+from_json # noqa
+post_quality_milestone # noqa
+post_quality_alert # noqa
+
+# Dataclass/config fields (used by callers)
+trust_tier # noqa
+quality_threshold # noqa
+row_factory # noqa
+DEFAULT_EXCLUDE_PATHS # noqa
+func_name # noqa