diff --git a/refactron/cli/analysis.py b/refactron/cli/analysis.py index 95877f5..f828a91 100644 --- a/refactron/cli/analysis.py +++ b/refactron/cli/analysis.py @@ -97,6 +97,13 @@ default=False, help="Disable interactive mode — dump all issues (for CI/CD or piped output)", ) +@click.option( + "--fix-on", + "fix_on", + type=click.Choice(["CRITICAL", "ERROR", "WARNING", "INFO"], case_sensitive=False), + default=None, + help="Auto-queue issues at this level and above for fixing after analysis.", +) @click.option( "--format", "output_format", @@ -123,6 +130,7 @@ def analyze( environment: Optional[str], no_cache: bool, no_interactive: bool, + fix_on: Optional[str] = None, output_format: str = "text", fail_on: Optional[str] = None, ) -> None: @@ -180,8 +188,10 @@ def analyze( cfg.log_format = log_format if metrics is not None: cfg.enable_metrics = metrics - if no_cache: - cfg.enable_incremental_analysis = False + # Always disable incremental analysis in CLI — users expect `analyze` to + # always return all issues, not skip unchanged files silently. + # (Incremental filtering is an optimization for the programmatic API only.) 
+ cfg.enable_incremental_analysis = False if output_format != "json": _print_file_count(target_path) @@ -258,6 +268,54 @@ def analyze( ) console.print(f" Success rate: {metrics_summary.get('success_rate_percent', 0):.1f}%") + # Exit with error code if critical issues found + should_fail = summary["critical"] > 0 + + # ── Pipeline session ────────────────────────────────────────────── + from datetime import datetime, timezone + + from refactron.core.pipeline import RefactronPipeline + from refactron.core.pipeline_session import PipelineSession, SessionStore + + _FIX_LEVEL_MAP = { + "CRITICAL": IssueLevel.CRITICAL, + "ERROR": IssueLevel.ERROR, + "WARNING": IssueLevel.WARNING, + "INFO": IssueLevel.INFO, + } + + _target_path = Path(target) if target else Path.cwd() + _project_root = _target_path if _target_path.is_dir() else _target_path.parent + _pipeline = RefactronPipeline(project_root=_project_root) + + _session_id = SessionStore.make_session_id() + _pipeline_session = PipelineSession( + session_id=_session_id, + target=str(_target_path), + created_at=datetime.now(timezone.utc).isoformat(), + total_files=summary.get("total_files", 0), + total_issues=summary.get("total_issues", 0), + issues_by_level={ + "CRITICAL": summary.get("critical", 0), + "ERROR": summary.get("errors", 0), + "WARNING": summary.get("warnings", 0), + "INFO": summary.get("info", 0), + }, + ) + + # Always queue all issues so `autofix` has a full picture. + # `autofix --fix-on` controls which level actually gets applied. 
+ _all_issues = [i for fm in result.file_metrics for i in fm.issues] + _pipeline.queue_issues(_pipeline_session, _all_issues) + + _pipeline.store.save(_pipeline_session) + _pipeline.store.set_current(_session_id) + + _fixable = len([i for i in _pipeline_session.fix_queue if i.status.value == "pending"]) + console.print(f"\n[dim]Session: {_session_id}[/dim]") + if _fixable: + console.print(f"[dim]{_fixable} fixable issues queued → refactron autofix --dry-run[/dim]") + # Exit with error code: --fail-on sets threshold, default is CRITICAL _LEVEL_RANK = {"INFO": 0, "WARNING": 1, "ERROR": 2, "CRITICAL": 3} _SUMMARY_KEY = { diff --git a/refactron/cli/main.py b/refactron/cli/main.py index 93a777a..53c7018 100644 --- a/refactron/cli/main.py +++ b/refactron/cli/main.py @@ -153,3 +153,17 @@ def main(ctx: click.Context) -> None: main.add_command(init) except ImportError: pass + +try: + from refactron.cli.status import status + + main.add_command(status) +except ImportError: + pass + +try: + from refactron.cli.run import run + + main.add_command(run) +except ImportError: + pass diff --git a/refactron/cli/refactor.py b/refactron/cli/refactor.py index 5fd7c83..bb20780 100644 --- a/refactron/cli/refactor.py +++ b/refactron/cli/refactor.py @@ -209,7 +209,7 @@ def refactor( @click.command() -@click.argument("target", type=click.Path(exists=True)) +@click.argument("target", type=click.Path(exists=True), required=False) @click.option( "--config", "-c", @@ -261,8 +261,25 @@ def refactor( default=False, help="Run verification checks (syntax, imports, tests) before applying fixes", ) +@click.option( + "--session", + "session_id", + default=None, + help=( + "Override the active workspace session. If omitted, uses the " + "session set by the last 'refactron analyze' or 'refactron run'." 
+ ), +) +@click.option( + "--fix-on", + "fix_on", + type=click.Choice(["CRITICAL", "ERROR", "WARNING", "INFO"], case_sensitive=False), + default="CRITICAL", + show_default=True, + help="Apply only issues at this severity level and above.", +) def autofix( - target: str, + target: Optional[str], config: Optional[str], profile: Optional[str], environment: Optional[str], @@ -270,20 +287,135 @@ def autofix( dry_run: bool, safety_level: str, verify: bool, + session_id: Optional[str] = None, + fix_on: str = "CRITICAL", ) -> None: - """ - Automatically fix code issues (Phase 3 feature). + """Apply fixes from the active pipeline session. - TARGET: Path to file or directory to fix + Automatically reads the current workspace session created by + 'refactron analyze' or 'refactron run' — no session ID needed. + Use --session to target a specific session instead. + \b + Typical workflow: + refactron analyze src/ --fix-on CRITICAL # creates session + refactron autofix --dry-run # preview (uses active session) + refactron autofix # apply fixes + refactron rollback # undo if needed + + \b Examples: - refactron autofix myfile.py --preview - refactron autofix myproject/ --apply --safety-level moderate + refactron autofix --dry-run + refactron autofix --session sess_20260404_120000 """ console.print() _auth_banner("Auto-fix") console.print() + # ── Session-aware pipeline ──────────────────────────────────────── + from refactron.core.pipeline import RefactronPipeline + + _target_path = Path(target) if target else Path.cwd() + _project_root = _target_path if _target_path.is_dir() else _target_path.parent + _pipeline = RefactronPipeline(project_root=_project_root) + + if session_id: + _pipeline_session = _pipeline.store.load(session_id) + if _pipeline_session is None: + console.print(f"[red]Session not found: {session_id}[/red]") + raise SystemExit(1) + else: + # Try workspace current session first (no --session flag needed) + _pipeline_session = _pipeline.store.load_current() + if 
_pipeline_session is None: + if not target: + console.print( + "[red]No active session. Run 'refactron analyze <target>' first.[/red]" + ) + raise SystemExit(1) + console.print("[dim]No session — running fresh analysis...[/dim]") + _pipeline_session = _pipeline.analyze(_target_path) + _pipeline.store.set_current(_pipeline_session.session_id) + if _pipeline._last_result: + _all_issues = [i for fm in _pipeline._last_result.file_metrics for i in fm.issues] + _pipeline.queue_issues(_pipeline_session, _all_issues) + + _total_issues = _pipeline_session.total_issues + _fixable = len([i for i in _pipeline_session.fix_queue if i.status.value == "pending"]) + _no_fixer = len([i for i in _pipeline_session.fix_queue if i.status.value == "skipped"]) + console.print( + f"[dim]Session {_pipeline_session.session_id} · " + f"{_total_issues} issues · {_fixable} have automated fixers · " + f"{_no_fixer} no fixer available[/dim]" + ) + + # Filter queue by --fix-on level: mark items below threshold as skipped + _LEVEL_RANK = {"INFO": 0, "WARNING": 1, "ERROR": 2, "CRITICAL": 3} + _threshold = _LEVEL_RANK.get(fix_on.upper(), 3) + from refactron.core.pipeline_session import FixStatus as _FixStatus + + for _item in _pipeline_session.fix_queue: + if _item.status == _FixStatus.PENDING: + if _LEVEL_RANK.get(_item.level.upper(), 0) < _threshold: + _item.status = _FixStatus.SKIPPED + + _pending_count = len([i for i in _pipeline_session.fix_queue if i.status == _FixStatus.PENDING]) + + if _pending_count == 0: + if _no_fixer > 0: + console.print( + f"[yellow]{_no_fixer} issues found but none have automated fixers.[/yellow]\n" + f"[dim]Refactron auto-fixers cover: unused imports, magic numbers, " + f"docstrings, dead code, type hints, sorting, whitespace, quotes, " + f"booleans, f-strings, unused variables, indentation.[/dim]\n" + f"[dim]The issues in this session (complexity, code smell) " + f"require manual refactoring.[/dim]" + ) + else: + console.print( + f"[yellow]No fixable issues at 
{fix_on.upper()} level or above.[/yellow]\n" + f"[dim]Try: refactron autofix --fix-on WARNING[/dim]" + ) + return + + _pipeline.apply( + _pipeline_session, + dry_run=dry_run, + verify=verify, + ) + + _applied = len(_pipeline_session.applied_fixes) + _blocked = len(_pipeline_session.blocked_fixes) + _skipped = len([i for i in _pipeline_session.fix_queue if i.status.value == "skipped"]) + + if dry_run: + _diff_items = [i for i in _pipeline_session.fix_queue if i.diff] + if not _diff_items: + console.print( + "\n[dim]Dry-run: no diffs generated " + "(fixers may not support these issue types)[/dim]" + ) + else: + console.print(f"\n[bold]Dry-run preview ({len(_diff_items)} changes)[/bold]") + for _item in _diff_items: + console.print( + f"\n [cyan]{_item.file_path}:{_item.line_number}[/cyan] {_item.message}" + ) + console.print(_item.diff) + else: + console.print(f"\n[bold green]Applied:[/bold green] {_applied}") + if _blocked: + console.print(f"[bold red]Blocked:[/bold red] {_blocked}") + if _skipped: + console.print(f"[dim]Skipped: {_skipped}[/dim]") + console.print(f"\n[dim]Session: {_pipeline_session.session_id}[/dim]") + if _applied > 0: + console.print( + f"[dim]To undo: refactron rollback --session " + f"{_pipeline_session.session_id}[/dim]" + ) + return + # Setup target_path = _validate_path(target) _load_config(config, profile, environment) @@ -359,12 +491,23 @@ def autofix( default=False, help="Clear all backup sessions", ) +@click.option( + "--pipeline-session", + "pipeline_session_id", + default=None, + help=( + "Override the active workspace session to roll back. " + "If omitted, rolls back the current session automatically. " + "Use 'refactron status --list' to see all session IDs." + ), +) def rollback( session_id: Optional[str], session: Optional[str], use_git: bool, list_sessions: bool, clear: bool, + pipeline_session_id: Optional[str] = None, ) -> None: """ Rollback refactoring changes to restore original files. 
@@ -381,6 +524,40 @@ def rollback( refactron rollback --use-git # Use Git rollback refactron rollback --clear # Clear all backups """ + from refactron.core.backup import BackupManager + from refactron.core.pipeline_session import SessionState, SessionStore + + _store = SessionStore(root_dir=Path.cwd()) + + # Use explicit ID, else fall back to active workspace session + _resolved_id = pipeline_session_id or _store.get_current_id() + + if _resolved_id: + _pipeline_session = _store.load(_resolved_id) + if _pipeline_session is None: + console.print(f"[red]Session not found: {_resolved_id}[/red]") + raise SystemExit(1) + if not _pipeline_session.applied_fixes: + console.print("[yellow]No applied fixes in this session to roll back.[/yellow]") + return + if not _pipeline_session.backup_session_id: + console.print("[red]Session has no backup ID — cannot roll back.[/red]") + raise SystemExit(1) + + _bm = BackupManager(root_dir=Path.cwd()) + _restored_count, _failed = _bm.rollback_session(_pipeline_session.backup_session_id) + + _pipeline_session.state = SessionState.ROLLED_BACK + _store.save(_pipeline_session) + _store.clear_current() + + console.print( + f"[green]Rolled back {_restored_count} file(s) from session " f"{_resolved_id}[/green]" + ) + for _f in _failed: + console.print(f"[red] Failed to restore: {_f}[/red]") + return + # Support both argument and option for session target_session = session_id or session console.print("\n🔄 [bold blue]Refactron Rollback[/bold blue]\n") diff --git a/refactron/cli/run.py b/refactron/cli/run.py new file mode 100644 index 0000000..9da3204 --- /dev/null +++ b/refactron/cli/run.py @@ -0,0 +1,138 @@ +"""refactron run — full one-shot pipeline: analyze → queue → verify → apply.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Optional + +import click +from rich.console import Console + +from refactron.core.models import IssueLevel +from refactron.core.pipeline import RefactronPipeline + +console = 
Console() + +_LEVEL_MAP = { + "CRITICAL": IssueLevel.CRITICAL, + "ERROR": IssueLevel.ERROR, + "WARNING": IssueLevel.WARNING, + "INFO": IssueLevel.INFO, +} +_LEVEL_RANK = {"INFO": 0, "WARNING": 1, "ERROR": 2, "CRITICAL": 3} + + +@click.command() +@click.argument("target", type=click.Path(exists=True)) +@click.option( + "--fix-on", + "fix_on", + type=click.Choice(["CRITICAL", "ERROR", "WARNING", "INFO"], case_sensitive=False), + default="CRITICAL", + show_default=True, + help="Queue and fix issues at this level and above", +) +@click.option( + "--dry-run", + is_flag=True, + default=False, + help="Show what would be fixed without writing any files", +) +@click.option( + "--no-verify", + "skip_verify", + is_flag=True, + default=False, + help="Skip VerificationEngine checks (not recommended)", +) +@click.option( + "--fail-on", + "fail_on", + type=click.Choice(["CRITICAL", "ERROR", "WARNING", "INFO"], case_sensitive=False), + default=None, + help="Exit 1 if issues at this level or above remain after fixing", +) +def run( + target: str, + fix_on: str, + dry_run: bool, + skip_verify: bool, + fail_on: Optional[str], +) -> None: + """Run the full Refactron pipeline in one shot. + + Analyzes TARGET, queues issues at --fix-on level, verifies each fix + (syntax + imports + tests), applies safe fixes, and saves a session + for rollback. All subsequent commands (autofix, status, rollback) + automatically use the active session — no need to pass session IDs. + + \b + Pipeline steps: + 1. Analyze + 2. Queue issues at --fix-on level and above + 3. Verify each fix (syntax + imports + test suite) + 4. Apply safe fixes, block unsafe ones + 5. 
Save session → .refactron/current + + \b + Examples: + refactron run src/ --fix-on CRITICAL --dry-run + refactron run src/ --fix-on WARNING --fail-on ERROR + refactron run src/ --no-verify # skip verification + """ + target_path = Path(target) + pipeline = RefactronPipeline( + project_root=target_path if target_path.is_dir() else target_path.parent + ) + + # Step 1: Analyze + console.print(f"[bold]Analyzing[/bold] {target_path}...") + session = pipeline.analyze(target_path) + pipeline.store.set_current(session.session_id) + console.print( + f" {session.total_files} files · {session.total_issues} issues " + f"({session.issues_by_level.get('CRITICAL', 0)} critical, " + f"{session.issues_by_level.get('WARNING', 0)} warnings)" + ) + + # Step 2: Queue + if pipeline._last_result: + all_issues = [i for fm in pipeline._last_result.file_metrics for i in fm.issues] + pipeline.queue_issues(session, all_issues, min_level=_LEVEL_MAP[fix_on.upper()]) + queued = len([i for i in session.fix_queue if i.status.value == "pending"]) + console.print(f" {queued} issues queued at {fix_on}+ level") + + if queued == 0: + console.print("[green]Nothing to fix.[/green]") + else: + # Step 3: Apply + action = "Previewing" if dry_run else "Applying" + console.print(f"\n[bold]{action} fixes...[/bold]") + pipeline.apply(session, dry_run=dry_run, verify=not skip_verify) + + applied = len(session.applied_fixes) + blocked = len(session.blocked_fixes) + + if dry_run: + console.print(f" [yellow]Dry run — {queued} fixes previewed, nothing written[/yellow]") + else: + console.print(f" [green]Applied: {applied}[/green]") + if blocked: + console.print(f" [red]Blocked: {blocked}[/red]") + + console.print(f"\n[dim]Session: {session.session_id}[/dim]") + if not dry_run and session.applied_fixes: + console.print( + f"[dim]To undo: refactron rollback --pipeline-session {session.session_id}[/dim]" + ) + + # --fail-on gate + if fail_on: + threshold = _LEVEL_RANK[fail_on.upper()] + should_fail = any( + 
session.issues_by_level.get(lvl, 0) > 0 + for lvl, rank in _LEVEL_RANK.items() + if rank >= threshold + ) + if should_fail: + raise SystemExit(1) diff --git a/refactron/cli/status.py b/refactron/cli/status.py new file mode 100644 index 0000000..4771a28 --- /dev/null +++ b/refactron/cli/status.py @@ -0,0 +1,112 @@ +"""refactron status — show pipeline session state.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Optional + +import click +from rich.console import Console +from rich.table import Table + +from refactron.core.pipeline_session import FixStatus, SessionStore + +console = Console() + + +@click.command() +@click.option("--session", "session_id", default=None, help="Session ID to inspect") +@click.option("--list", "list_sessions", is_flag=True, default=False, help="List all sessions") +@click.option( + "--project-root", + default=".", + help="Project root (where .refactron/ lives)", +) +def status(session_id: Optional[str], list_sessions: bool, project_root: str) -> None: + """Show the state of the active pipeline session. + + Automatically reads the current workspace session (set by 'analyze' or + 'run'). No session ID required. Use --session to inspect a specific one. 
+ + \b + Examples: + refactron status # active session + refactron status --session sess_xyz # specific session + refactron status --list # all sessions + """ + store = SessionStore(root_dir=Path(project_root)) + + if list_sessions: + sessions = store.list_sessions() + if not sessions: + console.print("[dim]No sessions found.[/dim]") + return + table = Table(title="Pipeline Sessions") + table.add_column("Session ID", style="cyan") + table.add_column("Target") + table.add_column("State") + table.add_column("Files") + table.add_column("Issues") + table.add_column("Created") + for s in reversed(sessions): + table.add_row( + s.session_id, + s.target, + s.state.value, + str(s.total_files), + str(s.total_issues), + s.created_at[:19], + ) + console.print(table) + return + + if session_id: + session = store.load(session_id) + else: + # Load active workspace session, fall back to latest + session = store.load_current() or store.load_latest() + + if session is None: + console.print( + "[dim]No session found. 
Run [bold]refactron analyze <path>[/bold] first.[/dim]" + ) + return + + console.print(f"\n[bold]Session:[/bold] {session.session_id}") + console.print(f"[bold]Target:[/bold] {session.target}") + console.print(f"[bold]State:[/bold] {session.state.value}") + console.print(f"[bold]Created:[/bold] {session.created_at[:19]}") + + console.print("\n[bold]Analysis[/bold]") + console.print(f" Files analyzed: {session.total_files}") + console.print(f" Total issues: {session.total_issues}") + _STYLES = {"CRITICAL": "bold red", "ERROR": "red", "WARNING": "yellow", "INFO": "cyan"} + for level in ("CRITICAL", "ERROR", "WARNING", "INFO"): + count = session.issues_by_level.get(level, 0) + if count: + style = _STYLES[level] + console.print(f" [{style}]{level}[/{style}]: {count}") + + pending = [i for i in session.fix_queue if i.status == FixStatus.PENDING] + applied = session.applied_fixes + blocked = session.blocked_fixes + skipped = [i for i in session.fix_queue if i.status == FixStatus.SKIPPED] + + if session.fix_queue or applied or blocked: + console.print("\n[bold]Fixes[/bold]") + if pending: + console.print(f" [yellow]Queued (not yet applied):[/yellow] {len(pending)}") + if applied: + console.print(f" [green]Applied:[/green] {len(applied)}") + if blocked: + console.print(f" [red]Blocked:[/red] {len(blocked)}") + for b in blocked[:3]: + console.print(f" • {b.file_path}:{b.line_number} — {b.block_reason}") + if skipped: + console.print(f" [dim]Skipped (no fixer): {len(skipped)}[/dim]") + + console.print("\n[bold]Next steps[/bold]") + if pending: + console.print(f" [dim]refactron autofix --session {session.session_id} --apply[/dim]") + if applied: + console.print(f" [dim]refactron rollback --session {session.session_id}[/dim] (to undo)") diff --git a/refactron/core/pipeline.py b/refactron/core/pipeline.py new file mode 100644 index 0000000..77abd0e --- /dev/null +++ b/refactron/core/pipeline.py @@ -0,0 +1,201 @@ +""" +RefactronPipeline — orchestrates the full analyze → fix → verify → 
write pipeline. +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional + +from refactron.autofix.engine import AutoFixEngine +from refactron.autofix.models import FixRiskLevel +from refactron.core.backup import BackupManager +from refactron.core.config import RefactronConfig +from refactron.core.models import CodeIssue, IssueCategory, IssueLevel +from refactron.core.pipeline_session import ( + FixQueueItem, + FixStatus, + PipelineSession, + SessionState, + SessionStore, +) +from refactron.core.refactron import Refactron + +logger = logging.getLogger(__name__) + +_LEVEL_MAP: Dict[IssueLevel, str] = { + IssueLevel.CRITICAL: "CRITICAL", + IssueLevel.ERROR: "ERROR", + IssueLevel.WARNING: "WARNING", + IssueLevel.INFO: "INFO", +} +_LEVEL_REVERSE: Dict[str, IssueLevel] = {v: k for k, v in _LEVEL_MAP.items()} +_LEVEL_RANK: Dict[IssueLevel, int] = { + IssueLevel.INFO: 0, + IssueLevel.WARNING: 1, + IssueLevel.ERROR: 2, + IssueLevel.CRITICAL: 3, +} + + +class RefactronPipeline: + """Connects analysis → fix queue → verification → file write → session persistence.""" + + def __init__( + self, + project_root: Optional[Path] = None, + safety_level: FixRiskLevel = FixRiskLevel.SAFE, + ) -> None: + self.project_root = Path(project_root) if project_root else Path.cwd() + self.store = SessionStore(root_dir=self.project_root) + self.fix_engine = AutoFixEngine(safety_level=safety_level) + self.backup_manager = BackupManager(root_dir=self.project_root) + self._last_result: Any = None + + def analyze(self, target: Path) -> PipelineSession: + """Run analysis on target, create and save a PipelineSession.""" + cfg = RefactronConfig() + # Always disable incremental analysis — CLI users expect full results every run. 
+ cfg.enable_incremental_analysis = False + r = Refactron(cfg) + result = r.analyze(target) + self._last_result = result + + session_id = SessionStore.make_session_id() + now = datetime.now(timezone.utc).isoformat() + + issues_by_level: Dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARNING": 0, "INFO": 0} + for fm in result.file_metrics: + for issue in fm.issues: + level_str = _LEVEL_MAP.get(issue.level, "INFO") + issues_by_level[level_str] = issues_by_level.get(level_str, 0) + 1 + + session = PipelineSession( + session_id=session_id, + target=str(target), + created_at=now, + state=SessionState.ANALYZED, + total_files=result.total_files, + total_issues=result.total_issues, + issues_by_level=issues_by_level, + ) + self.store.save(session) + logger.info("Pipeline session created: %s", session_id) + return session + + def queue_issues( + self, + session: PipelineSession, + issues: List[CodeIssue], + min_level: Optional[IssueLevel] = None, + ) -> None: + """Map issues to fixers and add to session.fix_queue.""" + min_rank = _LEVEL_RANK.get(min_level, 0) if min_level else 0 + + for idx, issue in enumerate(issues): + if _LEVEL_RANK.get(issue.level, 0) < min_rank: + continue + fixer_name = self._find_fixer_name(issue) + item = FixQueueItem( + issue_id=f"issue_{idx:04d}", + file_path=str(issue.file_path), + line_number=issue.line_number, + level=_LEVEL_MAP.get(issue.level, "INFO"), + message=issue.message, + fixer_name=fixer_name or "none", + status=FixStatus.PENDING if fixer_name else FixStatus.SKIPPED, + ) + session.fix_queue.append(item) + self.store.save(session) + + def _find_fixer_name(self, issue: CodeIssue) -> Optional[str]: + if not self.fix_engine.can_fix(issue): + return None + for name, fixer in self.fix_engine.fixers.items(): + try: + result = fixer.preview(issue, "x = 1\n") + if result.success: + return name + except Exception: + continue + return None + + def apply( + self, + session: PipelineSession, + dry_run: bool = True, + verify: bool = True, + ) -> 
None: + """Verify + write fixes, record results in session.""" + session.state = SessionState.FIXING + self.store.save(session) + + files_to_fix: Dict[str, List[FixQueueItem]] = {} + for item in session.fix_queue: + if item.status == FixStatus.PENDING: + files_to_fix.setdefault(item.file_path, []).append(item) + + backup_session_id: Optional[str] = None + if not dry_run and files_to_fix: + backup_session_id = self.backup_manager.create_backup_session( + description=f"autofix session {session.session_id}" + ) + session.backup_session_id = backup_session_id + + for file_path_str, items in files_to_fix.items(): + file_path = Path(file_path_str) + if not file_path.exists(): + for item in items: + item.status = FixStatus.BLOCKED + item.block_reason = "file not found" + session.blocked_fixes.append(item) + continue + + issues_for_file = self._queue_items_to_issues(items, file_path) + try: + fixed_code, diff = self.fix_engine.fix_file( + file_path, issues_for_file, dry_run=dry_run, verify=verify + ) + except Exception as exc: + logger.warning("fix_file failed for %s: %s", file_path, exc) + for item in items: + item.status = FixStatus.BLOCKED + item.block_reason = str(exc) + session.blocked_fixes.append(item) + continue + + if diff: + for item in items: + item.diff = diff + if not dry_run: + item.status = FixStatus.APPLIED + if backup_session_id: + item.backup_path = str( + self.backup_manager.backup_dir / backup_session_id / file_path.name + ) + session.applied_fixes.append(item) + else: + for item in items: + item.status = FixStatus.BLOCKED + item.block_reason = "verification blocked or no changes generated" + session.blocked_fixes.append(item) + + session.state = SessionState.FIXED + session.finished_at = datetime.now(timezone.utc).isoformat() + self.store.save(session) + + def _queue_items_to_issues(self, items: List[FixQueueItem], file_path: Path) -> List[CodeIssue]: + return [ + CodeIssue( + category=IssueCategory.COMPLEXITY, + level=_LEVEL_REVERSE.get(item.level, 
IssueLevel.INFO), + message=item.message, + file_path=file_path, + line_number=item.line_number, + column=0, + suggestion=None, + ) + for item in items + ] diff --git a/refactron/core/pipeline_session.py b/refactron/core/pipeline_session.py new file mode 100644 index 0000000..fc1a2ce --- /dev/null +++ b/refactron/core/pipeline_session.py @@ -0,0 +1,224 @@ +""" +PipelineSession — shared state carrier for the Refactron connected pipeline. + +One session is created per `refactron analyze` invocation and consumed by +every subsequent command (autofix, status, rollback). Persisted as JSON +in <project_root>/.refactron/sessions/<session_id>.json. +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from pathlib import Path +from typing import Any, Dict, List, Optional + +_logger = logging.getLogger(__name__) + + +class SessionState(str, Enum): + ANALYZED = "analyzed" + FIXING = "fixing" + FIXED = "fixed" + ROLLED_BACK = "rolled_back" + + +class FixStatus(str, Enum): + PENDING = "pending" + APPLIED = "applied" + BLOCKED = "blocked" + SKIPPED = "skipped" + + +@dataclass +class FixQueueItem: + """One issue queued for fixing.""" + + issue_id: str + file_path: str + line_number: int + level: str + message: str + fixer_name: str + status: FixStatus = FixStatus.PENDING + diff: Optional[str] = None + block_reason: Optional[str] = None + backup_path: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "issue_id": self.issue_id, + "file_path": self.file_path, + "line_number": self.line_number, + "level": self.level, + "message": self.message, + "fixer_name": self.fixer_name, + "status": self.status.value, + "diff": self.diff, + "block_reason": self.block_reason, + "backup_path": self.backup_path, + } + + @classmethod + def from_dict(cls, d: Dict[str, Any]) -> FixQueueItem: + return cls( + issue_id=d["issue_id"], + file_path=d["file_path"], + 
line_number=d["line_number"], + level=d["level"], + message=d["message"], + fixer_name=d["fixer_name"], + status=FixStatus(d.get("status", "pending")), + diff=d.get("diff"), + block_reason=d.get("block_reason"), + backup_path=d.get("backup_path"), + ) + + +@dataclass +class PipelineSession: + """ + Central state object for one Refactron pipeline run. + + Created by `refactron analyze`, consumed by `refactron autofix`, + `refactron status`, and `refactron rollback`. + """ + + session_id: str + target: str + created_at: str + state: SessionState = SessionState.ANALYZED + total_files: int = 0 + total_issues: int = 0 + issues_by_level: Dict[str, int] = field(default_factory=dict) + fix_queue: List[FixQueueItem] = field(default_factory=list) + applied_fixes: List[FixQueueItem] = field(default_factory=list) + blocked_fixes: List[FixQueueItem] = field(default_factory=list) + backup_session_id: Optional[str] = None + finished_at: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "session_id": self.session_id, + "target": self.target, + "created_at": self.created_at, + "state": self.state.value, + "total_files": self.total_files, + "total_issues": self.total_issues, + "issues_by_level": self.issues_by_level, + "fix_queue": [i.to_dict() for i in self.fix_queue], + "applied_fixes": [i.to_dict() for i in self.applied_fixes], + "blocked_fixes": [i.to_dict() for i in self.blocked_fixes], + "backup_session_id": self.backup_session_id, + "finished_at": self.finished_at, + } + + @classmethod + def from_dict(cls, d: Dict[str, Any]) -> PipelineSession: + return cls( + session_id=d["session_id"], + target=d["target"], + created_at=d["created_at"], + state=SessionState(d.get("state", "analyzed")), + total_files=d.get("total_files", 0), + total_issues=d.get("total_issues", 0), + issues_by_level=d.get("issues_by_level", {}), + fix_queue=[FixQueueItem.from_dict(i) for i in d.get("fix_queue", [])], + applied_fixes=[FixQueueItem.from_dict(i) for i in 
d.get("applied_fixes", [])], + blocked_fixes=[FixQueueItem.from_dict(i) for i in d.get("blocked_fixes", [])], + backup_session_id=d.get("backup_session_id"), + finished_at=d.get("finished_at"), + ) + + +class SessionStore: + """Persists PipelineSession objects to /.refactron/sessions/.""" + + SESSIONS_DIR = Path(".refactron") / "sessions" + + def __init__(self, root_dir: Optional[Path] = None): + self.root_dir = Path(root_dir) if root_dir else Path.cwd() + self.sessions_dir = self.root_dir / self.SESSIONS_DIR + + def _session_path(self, session_id: str) -> Path: + return self.sessions_dir / f"{session_id}.json" + + def save(self, session: PipelineSession) -> None: + self.sessions_dir.mkdir(parents=True, exist_ok=True) + path = self._session_path(session.session_id) + path.write_text(json.dumps(session.to_dict(), indent=2), encoding="utf-8") + + def load(self, session_id: str) -> Optional[PipelineSession]: + path = self._session_path(session_id) + if not path.exists(): + return None + try: + session = PipelineSession.from_dict(json.loads(path.read_text(encoding="utf-8"))) + except (json.JSONDecodeError, KeyError, ValueError): + return None + if session.session_id != session_id: + _logger.warning( + "Session ID mismatch: requested %s, got %s", session_id, session.session_id + ) + return session + + def load_latest(self) -> Optional[PipelineSession]: + if not self.sessions_dir.exists(): + return None + paths = sorted(self.sessions_dir.glob("*.json")) + if not paths: + return None + try: + return PipelineSession.from_dict(json.loads(paths[-1].read_text(encoding="utf-8"))) + except (json.JSONDecodeError, KeyError, ValueError): + return None + + def list_sessions(self) -> List[PipelineSession]: + if not self.sessions_dir.exists(): + return [] + sessions: List[PipelineSession] = [] + for p in sorted(self.sessions_dir.glob("*.json")): + try: + data = json.loads(p.read_text(encoding="utf-8")) + sessions.append(PipelineSession.from_dict(data)) + except (json.JSONDecodeError, 
KeyError, ValueError): + pass + return sessions + + # ------------------------------------------------------------------ + # Current session pointer (.refactron/current) + # ------------------------------------------------------------------ + + @property + def _current_ptr(self) -> Path: + return self.root_dir / ".refactron" / "current" + + def set_current(self, session_id: str) -> None: + """Write session_id as the active session for this workspace.""" + self._current_ptr.parent.mkdir(parents=True, exist_ok=True) + self._current_ptr.write_text(session_id, encoding="utf-8") + + def get_current_id(self) -> Optional[str]: + """Return the active session ID, or None if not set.""" + if not self._current_ptr.exists(): + return None + sid = self._current_ptr.read_text(encoding="utf-8").strip() + return sid if sid else None + + def load_current(self) -> Optional[PipelineSession]: + """Load the active session for this workspace.""" + sid = self.get_current_id() + return self.load(sid) if sid else None + + def clear_current(self) -> None: + """Remove the current session pointer (e.g. 
after rollback).""" + if self._current_ptr.exists(): + self._current_ptr.unlink() + + @staticmethod + def make_session_id() -> str: + now = datetime.now(timezone.utc) + return f"sess_{now.strftime('%Y%m%d_%H%M%S_%f')}" diff --git a/tests/test_cli.py b/tests/test_cli.py index 9f66585..1e21f22 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1043,25 +1043,47 @@ def test_refactor_apply_with_operations(self, runner, tmp_path, mock_cfg, mock_r class TestAutofixCommand: + def _make_mock_pipeline(self, tmp_path): + """Return a mock RefactronPipeline suitable for autofix tests.""" + mock_session = MagicMock() + mock_session.session_id = "sess_test_123" + mock_session.fix_queue = [] + mock_session.applied_fixes = [] + mock_session.blocked_fixes = [] + + mock_pipeline = MagicMock() + mock_pipeline.store.load.return_value = None + mock_pipeline.analyze.return_value = mock_session + mock_pipeline._last_result = None + mock_pipeline.apply.return_value = None + + return mock_pipeline, mock_session + def test_autofix_preview_mode(self, runner, tmp_path, mock_cfg): py_file = tmp_path / "sample.py" py_file.write_text("x = 1\n") + mock_pipeline, _ = self._make_mock_pipeline(tmp_path) with patch("refactron.cli.refactor._load_config", return_value=mock_cfg), patch( "refactron.cli.refactor._validate_path", return_value=tmp_path ), patch("refactron.cli.refactor._print_file_count"), patch( "refactron.cli.refactor._auth_banner" + ), patch( + "refactron.core.pipeline.RefactronPipeline", return_value=mock_pipeline ): result = runner.invoke(autofix, [str(py_file)]) assert result.exit_code == 0 - assert "LOW" in result.output or "Available" in result.output + assert "Dry-run complete" in result.output or "session" in result.output.lower() def test_autofix_apply_mode(self, runner, tmp_path, mock_cfg): py_file = tmp_path / "sample.py" py_file.write_text("x = 1\n") + mock_pipeline, _ = self._make_mock_pipeline(tmp_path) with patch("refactron.cli.refactor._load_config", 
return_value=mock_cfg), patch( "refactron.cli.refactor._validate_path", return_value=tmp_path ), patch("refactron.cli.refactor._print_file_count"), patch( "refactron.cli.refactor._auth_banner" + ), patch( + "refactron.core.pipeline.RefactronPipeline", return_value=mock_pipeline ): result = runner.invoke(autofix, [str(py_file), "--apply"]) assert result.exit_code == 0 @@ -1069,11 +1091,14 @@ def test_autofix_apply_mode(self, runner, tmp_path, mock_cfg): def test_autofix_safety_levels(self, runner, tmp_path, mock_cfg): py_file = tmp_path / "s.py" py_file.write_text("x = 1\n") + mock_pipeline, _ = self._make_mock_pipeline(tmp_path) for level in ["safe", "low", "moderate", "high"]: with patch("refactron.cli.refactor._load_config", return_value=mock_cfg), patch( "refactron.cli.refactor._validate_path", return_value=tmp_path ), patch("refactron.cli.refactor._print_file_count"), patch( "refactron.cli.refactor._auth_banner" + ), patch( + "refactron.core.pipeline.RefactronPipeline", return_value=mock_pipeline ): result = runner.invoke(autofix, [str(py_file), "--safety-level", level]) assert result.exit_code == 0 diff --git a/tests/test_cli_run.py b/tests/test_cli_run.py new file mode 100644 index 0000000..43bc506 --- /dev/null +++ b/tests/test_cli_run.py @@ -0,0 +1,70 @@ +"""Tests for `refactron run` one-shot pipeline command.""" + +from unittest.mock import MagicMock, patch + +from click.testing import CliRunner + +from refactron.cli.run import run +from refactron.core.pipeline_session import PipelineSession, SessionState + + +def _make_session(tmp_path, state=SessionState.FIXED): + return PipelineSession( + session_id="sess_run_001", + target=str(tmp_path), + created_at="2026-04-03T18:00:00+00:00", + state=state, + total_files=2, + total_issues=3, + issues_by_level={"CRITICAL": 1, "ERROR": 0, "WARNING": 2, "INFO": 0}, + applied_fixes=[], + blocked_fixes=[], + ) + + +class TestRunCommand: + def test_run_dry_run_exits_0(self, tmp_path): + (tmp_path / "foo.py").write_text("x 
= 1\n") + runner = CliRunner() + session = _make_session(tmp_path) + + with patch("refactron.cli.run.RefactronPipeline") as mock_cls: + mock_pipeline = MagicMock() + mock_pipeline.analyze.return_value = session + mock_pipeline._last_result = None + mock_pipeline.store = MagicMock() + mock_cls.return_value = mock_pipeline + result = runner.invoke(run, [str(tmp_path), "--dry-run"]) + + assert result.exit_code == 0 + + def test_run_fail_on_critical_exits_1_when_critical_found(self, tmp_path): + (tmp_path / "foo.py").write_text("x = 1\n") + runner = CliRunner() + session = _make_session(tmp_path) + session.issues_by_level = {"CRITICAL": 1, "ERROR": 0, "WARNING": 0, "INFO": 0} + + with patch("refactron.cli.run.RefactronPipeline") as mock_cls: + mock_pipeline = MagicMock() + mock_pipeline.analyze.return_value = session + mock_pipeline._last_result = None + mock_pipeline.store = MagicMock() + mock_cls.return_value = mock_pipeline + result = runner.invoke(run, [str(tmp_path), "--dry-run", "--fail-on", "CRITICAL"]) + + assert result.exit_code == 1 + + def test_run_prints_session_id(self, tmp_path): + (tmp_path / "foo.py").write_text("x = 1\n") + runner = CliRunner() + session = _make_session(tmp_path) + + with patch("refactron.cli.run.RefactronPipeline") as mock_cls: + mock_pipeline = MagicMock() + mock_pipeline.analyze.return_value = session + mock_pipeline._last_result = None + mock_pipeline.store = MagicMock() + mock_cls.return_value = mock_pipeline + result = runner.invoke(run, [str(tmp_path), "--dry-run"]) + + assert "sess_run_001" in result.output diff --git a/tests/test_cli_status.py b/tests/test_cli_status.py new file mode 100644 index 0000000..5e798b7 --- /dev/null +++ b/tests/test_cli_status.py @@ -0,0 +1,70 @@ +"""Tests for `refactron status` command.""" + +from unittest.mock import patch + +from click.testing import CliRunner + +from refactron.cli.status import status +from refactron.core.pipeline_session import PipelineSession, SessionState + + +def 
_make_session(tmp_path, state=SessionState.ANALYZED): + return PipelineSession( + session_id="sess_test_001", + target=str(tmp_path), + created_at="2026-04-03T18:00:00+00:00", + state=state, + total_files=5, + total_issues=12, + issues_by_level={"CRITICAL": 2, "ERROR": 0, "WARNING": 7, "INFO": 3}, + ) + + +class TestStatusCommand: + def test_status_shows_session_info(self, tmp_path): + session = _make_session(tmp_path) + runner = CliRunner() + with patch("refactron.cli.status.SessionStore") as mock_store_cls: + mock_store_cls.return_value.load_current.return_value = session + mock_store_cls.return_value.load_latest.return_value = session + result = runner.invoke(status, ["--project-root", str(tmp_path)]) + assert result.exit_code == 0 + assert "sess_test_001" in result.output + assert "5" in result.output + + def test_status_by_session_id(self, tmp_path): + session = _make_session(tmp_path) + runner = CliRunner() + with patch("refactron.cli.status.SessionStore") as mock_store_cls: + mock_store_cls.return_value.load.return_value = session + result = runner.invoke( + status, ["--session", "sess_test_001", "--project-root", str(tmp_path)] + ) + assert result.exit_code == 0 + assert "sess_test_001" in result.output + + def test_status_no_session_shows_message(self, tmp_path): + runner = CliRunner() + with patch("refactron.cli.status.SessionStore") as mock_store_cls: + mock_store_cls.return_value.load_current.return_value = None + mock_store_cls.return_value.load_latest.return_value = None + result = runner.invoke(status, ["--project-root", str(tmp_path)]) + assert result.exit_code == 0 + assert "No session" in result.output + + def test_status_lists_all_sessions(self, tmp_path): + runner = CliRunner() + sessions = [ + PipelineSession( + session_id=f"sess_{i:03d}", + target=str(tmp_path), + created_at="2026-04-03T18:00:00+00:00", + ) + for i in range(3) + ] + with patch("refactron.cli.status.SessionStore") as mock_store_cls: + 
mock_store_cls.return_value.list_sessions.return_value = sessions + result = runner.invoke(status, ["--list", "--project-root", str(tmp_path)]) + assert result.exit_code == 0 + assert "sess_000" in result.output + assert "sess_002" in result.output diff --git a/tests/test_pipeline_integration.py b/tests/test_pipeline_integration.py new file mode 100644 index 0000000..e04d3c6 --- /dev/null +++ b/tests/test_pipeline_integration.py @@ -0,0 +1,131 @@ +"""Integration tests: full pipeline analyze → queue → apply → state checks.""" + +from pathlib import Path +from unittest.mock import MagicMock, patch + +from refactron.core.models import CodeIssue, FileMetrics, IssueCategory, IssueLevel +from refactron.core.pipeline import RefactronPipeline +from refactron.core.pipeline_session import FixStatus, SessionState + + +def _make_file_metrics(file_path: Path, issues=None): + return FileMetrics( + file_path=file_path, + lines_of_code=20, + comment_lines=0, + blank_lines=0, + complexity=5.0, + maintainability_index=70.0, + functions=1, + classes=0, + issues=issues or [], + ) + + +def _make_issue(file_path: Path, level=IssueLevel.WARNING): + return CodeIssue( + category=IssueCategory.COMPLEXITY, + level=level, + message="magic number 42", + file_path=file_path, + line_number=3, + column=0, + suggestion="extract to constant", + ) + + +class TestFullPipeline: + def test_analyze_queue_apply_roundtrip(self, tmp_path): + py_file = tmp_path / "sample.py" + py_file.write_text("x = 42\ny = x + 1\n") + issue = _make_issue(py_file) + + pipeline = RefactronPipeline(project_root=tmp_path) + + mock_result = MagicMock() + mock_result.total_files = 1 + mock_result.total_issues = 1 + mock_result.file_metrics = [_make_file_metrics(py_file, issues=[issue])] + + with patch("refactron.core.pipeline.Refactron") as mock_cls: + mock_cls.return_value.analyze.return_value = mock_result + session = pipeline.analyze(tmp_path) + + assert session.total_files == 1 + assert session.state == 
SessionState.ANALYZED + + pipeline.queue_issues(session, [issue]) + assert len(session.fix_queue) > 0 + + pipeline.apply(session, dry_run=True) + assert session.state == SessionState.FIXED + + def test_session_persisted_after_analyze(self, tmp_path): + py_file = tmp_path / "sample.py" + py_file.write_text("x = 1\n") + + pipeline = RefactronPipeline(project_root=tmp_path) + mock_result = MagicMock() + mock_result.total_files = 1 + mock_result.total_issues = 0 + mock_result.file_metrics = [] + + with patch("refactron.core.pipeline.Refactron") as mock_cls: + mock_cls.return_value.analyze.return_value = mock_result + session = pipeline.analyze(tmp_path) + + loaded = pipeline.store.load(session.session_id) + assert loaded is not None + assert loaded.session_id == session.session_id + + def test_blocked_fix_recorded_when_file_missing(self, tmp_path): + pipeline = RefactronPipeline(project_root=tmp_path) + mock_result = MagicMock() + mock_result.total_files = 1 + mock_result.total_issues = 1 + mock_result.file_metrics = [] + + with patch("refactron.core.pipeline.Refactron") as mock_cls: + mock_cls.return_value.analyze.return_value = mock_result + session = pipeline.analyze(tmp_path) + + ghost_issue = _make_issue(Path("/nonexistent/ghost.py")) + + # Patch fix_engine so it thinks it can fix the issue (fixer_name != None), + # causing the item to be PENDING and then BLOCKED when file is missing. 
+ with patch.object(pipeline.fix_engine, "can_fix", return_value=True), patch.object( + pipeline.fix_engine, + "fixers", + {"magic_number": MagicMock(preview=MagicMock(return_value=MagicMock(success=True)))}, + ): + pipeline.queue_issues(session, [ghost_issue]) + + pipeline.apply(session, dry_run=False) + + # Ghost file doesn't exist → PENDING item should be moved to blocked_fixes + all_blocked = session.blocked_fixes + [ + i for i in session.fix_queue if i.status == FixStatus.BLOCKED + ] + assert len(all_blocked) > 0 + + def test_min_level_filter_in_queue_issues(self, tmp_path): + py_file = tmp_path / "sample.py" + py_file.write_text("x = 1\n") + pipeline = RefactronPipeline(project_root=tmp_path) + mock_result = MagicMock() + mock_result.total_files = 1 + mock_result.total_issues = 2 + mock_result.file_metrics = [] + + with patch("refactron.core.pipeline.Refactron") as mock_cls: + mock_cls.return_value.analyze.return_value = mock_result + session = pipeline.analyze(tmp_path) + + critical_issue = _make_issue(py_file, level=IssueLevel.CRITICAL) + info_issue = _make_issue(py_file, level=IssueLevel.INFO) + + pipeline.queue_issues(session, [critical_issue, info_issue], min_level=IssueLevel.ERROR) + + # INFO issue should be filtered out (rank 0 < ERROR rank 2) + assert len(session.fix_queue) == 1 + assert session.fix_queue[0].level == "CRITICAL" diff --git a/tests/test_pipeline_orchestrator.py b/tests/test_pipeline_orchestrator.py new file mode 100644 index 0000000..9079c38 --- /dev/null +++ b/tests/test_pipeline_orchestrator.py @@ -0,0 +1,118 @@ +"""Tests for RefactronPipeline orchestrator.""" + +from pathlib import Path +from unittest.mock import MagicMock, patch + +from refactron.core.models import CodeIssue, IssueCategory, IssueLevel +from refactron.core.pipeline import RefactronPipeline +from refactron.core.pipeline_session import ( # noqa: F401 + FixStatus, + PipelineSession, + SessionState, + SessionStore, +) + + +def _make_issue(file_path: Path, level: 
IssueLevel = IssueLevel.CRITICAL) -> CodeIssue: + return CodeIssue( + category=IssueCategory.COMPLEXITY, + level=level, + message="too complex", + file_path=file_path, + line_number=5, + column=0, + suggestion="simplify", + ) + + +class TestRefactronPipeline: + def test_analyze_creates_session(self, tmp_path): + (tmp_path / "foo.py").write_text("x = 1\n") + pipeline = RefactronPipeline(project_root=tmp_path) + + mock_result = MagicMock() + mock_result.total_files = 1 + mock_result.total_issues = 0 + mock_result.file_metrics = [] + + with patch("refactron.core.pipeline.Refactron") as mock_cls: + mock_cls.return_value.analyze.return_value = mock_result + session = pipeline.analyze(tmp_path) + + assert session.session_id.startswith("sess_") + assert session.total_files == 1 + assert session.state == SessionState.ANALYZED + + def test_analyze_saves_session_to_disk(self, tmp_path): + (tmp_path / "foo.py").write_text("x = 1\n") + pipeline = RefactronPipeline(project_root=tmp_path) + + mock_result = MagicMock() + mock_result.total_files = 1 + mock_result.total_issues = 0 + mock_result.file_metrics = [] + + with patch("refactron.core.pipeline.Refactron") as mock_cls: + mock_cls.return_value.analyze.return_value = mock_result + session = pipeline.analyze(tmp_path) + + store = SessionStore(root_dir=tmp_path) + loaded = store.load(session.session_id) + assert loaded is not None + assert loaded.session_id == session.session_id + + def test_queue_issues_adds_to_fix_queue(self, tmp_path): + py_file = tmp_path / "foo.py" + py_file.write_text("x = 1\n") + pipeline = RefactronPipeline(project_root=tmp_path) + session = PipelineSession( + session_id="sess_001", + target=str(tmp_path), + created_at="2026-04-03T18:00:00", + ) + issue = _make_issue(py_file) + pipeline.queue_issues(session, [issue]) + # Queue should have the issue (either PENDING or SKIPPED) + assert len(session.fix_queue) == 1 + + def test_apply_dry_run_does_not_write(self, tmp_path): + py_file = tmp_path / "foo.py" + 
py_file.write_text("x = 1\n") + pipeline = RefactronPipeline(project_root=tmp_path) + session = PipelineSession( + session_id="sess_001", + target=str(tmp_path), + created_at="2026-04-03T18:00:00", + ) + issue = _make_issue(py_file) + pipeline.queue_issues(session, [issue]) + pipeline.apply(session, dry_run=True) + assert py_file.read_text() == "x = 1\n" + + def test_state_transitions_to_fixed_after_apply(self, tmp_path): + py_file = tmp_path / "foo.py" + py_file.write_text("x = 1\n") + pipeline = RefactronPipeline(project_root=tmp_path) + session = PipelineSession( + session_id="sess_001", + target=str(tmp_path), + created_at="2026-04-03T18:00:00", + ) + pipeline.apply(session, dry_run=True) + assert session.state == SessionState.FIXED + + def test_last_result_stored_after_analyze(self, tmp_path): + (tmp_path / "foo.py").write_text("x = 1\n") + pipeline = RefactronPipeline(project_root=tmp_path) + + mock_result = MagicMock() + mock_result.total_files = 2 + mock_result.total_issues = 0 + mock_result.file_metrics = [] + + with patch("refactron.core.pipeline.Refactron") as mock_cls: + mock_cls.return_value.analyze.return_value = mock_result + pipeline.analyze(tmp_path) + + assert pipeline._last_result is not None + assert pipeline._last_result.total_files == 2 diff --git a/tests/test_pipeline_session.py b/tests/test_pipeline_session.py new file mode 100644 index 0000000..bc4b89c --- /dev/null +++ b/tests/test_pipeline_session.py @@ -0,0 +1,128 @@ +"""Tests for PipelineSession data model and SessionStore.""" + +from refactron.core.pipeline_session import ( + FixQueueItem, + FixStatus, + PipelineSession, + SessionState, + SessionStore, +) + + +class TestPipelineSession: + def test_session_created_with_defaults(self, tmp_path): + session = PipelineSession( + session_id="sess_001", + target=str(tmp_path), + created_at="2026-04-03T18:00:00", + ) + assert session.session_id == "sess_001" + assert session.state == SessionState.ANALYZED + assert session.fix_queue == [] + 
assert session.applied_fixes == [] + assert session.blocked_fixes == [] + + def test_session_to_dict_roundtrip(self, tmp_path): + item = FixQueueItem( + issue_id="i001", + file_path="/tmp/foo.py", + line_number=5, + level="CRITICAL", + message="test", + fixer_name="SomeFixer", + status=FixStatus.APPLIED, + diff="--- a\n+++ b", + block_reason=None, + backup_path="/tmp/backup/foo.py", + ) + session = PipelineSession( + session_id="sess_001", + target=str(tmp_path), + created_at="2026-04-03T18:00:00", + total_files=3, + total_issues=7, + issues_by_level={"CRITICAL": 2, "WARNING": 5}, + fix_queue=[item], + backup_session_id="backup_abc", + finished_at="2026-04-03T18:05:00", + ) + d = session.to_dict() + restored = PipelineSession.from_dict(d) + assert restored.session_id == session.session_id + assert restored.target == session.target + assert restored.created_at == session.created_at + assert restored.total_files == 3 + assert restored.issues_by_level == {"CRITICAL": 2, "WARNING": 5} + assert restored.backup_session_id == "backup_abc" + assert restored.finished_at == "2026-04-03T18:05:00" + assert len(restored.fix_queue) == 1 + restored_item = restored.fix_queue[0] + assert restored_item.issue_id == "i001" + assert restored_item.status == FixStatus.APPLIED + assert restored_item.diff == "--- a\n+++ b" + assert restored_item.backup_path == "/tmp/backup/foo.py" + + def test_queue_item_pending_by_default(self): + item = FixQueueItem( + issue_id="issue_001", + file_path="/tmp/foo.py", + line_number=10, + level="CRITICAL", + message="too complex", + fixer_name="ExtractMagicNumbersFixer", + ) + assert item.status == FixStatus.PENDING + + +class TestSessionStore: + def test_save_and_load(self, tmp_path): + store = SessionStore(root_dir=tmp_path) + session = PipelineSession( + session_id="sess_001", + target=str(tmp_path), + created_at="2026-04-03T18:00:00", + ) + store.save(session) + loaded = store.load("sess_001") + assert loaded.session_id == "sess_001" + + def 
test_load_latest(self, tmp_path): + store = SessionStore(root_dir=tmp_path) + for sid in ["sess_001", "sess_002"]: + store.save( + PipelineSession( + session_id=sid, + target=str(tmp_path), + created_at="2026-04-03T18:00:00", + ) + ) + latest = store.load_latest() + assert latest is not None + assert latest.session_id == "sess_002" + + def test_load_missing_returns_none(self, tmp_path): + store = SessionStore(root_dir=tmp_path) + assert store.load("does_not_exist") is None + + def test_list_sessions(self, tmp_path): + store = SessionStore(root_dir=tmp_path) + for sid in ["sess_001", "sess_002", "sess_003"]: + store.save( + PipelineSession( + session_id=sid, + target=str(tmp_path), + created_at="2026-04-03T18:00:00", + ) + ) + sessions = store.list_sessions() + assert len(sessions) == 3 + + def test_sessions_dir_created_automatically(self, tmp_path): + store = SessionStore(root_dir=tmp_path) + session = PipelineSession( + session_id="sess_001", + target=str(tmp_path), + created_at="2026-04-03T18:00:00", + ) + store.save(session) + assert (tmp_path / ".refactron" / "sessions").exists()