Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 149 additions & 126 deletions src/arbiter/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,56 +71,42 @@ def _find_git_root(path: Path) -> Path | None:
return None


def cmd_analyze(args: argparse.Namespace) -> None:
"""Full analysis: run analyzers, score, persist to store."""
repo_path = Path(args.repo).resolve()
git_root = _find_git_root(repo_path)
if not git_root:
print(f"Warning: {repo_path} is not inside a git repository. Skipping commit analysis.", file=sys.stderr)

db_path = Path(args.db) if args.db else Path("arbiter_data.db")
store = Store(db_path)
registry = AgentRegistry()

exclude_paths = [p.strip() for p in args.exclude.split(",") if p.strip()] if args.exclude else None
if exclude_paths:
print(f"Analyzing {repo_path} (excluding: {', '.join(exclude_paths)})...", file=sys.stderr)
else:
print(f"Analyzing {repo_path}...", file=sys.stderr)

# Run analyzers
analyzers = _get_analyzers()
findings = _run_analysis(repo_path, analyzers, exclude_paths=exclude_paths)
loc = count_loc(repo_path)

repo_name = repo_path.name
def _parse_exclude(args: argparse.Namespace) -> list[str] | None:
"""Parse --exclude into a list of paths."""
if not args.exclude:
return None
return [p.strip() for p in args.exclude.split(",") if p.strip()]

# Score
score = score_findings(findings, loc)
store.record_snapshot(score, loc, repo_name=repo_name)

# Update file-level quality
def _persist_file_quality(findings: list[Finding], store: Store, repo_name: str) -> None:
    """Record per-file quality metrics from findings.

    Groups findings by file path, then stores each file's finding count and
    its worst observed severity under the "analysis" source tag.
    """
    sev_rank = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}
    file_findings: dict[str, list[Finding]] = {}
    for f in findings:
        file_findings.setdefault(f.file_path, []).append(f)
    for fp, ff in file_findings.items():
        # Worst severity wins; unknown severity strings rank below LOW.
        worst = max((f.severity for f in ff), key=lambda s: sev_rank.get(s, 0))
        store.update_file_quality(fp, len(ff), worst, "analysis", repo_name=repo_name)

# Walk recent commits and score per-agent (requires git root)
commits = []
if git_root:
commits = walk_commits(git_root, max_count=args.commits, registry=registry)
for commit in commits:
# Per-commit scoring: score only the files this commit touched
commit_score = score_commit(git_root, commit, analyzers, exclude_paths=exclude_paths)
store.record_commit(
commit.hash, commit.timestamp, commit.agent,
commit.files_changed, commit.loc_added, commit.loc_removed, commit_score,
repo_name=repo_name,
)

# Print summary
def _score_commits(git_root: Path, store: Store, analyzers: list[Analyzer],
                   max_count: int, exclude_paths: list[str] | None,
                   repo_name: str) -> list:
    """Walk recent commits, score each, and persist to store.

    Returns the list of commits that were walked (and recorded).
    """
    registry = AgentRegistry()
    recent = walk_commits(git_root, max_count=max_count, registry=registry)
    for c in recent:
        # Per-commit scoring: score only the files this commit touched.
        per_commit_score = score_commit(git_root, c, analyzers, exclude_paths=exclude_paths)
        store.record_commit(
            c.hash, c.timestamp, c.agent,
            c.files_changed, c.loc_added, c.loc_removed, per_commit_score,
            repo_name=repo_name,
        )
    return recent


def _print_analyze_summary(score, loc: int, commits: list, db_path: Path, store: Store) -> None:
"""Print full analysis summary with leaderboard."""
print(f"\n{'='*60}")
print(f" Arbiter Score: {score.overall} ({score.grade})")
print(f"{'='*60}")
Expand All @@ -132,7 +118,6 @@ def cmd_analyze(args: argparse.Namespace) -> None:
print(f" Commits: {len(commits)} analyzed")
print(f" Tools: {', '.join(score.findings_by_tool.keys()) or 'none'}")

# Agent leaderboard
board = store.get_agent_leaderboard()
if board:
print("\n Agent Leaderboard:")
Expand All @@ -142,6 +127,36 @@ def cmd_analyze(args: argparse.Namespace) -> None:
print(f"\n Data stored in {db_path}")


def cmd_analyze(args: argparse.Namespace) -> None:
    """Full analysis: run analyzers, score, persist to store."""
    repo_path = Path(args.repo).resolve()
    git_root = _find_git_root(repo_path)
    if git_root is None:
        print(f"Warning: {repo_path} is not inside a git repository. Skipping commit analysis.", file=sys.stderr)

    db_path = Path(args.db) if args.db else Path("arbiter_data.db")
    store = Store(db_path)
    exclude_paths = _parse_exclude(args)

    suffix = ""
    if exclude_paths:
        suffix = f" (excluding: {', '.join(exclude_paths)})"
    print(f"Analyzing {repo_path}{suffix}...", file=sys.stderr)

    analyzers = _get_analyzers()
    findings = _run_analysis(repo_path, analyzers, exclude_paths=exclude_paths)
    loc = count_loc(repo_path)
    repo_name = repo_path.name

    # Snapshot and per-file quality are persisted before commit scoring.
    score = score_findings(findings, loc)
    store.record_snapshot(score, loc, repo_name=repo_name)
    _persist_file_quality(findings, store, repo_name)

    # Commit-level scoring needs a git root; otherwise report zero commits.
    commits: list = []
    if git_root:
        commits = _score_commits(git_root, store, analyzers, args.commits, exclude_paths, repo_name)

    _print_analyze_summary(score, loc, commits, db_path, store)


def cmd_score(args: argparse.Namespace) -> None:
"""Quick score without persistence."""
repo_path = Path(args.repo).resolve()
Expand Down Expand Up @@ -274,10 +289,14 @@ def cmd_fleet_report(args: argparse.Namespace) -> None:
def _print_fleet_report(report: list[dict]) -> None:
"""Format and print the fleet report."""
def _grade(score: float) -> str:
if score >= 90: return "A"
if score >= 80: return "B"
if score >= 70: return "C"
if score >= 60: return "D"
if score >= 90:
return "A"
if score >= 80:
return "B"
if score >= 70:
return "C"
if score >= 60:
return "D"
return "F"

print(f"\n{'Repo':30s} {'Score':>6s} {'Grade':>6s} {'Findings':>9s} {'LOC':>8s}")
Expand All @@ -296,129 +315,133 @@ def _grade(score: float) -> str:
print(" | ".join(f"{g}:{c}" for g, c in sorted(grades.items()) if c > 0))


def cmd_triage(args: argparse.Namespace) -> None:
"""Auto-classify repos and output actionable recommendations."""
db_path = Path(args.db) if args.db else Path("arbiter_fleet.db")
store = Store(db_path)
report = store.get_fleet_report()
if not report:
print("No fleet data. Run 'arbiter audit-fleet <dir>' first.")
return

green, yellow, red, archive = [], [], [], []
def _classify_repo(r: dict) -> tuple[str, dict]:
"""Classify a single repo into green/yellow/red/archive."""
score = r.get("overall_score", 0) or 0
loc = r.get("total_loc", 0) or 0
findings = r.get("total_findings", 0) or 0
entry = {"name": r.get("repo_name", "?"), "score": score, "loc": loc, "findings": findings}
if loc == 0 and findings == 0:
return "archive", entry
if score >= 80:
return "green", entry
if score >= 60 and findings <= 100:
return "yellow", entry
return "red", entry

for r in report:
score = r.get("overall_score", 0) or 0
loc = r.get("total_loc", 0) or 0
findings = r.get("total_findings", 0) or 0
name = r.get("repo_name", "?")

entry = {"name": name, "score": score, "loc": loc, "findings": findings}

if loc == 0 and findings == 0:
archive.append(entry)
elif score >= 80:
green.append(entry)
elif score >= 60:
if findings > 100:
red.append(entry)
else:
yellow.append(entry)
else:
red.append(entry)

def _print_triage(buckets: dict[str, list[dict]]) -> None:
"""Print triage report from classified buckets."""
print("=" * 70)
print(" ARBITER FLEET TRIAGE")
print("=" * 70)

print(f"\n GREEN ({len(green)} repos) — no action needed")
for r in sorted(green, key=lambda x: -x["score"]):
print(f"\n GREEN ({len(buckets['green'])} repos) — no action needed")
for r in sorted(buckets["green"], key=lambda x: -x["score"]):
print(f" {r['name']:30s} {r['score']:5.1f} {r['loc']:>8,} LOC")

print(f"\n YELLOW ({len(yellow)} repos) — minor cleanup")
for r in sorted(yellow, key=lambda x: x["score"]):
print(f"\n YELLOW ({len(buckets['yellow'])} repos) — minor cleanup")
for r in sorted(buckets["yellow"], key=lambda x: x["score"]):
print(f" {r['name']:30s} {r['score']:5.1f} {r['findings']:>5d} findings {r['loc']:>8,} LOC")
print(f" ACTION: ruff check --fix {r['name']}/")

print(f"\n RED ({len(red)} repos) — needs remediation or archival decision")
for r in sorted(red, key=lambda x: x["score"]):
print(f"\n RED ({len(buckets['red'])} repos) — needs remediation or archival decision")
for r in sorted(buckets["red"], key=lambda x: x["score"]):
print(f" {r['name']:30s} {r['score']:5.1f} {r['findings']:>5d} findings {r['loc']:>8,} LOC")
if r["findings"] > 500:
print(f" ACTION: ruff check --fix + manual review (high finding count)")
else:
print(f" ACTION: ruff check --fix, then re-score")
threshold = "ruff check --fix + manual review (high finding count)" if r["findings"] > 500 else "ruff check --fix, then re-score"
print(f" ACTION: {threshold}")

print(f"\n ARCHIVE CANDIDATES ({len(archive)} repos) — 0 LOC, no Python code")
for r in sorted(archive, key=lambda x: x["name"]):
print(f"\n ARCHIVE CANDIDATES ({len(buckets['archive'])} repos) — 0 LOC, no Python code")
for r in sorted(buckets["archive"], key=lambda x: x["name"]):
print(f" {r['name']}")
print(f" ACTION: review for archival → gh repo archive hummbl-dev/<name>")
print(" ACTION: review for archival → gh repo archive hummbl-dev/<name>")

total = {k: len(v) for k, v in buckets.items()}
print(f"\n{'='*70}")
print(f" Summary: {len(green)} green | {len(yellow)} yellow | {len(red)} red | {len(archive)} archive candidates")
print(f" Summary: {total['green']} green | {total['yellow']} yellow | {total['red']} red | {total['archive']} archive candidates")
print(f"{'='*70}")


def cmd_triage(args: argparse.Namespace) -> None:
    """Auto-classify repos and output actionable recommendations."""
    db_path = Path(args.db) if args.db else Path("arbiter_fleet.db")
    store = Store(db_path)
    report = store.get_fleet_report()
    if not report:
        print("No fleet data. Run 'arbiter audit-fleet <dir>' first.")
        return

    # Bucket every fleet row by its triage category.
    buckets: dict[str, list[dict]] = {name: [] for name in ("green", "yellow", "red", "archive")}
    for row in report:
        category, entry = _classify_repo(row)
        buckets[category].append(entry)

    _print_triage(buckets)


def _run_ruff_fix(repo_path: Path, dry_run: bool) -> str:
    """Run ruff --fix (or --diff for dry run) and return output.

    Dry runs only print the would-be diff; real runs apply unsafe fixes too.
    """
    import subprocess

    if dry_run:
        flags = ["--fix", "--diff"]
    else:
        flags = ["--fix", "--unsafe-fixes"]
    proc = subprocess.run(
        ["ruff", "check", *flags, str(repo_path)],
        capture_output=True, text=True, timeout=120,
    )
    return proc.stdout.strip()


def _print_fix_delta(score_before, score_after, findings_after: list[Finding],
                     repo_path: Path, no_commit: bool) -> None:
    """Print before/after comparison for ruff fix.

    Shows the score delta, up to ten remaining findings, and (unless
    no_commit is set) a suggested commit command when the score improved.
    """
    delta = score_after.overall - score_before.overall
    fixed = score_before.total_findings - score_after.total_findings
    sign = "+" if delta >= 0 else ""
    print(f"AFTER: {score_after.overall} ({score_after.grade}) | {score_after.total_findings} findings")
    print(f"DELTA: {sign}{delta:.1f} points | {fixed} findings fixed")

    if score_after.total_findings > 0:
        print(f"\nRemaining findings ({score_after.total_findings}):")
        preview = findings_after[:10]
        for f in preview:
            print(f" {f.file_path}:{f.line} [{f.rule_id}] {f.message[:60]}")
        overflow = len(findings_after) - 10
        if overflow > 0:
            print(f" ... and {overflow} more")

    if not no_commit and delta > 0:
        print(f"\nTo commit: cd {repo_path} && git add -A && git commit -m 'fix: auto-remediate ruff findings (Arbiter)'")


def cmd_fix(args: argparse.Namespace) -> None:
    """Run ruff --fix on a repo and show before/after score.

    Scores with ruff only, applies fixes (or shows a diff for --dry-run),
    re-scores, and prints the delta.  Exits non-zero when the target is not
    a git repo or ruff is unavailable.
    """
    repo_path = Path(args.repo).resolve()
    if not (repo_path / ".git").exists():
        print(f"Error: {repo_path} is not a git repository", file=sys.stderr)
        sys.exit(1)

    ruff_only = [a for a in _get_analyzers() if a.name == "ruff"]
    if not ruff_only:
        print("Error: ruff not available", file=sys.stderr)
        sys.exit(1)

    # Baseline score before any fixes are applied.
    findings_before = _run_analysis(repo_path, ruff_only)
    loc = count_loc(repo_path)
    score_before = score_findings(findings_before, loc)

    print(f"\nBEFORE: {score_before.overall} ({score_before.grade}) | {score_before.total_findings} findings")

    if args.dry_run:
        output = _run_ruff_fix(repo_path, dry_run=True)
        if output:
            # Each "---" line in the diff marks one file that would change.
            file_count = sum(1 for line in output.split("\n") if line.startswith("---"))
            print(f"\nWould fix {file_count} files\n(use without --dry-run to apply)")
        else:
            print("\nNothing to fix automatically.")
        return

    output = _run_ruff_fix(repo_path, dry_run=False)
    if output:
        print(f"\nruff --fix output: {output}")

    # Re-score against the same LOC baseline and report the delta.
    findings_after = _run_analysis(repo_path, ruff_only)
    score_after = score_findings(findings_after, loc)

    _print_fix_delta(score_before, score_after, findings_after, repo_path, args.no_commit)


def cmd_diff(args: argparse.Namespace) -> None:
Expand Down
9 changes: 5 additions & 4 deletions src/arbiter/analyzers/dead_code_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ def is_available(self) -> bool:
return False

def analyze_repo(self, repo_path: Path, exclude_paths: list[str] | None = None) -> list[Finding]:
result = subprocess.run(
["vulture", str(repo_path), "--min-confidence", "80"],
capture_output=True, text=True, timeout=120,
)
cmd = ["vulture", str(repo_path), "--min-confidence", "80"]
whitelist = repo_path / "vulture_whitelist.py"
if whitelist.exists():
cmd.append(str(whitelist))
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if not result.stdout.strip():
return []

Expand Down
20 changes: 20 additions & 0 deletions vulture_whitelist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Vulture whitelist — items flagged as unused but used by external callers or frameworks."""

# HTTP handler overrides (called by stdlib HTTPServer)
do_GET # noqa
log_message # noqa

# Public API methods (called by CLI and external consumers)
add_agent # noqa
get_profile # noqa
all_agents # noqa
from_json # noqa
post_quality_milestone # noqa
post_quality_alert # noqa

# Dataclass/config fields (used by callers)
trust_tier # noqa
quality_threshold # noqa
row_factory # noqa
DEFAULT_EXCLUDE_PATHS # noqa
func_name # noqa
Loading