From 4741f0e0636452bdf04b77d831fae547970ed56a Mon Sep 17 00:00:00 2001 From: research-developer Date: Thu, 18 Dec 2025 09:18:41 -0600 Subject: [PATCH 1/7] feat(polling): Add local polling infrastructure for GitHub and Railway MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the foundation for a local PR-Agent daemon that polls: - GitHub notifications for @mentions on PRs - Railway API for deployment status changes New packages: - pr_agent/polling/ - Polling infrastructure - base.py: Abstract BasePoller with async generator pattern - events.py: Pydantic event models (GitHub/Railway) - github.py: GitHubPoller stub for notification polling - railway.py: RailwayPoller stub for deployment tracking - dispatcher.py: Event routing and handler registration - pr_agent/cli/ - CLI for daemon management - config.py: Config management (~/.pr-agent/config.json) - main.py: Typer CLI (init, start, stop, status, config) Dependencies added: typer, rich, httpx πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pr_agent/cli/__init__.py | 15 + pr_agent/cli/config.py | 243 ++++++++++++++++ pr_agent/cli/main.py | 513 +++++++++++++++++++++++++++++++++ pr_agent/polling/__init__.py | 20 ++ pr_agent/polling/base.py | 181 ++++++++++++ pr_agent/polling/dispatcher.py | 257 +++++++++++++++++ pr_agent/polling/events.py | 162 +++++++++++ pr_agent/polling/github.py | 222 ++++++++++++++ pr_agent/polling/railway.py | 230 +++++++++++++++ requirements.txt | 3 + 10 files changed, 1846 insertions(+) create mode 100644 pr_agent/cli/__init__.py create mode 100644 pr_agent/cli/config.py create mode 100644 pr_agent/cli/main.py create mode 100644 pr_agent/polling/__init__.py create mode 100644 pr_agent/polling/base.py create mode 100644 pr_agent/polling/dispatcher.py create mode 100644 pr_agent/polling/events.py create mode 100644 pr_agent/polling/github.py create mode 100644 pr_agent/polling/railway.py diff --git 
a/pr_agent/cli/__init__.py b/pr_agent/cli/__init__.py new file mode 100644 index 0000000000..d4fbe1c8e4 --- /dev/null +++ b/pr_agent/cli/__init__.py @@ -0,0 +1,15 @@ +""" +PR-Agent CLI for local daemon management. + +Commands: + pr-agent init - Add current repo to config + pr-agent start - Start polling daemon + pr-agent stop - Stop daemon + pr-agent status - Show daemon status + pr-agent config - View/edit config +""" + +from pr_agent.cli.config import Config, load_config, save_config +from pr_agent.cli.main import main as run + +__all__ = ["Config", "load_config", "save_config", "run"] diff --git a/pr_agent/cli/config.py b/pr_agent/cli/config.py new file mode 100644 index 0000000000..70a67f92b3 --- /dev/null +++ b/pr_agent/cli/config.py @@ -0,0 +1,243 @@ +""" +Configuration management for PR-Agent CLI. + +Handles loading/saving config from ~/.pr-agent/config.json +""" + +import json +import os +from pathlib import Path +from typing import Optional +from pydantic import BaseModel, Field + + +# Default config directory +DEFAULT_CONFIG_DIR = Path.home() / ".pr-agent" +DEFAULT_CONFIG_FILE = DEFAULT_CONFIG_DIR / "config.json" +DEFAULT_LOG_DIR = DEFAULT_CONFIG_DIR / "logs" +DEFAULT_PID_FILE = DEFAULT_CONFIG_DIR / "daemon.pid" + + +class Credentials(BaseModel): + """API credentials for various services.""" + + github_token: Optional[str] = Field(None, description="GitHub personal access token") + anthropic_api_key: Optional[str] = Field(None, description="Anthropic API key") + openai_api_key: Optional[str] = Field(None, description="OpenAI API key") + railway_token: Optional[str] = Field(None, description="Railway API token") + + +class Settings(BaseModel): + """Global settings for the daemon.""" + + poll_interval_github: int = Field(30, description="GitHub poll interval in seconds") + poll_interval_railway: int = Field(60, description="Railway poll interval in seconds") + model: str = Field("claude-sonnet-4-20250514", description="Default AI model") + log_level: str = 
Field("INFO", description="Logging level") + github_username: Optional[str] = Field(None, description="GitHub bot username") + + +class RepoConfig(BaseModel): + """Configuration for a tracked repository.""" + + path: str = Field(..., description="Local path to repository") + github_repo: str = Field(..., description="GitHub repo (owner/repo)") + + # Railway integration (optional) + railway_project_id: Optional[str] = Field(None, description="Railway project ID") + railway_service_ids: list[str] = Field( + default_factory=list, description="Railway service IDs to monitor" + ) + + # Auto-commands on PR open + auto_review: bool = Field(True, description="Auto-run /review on new PRs") + auto_describe: bool = Field(False, description="Auto-run /describe on new PRs") + + +class Config(BaseModel): + """ + Root configuration model for PR-Agent CLI. + + Stored at ~/.pr-agent/config.json + """ + + version: str = Field("1.0", description="Config version") + credentials: Credentials = Field( + default_factory=Credentials, description="API credentials" + ) + settings: Settings = Field(default_factory=Settings, description="Global settings") + repos: list[RepoConfig] = Field( + default_factory=list, description="Tracked repositories" + ) + + def get_repo(self, path: Optional[str] = None, github_repo: Optional[str] = None) -> Optional[RepoConfig]: + """ + Find a repo config by path or GitHub repo name. + + Args: + path: Local path to search for + github_repo: GitHub repo name to search for + + Returns: + RepoConfig if found, None otherwise + """ + for repo in self.repos: + if path and repo.path == path: + return repo + if github_repo and repo.github_repo == github_repo: + return repo + return None + + def add_repo(self, repo: RepoConfig) -> None: + """ + Add or update a repository config. 
+ + Args: + repo: RepoConfig to add + """ + # Check if repo already exists + existing = self.get_repo(path=repo.path) + if existing: + # Update existing + idx = self.repos.index(existing) + self.repos[idx] = repo + else: + self.repos.append(repo) + + def remove_repo(self, path: str) -> bool: + """ + Remove a repository from config. + + Args: + path: Local path of repo to remove + + Returns: + True if removed, False if not found + """ + repo = self.get_repo(path=path) + if repo: + self.repos.remove(repo) + return True + return False + + +def ensure_config_dir() -> Path: + """ + Ensure the config directory exists. + + Returns: + Path to config directory + """ + DEFAULT_CONFIG_DIR.mkdir(parents=True, exist_ok=True) + DEFAULT_LOG_DIR.mkdir(parents=True, exist_ok=True) + return DEFAULT_CONFIG_DIR + + +def load_config(config_path: Optional[Path] = None) -> Config: + """ + Load configuration from file. + + Args: + config_path: Path to config file (default: ~/.pr-agent/config.json) + + Returns: + Config object (empty config if file doesn't exist) + """ + path = config_path or DEFAULT_CONFIG_FILE + + if not path.exists(): + return Config() + + try: + with open(path, "r") as f: + data = json.load(f) + return Config.model_validate(data) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in config file: {e}") + except Exception as e: + raise ValueError(f"Failed to load config: {e}") + + +def save_config(config: Config, config_path: Optional[Path] = None) -> None: + """ + Save configuration to file. + + Args: + config: Config object to save + config_path: Path to config file (default: ~/.pr-agent/config.json) + """ + path = config_path or DEFAULT_CONFIG_FILE + ensure_config_dir() + + with open(path, "w") as f: + json.dump(config.model_dump(), f, indent=2) + + +def get_credential(name: str, config: Optional[Config] = None) -> Optional[str]: + """ + Get a credential from config or environment. + + Environment variables take precedence over config file. 
+ + Args: + name: Credential name (e.g., "github_token") + config: Config object (will load if not provided) + + Returns: + Credential value or None + """ + # Check environment first + env_name = name.upper() + env_value = os.environ.get(env_name) + if env_value: + return env_value + + # Fall back to config + if config is None: + config = load_config() + + return getattr(config.credentials, name, None) + + +def detect_github_repo(path: Path) -> Optional[str]: + """ + Detect GitHub repo from git remote. + + Args: + path: Path to git repository + + Returns: + GitHub repo name (owner/repo) or None + """ + import subprocess + + try: + result = subprocess.run( + ["git", "remote", "get-url", "origin"], + cwd=path, + capture_output=True, + text=True, + ) + if result.returncode != 0: + return None + + url = result.stdout.strip() + + # Parse GitHub URL + # https://github.com/owner/repo.git + # git@github.com:owner/repo.git + if "github.com" in url: + if url.startswith("git@"): + # git@github.com:owner/repo.git + parts = url.split(":")[-1] + else: + # https://github.com/owner/repo.git + parts = url.split("github.com/")[-1] + + # Remove .git suffix + repo = parts.removesuffix(".git") + return repo + + return None + except Exception: + return None diff --git a/pr_agent/cli/main.py b/pr_agent/cli/main.py new file mode 100644 index 0000000000..a2c9491f39 --- /dev/null +++ b/pr_agent/cli/main.py @@ -0,0 +1,513 @@ +""" +PR-Agent CLI entry point. 
+ +Commands: + pr-agent init - Add current repo to config + pr-agent start - Start polling daemon + pr-agent stop - Stop daemon + pr-agent status - Show daemon status + pr-agent config - View/edit config +""" + +import os +import signal +import sys +from pathlib import Path +from typing import Optional + +import typer +from rich.console import Console +from rich.table import Table + +from pr_agent.cli.config import ( + Config, + RepoConfig, + DEFAULT_CONFIG_FILE, + DEFAULT_LOG_DIR, + DEFAULT_PID_FILE, + load_config, + save_config, + get_credential, + detect_github_repo, + ensure_config_dir, +) + +app = typer.Typer( + name="pr-agent", + help="Local PR-Agent daemon for GitHub and Railway polling.", + add_completion=False, +) +console = Console() + + +def _get_pid() -> Optional[int]: + """Get the PID of the running daemon, if any.""" + if not DEFAULT_PID_FILE.exists(): + return None + try: + pid = int(DEFAULT_PID_FILE.read_text().strip()) + # Check if process is still running + os.kill(pid, 0) + return pid + except (ValueError, ProcessLookupError, PermissionError): + # PID file exists but process is dead + DEFAULT_PID_FILE.unlink(missing_ok=True) + return None + + +def _is_daemon_running() -> bool: + """Check if the daemon is currently running.""" + return _get_pid() is not None + + +@app.command() +def init( + path: Optional[Path] = typer.Argument( + None, + help="Path to repository (default: current directory)", + ), + github_repo: Optional[str] = typer.Option( + None, + "--github-repo", "-g", + help="GitHub repo (owner/repo). 
Auto-detected if not provided.", + ), + railway_project: Optional[str] = typer.Option( + None, + "--railway-project", "-r", + help="Railway project ID to link.", + ), + auto_review: bool = typer.Option( + True, + "--auto-review/--no-auto-review", + help="Auto-run /review on new PRs.", + ), + auto_describe: bool = typer.Option( + False, + "--auto-describe/--no-auto-describe", + help="Auto-run /describe on new PRs.", + ), +): + """ + Add current repository to PR-Agent config. + + Detects the GitHub repo from git remote and stores it in ~/.pr-agent/config.json + """ + repo_path = path or Path.cwd() + repo_path = repo_path.resolve() + + # Check if it's a git repo + if not (repo_path / ".git").exists(): + console.print(f"[red]Error:[/red] {repo_path} is not a git repository") + raise typer.Exit(1) + + # Detect or validate GitHub repo + detected_repo = detect_github_repo(repo_path) + if github_repo: + repo_name = github_repo + elif detected_repo: + repo_name = detected_repo + console.print(f"[dim]Detected GitHub repo:[/dim] {repo_name}") + else: + console.print("[red]Error:[/red] Could not detect GitHub repo from git remote.") + console.print("Please specify with --github-repo owner/repo") + raise typer.Exit(1) + + # Load existing config + config = load_config() + + # Check for required credentials + github_token = get_credential("github_token", config) + if not github_token: + console.print("[yellow]Warning:[/yellow] GitHub token not configured.") + console.print("Set GITHUB_TOKEN environment variable or run 'pr-agent config set github_token '") + + anthropic_key = get_credential("anthropic_api_key", config) + if not anthropic_key: + console.print("[yellow]Warning:[/yellow] Anthropic API key not configured.") + console.print("Set ANTHROPIC_API_KEY environment variable or run 'pr-agent config set anthropic_api_key '") + + # Create repo config + repo_config = RepoConfig( + path=str(repo_path), + github_repo=repo_name, + railway_project_id=railway_project, + 
auto_review=auto_review, + auto_describe=auto_describe, + ) + + # Add to config + existing = config.get_repo(path=str(repo_path)) + if existing: + console.print(f"[yellow]Updating existing config for:[/yellow] {repo_path}") + else: + console.print(f"[green]Adding repository:[/green] {repo_path}") + + config.add_repo(repo_config) + save_config(config) + + console.print(f"[green]βœ“[/green] Saved to {DEFAULT_CONFIG_FILE}") + + +@app.command() +def start( + foreground: bool = typer.Option( + False, + "--foreground", "-f", + help="Run in foreground instead of as daemon.", + ), +): + """ + Start the PR-Agent polling daemon. + + The daemon polls GitHub notifications and Railway deployments for tracked repos. + """ + if _is_daemon_running(): + pid = _get_pid() + console.print(f"[yellow]Daemon already running[/yellow] (PID: {pid})") + raise typer.Exit(1) + + config = load_config() + + if not config.repos: + console.print("[red]Error:[/red] No repositories configured.") + console.print("Run 'pr-agent init' in a repository first.") + raise typer.Exit(1) + + # Check credentials + github_token = get_credential("github_token", config) + if not github_token: + console.print("[red]Error:[/red] GitHub token not configured.") + raise typer.Exit(1) + + ensure_config_dir() + + if foreground: + console.print("[green]Starting PR-Agent in foreground...[/green]") + console.print(f"Watching {len(config.repos)} repo(s)") + console.print("Press Ctrl+C to stop") + _run_daemon(config) + else: + # Fork to background + console.print("[green]Starting PR-Agent daemon...[/green]") + + pid = os.fork() + if pid > 0: + # Parent process + console.print(f"[green]βœ“[/green] Daemon started (PID: {pid})") + console.print(f" Watching: {len(config.repos)} repo(s)") + console.print(f" GitHub poll: {config.settings.poll_interval_github}s") + console.print(f" Railway poll: {config.settings.poll_interval_railway}s") + console.print(f" Logs: {DEFAULT_LOG_DIR}") + return + + # Child process - become daemon + 
os.setsid() + os.chdir("/") + + # Redirect stdio to log file + log_file = DEFAULT_LOG_DIR / "daemon.log" + with open(log_file, "a") as f: + os.dup2(f.fileno(), sys.stdout.fileno()) + os.dup2(f.fileno(), sys.stderr.fileno()) + + # Write PID file + DEFAULT_PID_FILE.write_text(str(os.getpid())) + + try: + _run_daemon(config) + finally: + DEFAULT_PID_FILE.unlink(missing_ok=True) + + +def _run_daemon(config: Config): + """ + Run the polling daemon. + + This is the main daemon loop that runs until stopped. + """ + import asyncio + import logging + + # Setup logging + logging.basicConfig( + level=getattr(logging, config.settings.log_level), + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", + ) + logger = logging.getLogger("pr_agent.daemon") + + async def main(): + from pr_agent.polling.dispatcher import EventDispatcher, create_pr_agent_handler + from pr_agent.polling.github import GitHubPoller + from pr_agent.polling.events import EventType + + dispatcher = EventDispatcher() + + # Setup GitHub poller + github_token = get_credential("github_token", config) + username = config.settings.github_username or "pr-agent" + + tracked_repos = [r.github_repo for r in config.repos] + + github_poller = GitHubPoller( + token=github_token, + username=username, + poll_interval=config.settings.poll_interval_github, + tracked_repos=tracked_repos, + ) + dispatcher.add_poller(github_poller) + + # Register handlers + handler = await create_pr_agent_handler() + dispatcher.register_handler(EventType.GITHUB_PR_MENTION, handler) + + # Setup Railway pollers for repos that have it configured + railway_token = get_credential("railway_token", config) + if railway_token: + from pr_agent.polling.railway import RailwayPoller + + for repo in config.repos: + if repo.railway_project_id: + railway_poller = RailwayPoller( + token=railway_token, + project_id=repo.railway_project_id, + poll_interval=config.settings.poll_interval_railway, + service_ids=repo.railway_service_ids or None, + ) + 
dispatcher.add_poller(railway_poller) + + logger.info(f"Starting daemon with {len(dispatcher.pollers)} poller(s)") + + # Handle shutdown signals + def handle_signal(sig, frame): + logger.info(f"Received signal {sig}, shutting down...") + asyncio.create_task(dispatcher.stop()) + + signal.signal(signal.SIGTERM, handle_signal) + signal.signal(signal.SIGINT, handle_signal) + + await dispatcher.run() + + asyncio.run(main()) + + +@app.command() +def stop(): + """ + Stop the PR-Agent polling daemon. + """ + pid = _get_pid() + + if not pid: + console.print("[yellow]Daemon is not running[/yellow]") + return + + console.print(f"Stopping daemon (PID: {pid})...") + + try: + os.kill(pid, signal.SIGTERM) + + # Wait for process to exit + import time + for _ in range(10): + time.sleep(0.5) + try: + os.kill(pid, 0) + except ProcessLookupError: + break + else: + # Force kill if still running + console.print("[yellow]Process not responding, sending SIGKILL...[/yellow]") + os.kill(pid, signal.SIGKILL) + + DEFAULT_PID_FILE.unlink(missing_ok=True) + console.print("[green]βœ“[/green] Daemon stopped") + + except ProcessLookupError: + console.print("[yellow]Process already stopped[/yellow]") + DEFAULT_PID_FILE.unlink(missing_ok=True) + except PermissionError: + console.print("[red]Error:[/red] Permission denied. Try with sudo.") + raise typer.Exit(1) + + +@app.command() +def status(): + """ + Show PR-Agent daemon status. 
+ """ + config = load_config() + pid = _get_pid() + + # Status header + if pid: + console.print(f"[green]Daemon:[/green] Running (PID: {pid})") + else: + console.print("[yellow]Daemon:[/yellow] Stopped") + + console.print() + + # Configuration + console.print("[bold]Configuration:[/bold]") + console.print(f" Config file: {DEFAULT_CONFIG_FILE}") + console.print(f" Log directory: {DEFAULT_LOG_DIR}") + console.print(f" GitHub poll: {config.settings.poll_interval_github}s") + console.print(f" Railway poll: {config.settings.poll_interval_railway}s") + console.print(f" Model: {config.settings.model}") + + console.print() + + # Credentials status + console.print("[bold]Credentials:[/bold]") + github_token = get_credential("github_token", config) + anthropic_key = get_credential("anthropic_api_key", config) + railway_token = get_credential("railway_token", config) + + console.print(f" GitHub token: {'[green]βœ“[/green]' if github_token else '[red]βœ—[/red]'}") + console.print(f" Anthropic key: {'[green]βœ“[/green]' if anthropic_key else '[red]βœ—[/red]'}") + console.print(f" Railway token: {'[green]βœ“[/green]' if railway_token else '[dim]not set[/dim]'}") + + console.print() + + # Repos table + console.print("[bold]Tracked Repositories:[/bold]") + if not config.repos: + console.print(" [dim]No repositories configured. 
Run 'pr-agent init' to add one.[/dim]") + else: + table = Table(show_header=True, header_style="bold") + table.add_column("Path") + table.add_column("GitHub") + table.add_column("Railway") + table.add_column("Auto") + + for repo in config.repos: + auto_flags = [] + if repo.auto_review: + auto_flags.append("review") + if repo.auto_describe: + auto_flags.append("describe") + + table.add_row( + repo.path, + repo.github_repo, + repo.railway_project_id or "-", + ", ".join(auto_flags) if auto_flags else "-", + ) + + console.print(table) + + +@app.command() +def config( + action: str = typer.Argument( + "show", + help="Action: show, set, get", + ), + key: Optional[str] = typer.Argument( + None, + help="Config key (e.g., 'github_token', 'settings.poll_interval')", + ), + value: Optional[str] = typer.Argument( + None, + help="Value to set", + ), +): + """ + View or edit PR-Agent configuration. + + Examples: + pr-agent config # Show all config + pr-agent config get github_token # Get a credential + pr-agent config set github_token ghp_xxx # Set a credential + """ + cfg = load_config() + + if action == "show": + # Show full config (redact secrets) + import json + data = cfg.model_dump() + + # Redact credentials + for cred_key in data.get("credentials", {}): + val = data["credentials"][cred_key] + if val: + data["credentials"][cred_key] = val[:8] + "..." if len(val) > 8 else "***" + + console.print_json(json.dumps(data, indent=2)) + + elif action == "get": + if not key: + console.print("[red]Error:[/red] Key required for 'get' action") + raise typer.Exit(1) + + # Check credentials first + if hasattr(cfg.credentials, key): + val = getattr(cfg.credentials, key) + if val: + console.print(val[:8] + "..." 
if len(val) > 8 else val) + else: + console.print("[dim]not set[/dim]") + elif hasattr(cfg.settings, key): + console.print(str(getattr(cfg.settings, key))) + else: + console.print(f"[red]Error:[/red] Unknown key: {key}") + raise typer.Exit(1) + + elif action == "set": + if not key or value is None: + console.print("[red]Error:[/red] Key and value required for 'set' action") + raise typer.Exit(1) + + # Set credential or setting + if hasattr(cfg.credentials, key): + setattr(cfg.credentials, key, value) + save_config(cfg) + console.print(f"[green]βœ“[/green] Set credentials.{key}") + elif hasattr(cfg.settings, key): + # Try to convert to appropriate type + current = getattr(cfg.settings, key) + if isinstance(current, int): + value = int(value) + elif isinstance(current, bool): + value = value.lower() in ("true", "1", "yes") + setattr(cfg.settings, key, value) + save_config(cfg) + console.print(f"[green]βœ“[/green] Set settings.{key}") + else: + console.print(f"[red]Error:[/red] Unknown key: {key}") + raise typer.Exit(1) + + else: + console.print(f"[red]Error:[/red] Unknown action: {action}") + console.print("Valid actions: show, get, set") + raise typer.Exit(1) + + +@app.command() +def remove( + path: Optional[Path] = typer.Argument( + None, + help="Path to repository to remove (default: current directory)", + ), +): + """ + Remove a repository from PR-Agent config. 
+ """ + repo_path = path or Path.cwd() + repo_path = repo_path.resolve() + + cfg = load_config() + + if cfg.remove_repo(str(repo_path)): + save_config(cfg) + console.print(f"[green]βœ“[/green] Removed {repo_path} from config") + else: + console.print(f"[yellow]Repository not found in config:[/yellow] {repo_path}") + + +def main(): + """Entry point for the CLI.""" + app() + + +if __name__ == "__main__": + main() diff --git a/pr_agent/polling/__init__.py b/pr_agent/polling/__init__.py new file mode 100644 index 0000000000..aa0021b67f --- /dev/null +++ b/pr_agent/polling/__init__.py @@ -0,0 +1,20 @@ +""" +Polling infrastructure for local PR-Agent daemon. + +This module provides polling-based event sources as an alternative to webhooks, +enabling PR-Agent to run as a local service that monitors GitHub notifications +and Railway deployments. +""" + +from pr_agent.polling.base import BasePoller +from pr_agent.polling.events import Event, EventType, GitHubMentionEvent, RailwayDeployEvent +from pr_agent.polling.dispatcher import EventDispatcher + +__all__ = [ + "BasePoller", + "Event", + "EventType", + "GitHubMentionEvent", + "RailwayDeployEvent", + "EventDispatcher", +] diff --git a/pr_agent/polling/base.py b/pr_agent/polling/base.py new file mode 100644 index 0000000000..8de3628d0d --- /dev/null +++ b/pr_agent/polling/base.py @@ -0,0 +1,181 @@ +""" +Abstract base class for polling event sources. + +Adapted from pr_agent/servers/github_polling.py patterns. +""" + +from abc import ABC, abstractmethod +from typing import AsyncIterator, Optional +from datetime import datetime +import asyncio +import logging + +from pr_agent.polling.events import Event + +logger = logging.getLogger(__name__) + + +class BasePoller(ABC): + """ + Abstract base class for notification pollers. + + Subclasses implement poll() to fetch events from their respective sources + (GitHub notifications, Railway deployments, etc.). 
+ + Usage: + class MyPoller(BasePoller): + async def poll(self) -> list[Event]: + # Fetch and return events + ... + + poller = MyPoller(poll_interval=30) + async for event in poller.run(): + await handle(event) + """ + + def __init__( + self, + poll_interval: int = 30, + name: str = "poller", + max_consecutive_errors: int = 5, + error_backoff_multiplier: float = 2.0, + ): + """ + Initialize the poller. + + Args: + poll_interval: Seconds between polls (default: 30) + name: Identifier for logging + max_consecutive_errors: Errors before entering backoff mode + error_backoff_multiplier: Multiplier for backoff duration + """ + self.poll_interval = poll_interval + self.name = name + self.max_consecutive_errors = max_consecutive_errors + self.error_backoff_multiplier = error_backoff_multiplier + + self._running = False + self._last_poll: Optional[datetime] = None + self._error_count = 0 + self._total_events = 0 + self._total_polls = 0 + + @abstractmethod + async def poll(self) -> list[Event]: + """ + Poll for new events. + + Subclasses must implement this method to fetch events from their + respective sources. Should return an empty list if no new events. + + Returns: + List of Event objects (may be empty) + + Raises: + Exception: On polling errors (will be caught by run()) + """ + pass + + async def setup(self) -> None: + """ + Optional setup hook called before polling starts. + + Override to perform initialization (e.g., verify credentials, + establish connections). + """ + pass + + async def teardown(self) -> None: + """ + Optional teardown hook called when polling stops. + + Override to perform cleanup (e.g., close connections). + """ + pass + + async def run(self) -> AsyncIterator[Event]: + """ + Run the polling loop, yielding events as they arrive. + + This is an async generator that: + 1. Calls setup() once + 2. Repeatedly calls poll() at poll_interval + 3. Yields events as they arrive + 4. Handles errors with exponential backoff + 5. 
Calls teardown() when stopped + + Yields: + Event objects from the poll source + + Example: + async for event in poller.run(): + await process(event) + """ + self._running = True + logger.info(f"[{self.name}] Starting polling (interval={self.poll_interval}s)") + + try: + await self.setup() + + while self._running: + try: + events = await self.poll() + self._last_poll = datetime.utcnow() + self._total_polls += 1 + self._error_count = 0 # Reset on success + + for event in events: + self._total_events += 1 + logger.debug(f"[{self.name}] Yielding event: {event.type.value}") + yield event + + except Exception as e: + self._error_count += 1 + logger.error( + f"[{self.name}] Poll error ({self._error_count}/{self.max_consecutive_errors}): {e}" + ) + + if self._error_count >= self.max_consecutive_errors: + backoff = self.poll_interval * self.error_backoff_multiplier + logger.warning( + f"[{self.name}] Max errors reached, backing off for {backoff}s" + ) + await asyncio.sleep(backoff) + self._error_count = 0 + + await asyncio.sleep(self.poll_interval) + + finally: + await self.teardown() + logger.info( + f"[{self.name}] Stopped. 
" + f"Total polls: {self._total_polls}, events: {self._total_events}" + ) + + def stop(self) -> None: + """Stop the polling loop gracefully.""" + self._running = False + logger.info(f"[{self.name}] Stop requested") + + @property + def is_running(self) -> bool: + """Check if the poller is currently running.""" + return self._running + + @property + def last_poll(self) -> Optional[datetime]: + """Get the timestamp of the last successful poll.""" + return self._last_poll + + @property + def stats(self) -> dict: + """Get polling statistics.""" + return { + "name": self.name, + "running": self._running, + "poll_interval": self.poll_interval, + "last_poll": self._last_poll.isoformat() if self._last_poll else None, + "total_polls": self._total_polls, + "total_events": self._total_events, + "consecutive_errors": self._error_count, + } diff --git a/pr_agent/polling/dispatcher.py b/pr_agent/polling/dispatcher.py new file mode 100644 index 0000000000..823fc61b2d --- /dev/null +++ b/pr_agent/polling/dispatcher.py @@ -0,0 +1,257 @@ +""" +Event dispatcher for routing polling events to handlers. + +Aggregates events from multiple pollers and routes them to appropriate +handlers based on event type. +""" + +import asyncio +from collections import defaultdict +from typing import Awaitable, Callable, Optional +import logging + +from pr_agent.polling.base import BasePoller +from pr_agent.polling.events import Event, EventType, GitHubMentionEvent + +logger = logging.getLogger(__name__) + +# Type alias for event handlers +EventHandler = Callable[[Event], Awaitable[None]] + + +class EventDispatcher: + """ + Aggregates events from multiple pollers and dispatches to handlers. 
+ + Usage: + dispatcher = EventDispatcher() + + # Add pollers + dispatcher.add_poller(github_poller) + dispatcher.add_poller(railway_poller) + + # Register handlers + @dispatcher.on(EventType.GITHUB_PR_MENTION) + async def handle_mention(event: GitHubMentionEvent): + await pr_agent.handle_request(event.pr_url, event.command) + + # Run + await dispatcher.run() + """ + + def __init__(self): + self.pollers: list[BasePoller] = [] + self.handlers: dict[EventType, list[EventHandler]] = defaultdict(list) + self._running = False + self._tasks: list[asyncio.Task] = [] + + def add_poller(self, poller: BasePoller) -> None: + """ + Add a poller to the dispatcher. + + Args: + poller: BasePoller instance to add + """ + self.pollers.append(poller) + logger.info(f"Added poller: {poller.name}") + + def on(self, event_type: EventType) -> Callable[[EventHandler], EventHandler]: + """ + Decorator to register an event handler. + + Args: + event_type: Type of event to handle + + Returns: + Decorator function + + Example: + @dispatcher.on(EventType.GITHUB_PR_MENTION) + async def handle_mention(event: GitHubMentionEvent): + ... + """ + + def decorator(func: EventHandler) -> EventHandler: + self.handlers[event_type].append(func) + logger.debug(f"Registered handler for {event_type.value}: {func.__name__}") + return func + + return decorator + + def register_handler( + self, event_type: EventType, handler: EventHandler + ) -> None: + """ + Register an event handler programmatically. + + Args: + event_type: Type of event to handle + handler: Async function to call when event occurs + """ + self.handlers[event_type].append(handler) + logger.debug(f"Registered handler for {event_type.value}: {handler.__name__}") + + async def dispatch(self, event: Event) -> None: + """ + Dispatch an event to registered handlers. 
+ + Args: + event: Event to dispatch + """ + handlers = self.handlers.get(event.type, []) + + if not handlers: + logger.debug(f"No handlers for event type: {event.type.value}") + return + + logger.info(f"Dispatching {event.type.value} to {len(handlers)} handler(s)") + + # Run handlers concurrently + tasks = [self._safe_call(handler, event) for handler in handlers] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Log any errors + for i, result in enumerate(results): + if isinstance(result, Exception): + logger.error( + f"Handler {handlers[i].__name__} failed for {event.type.value}: {result}" + ) + + async def _safe_call(self, handler: EventHandler, event: Event) -> None: + """ + Safely call a handler with error handling. + + Args: + handler: Handler function to call + event: Event to pass to handler + """ + try: + await handler(event) + except Exception as e: + logger.exception(f"Handler {handler.__name__} raised exception: {e}") + raise + + async def _run_poller(self, poller: BasePoller) -> None: + """ + Run a single poller and dispatch its events. + + Args: + poller: Poller to run + """ + try: + async for event in poller.run(): + await self.dispatch(event) + except asyncio.CancelledError: + logger.info(f"Poller {poller.name} cancelled") + raise + except Exception as e: + logger.error(f"Poller {poller.name} crashed: {e}") + raise + + async def run(self) -> None: + """ + Run all pollers concurrently and dispatch events. + + This is the main entry point. Runs until stop() is called + or all pollers complete. 
+ """ + if not self.pollers: + logger.warning("No pollers configured, nothing to do") + return + + self._running = True + logger.info(f"Starting dispatcher with {len(self.pollers)} poller(s)") + + # Create tasks for each poller + self._tasks = [ + asyncio.create_task(self._run_poller(poller), name=f"poller-{poller.name}") + for poller in self.pollers + ] + + try: + # Wait for all pollers (they run indefinitely unless stopped) + await asyncio.gather(*self._tasks) + except asyncio.CancelledError: + logger.info("Dispatcher cancelled") + finally: + self._running = False + + async def stop(self) -> None: + """ + Stop all pollers gracefully. + """ + logger.info("Stopping dispatcher...") + + # Signal all pollers to stop + for poller in self.pollers: + poller.stop() + + # Cancel all tasks + for task in self._tasks: + task.cancel() + + # Wait for tasks to complete + if self._tasks: + await asyncio.gather(*self._tasks, return_exceptions=True) + + self._tasks = [] + self._running = False + logger.info("Dispatcher stopped") + + @property + def is_running(self) -> bool: + """Check if the dispatcher is currently running.""" + return self._running + + @property + def stats(self) -> dict: + """Get aggregated statistics from all pollers.""" + return { + "running": self._running, + "poller_count": len(self.pollers), + "handler_count": sum(len(h) for h in self.handlers.values()), + "pollers": [p.stats for p in self.pollers], + } + + +async def create_pr_agent_handler() -> EventHandler: + """ + Create a handler that integrates with existing PR-Agent tools. 
+ + Returns: + EventHandler that routes to PRAgent.handle_request() + """ + from pr_agent.agent.pr_agent import PRAgent + + agent = PRAgent() + + async def handle_github_mention(event: Event) -> None: + """Route GitHub mentions to PR-Agent.""" + if not isinstance(event, GitHubMentionEvent): + return + + if not event.command: + logger.debug(f"No command in mention, skipping: {event.id}") + return + + if not event.pr_url: + logger.debug(f"Not a PR mention, skipping: {event.id}") + return + + logger.info(f"Processing {event.command} for PR: {event.pr_url}") + + try: + # Build request string from command and args + request = event.command + if event.command_args: + request += " " + " ".join(event.command_args) + + await agent.handle_request( + pr_url=event.pr_url, + request=request, + ) + except Exception as e: + logger.error(f"PR-Agent failed to process {event.command}: {e}") + raise + + return handle_github_mention diff --git a/pr_agent/polling/events.py b/pr_agent/polling/events.py new file mode 100644 index 0000000000..44f23cbad4 --- /dev/null +++ b/pr_agent/polling/events.py @@ -0,0 +1,162 @@ +""" +Event models for polling-based PR-Agent. + +Defines structured Pydantic models for events from different sources +(GitHub notifications, Railway deployments). 
+""" + +from enum import Enum +from datetime import datetime +from typing import Any, Optional +from pydantic import BaseModel, Field + + +class EventType(str, Enum): + """Types of events the polling agent can process.""" + + # GitHub events + GITHUB_PR_MENTION = "github.pr.mention" + GITHUB_PR_OPENED = "github.pr.opened" + GITHUB_PR_UPDATED = "github.pr.updated" + GITHUB_PR_REVIEW_REQUESTED = "github.pr.review_requested" + GITHUB_ISSUE_MENTION = "github.issue.mention" + + # Railway events + RAILWAY_DEPLOY_SUCCESS = "railway.deploy.success" + RAILWAY_DEPLOY_FAILED = "railway.deploy.failed" + RAILWAY_DEPLOY_BUILDING = "railway.deploy.building" + RAILWAY_SERVICE_CRASHED = "railway.service.crashed" + + +class Event(BaseModel): + """ + Base event model with common fields. + + All event types inherit from this base class. + """ + + id: str = Field(..., description="Unique event identifier") + type: EventType = Field(..., description="Event type") + timestamp: datetime = Field( + default_factory=datetime.utcnow, description="When the event occurred" + ) + source: str = Field(..., description="Event source (github, railway)") + raw_data: dict[str, Any] = Field( + default_factory=dict, description="Original payload from source" + ) + + model_config = {"extra": "allow"} + + +class GitHubMentionEvent(Event): + """ + Event for @mentions on GitHub PRs or Issues. + + Triggered when the bot is mentioned in a comment with an optional command. 
+ """ + + source: str = "github" + + # Repository info + repo_full_name: str = Field(..., description="Full repo name (owner/repo)") + + # PR/Issue info (one will be set) + pr_number: Optional[int] = Field(None, description="PR number if mention is on PR") + issue_number: Optional[int] = Field( + None, description="Issue number if mention is on issue" + ) + + # Comment info + comment_id: int = Field(..., description="GitHub comment ID") + comment_body: str = Field(..., description="Full comment text") + comment_author: str = Field(..., description="GitHub username of commenter") + html_url: str = Field(..., description="URL to the comment") + + # Parsed command info + command: Optional[str] = Field( + None, description="Parsed command (e.g., '/review', '/describe')" + ) + command_args: list[str] = Field( + default_factory=list, description="Arguments to the command" + ) + + @property + def pr_url(self) -> Optional[str]: + """Get the PR URL if this is a PR mention.""" + if self.pr_number: + return f"https://github.com/{self.repo_full_name}/pull/{self.pr_number}" + return None + + @property + def is_pr_mention(self) -> bool: + """Check if this mention is on a PR (vs an issue).""" + return self.pr_number is not None + + +class RailwayDeployEvent(Event): + """ + Event for Railway deployment status changes. + + Triggered when a deployment succeeds, fails, or crashes. 
+ """ + + source: str = "railway" + + # Service info + service_id: str = Field(..., description="Railway service ID") + service_name: str = Field(..., description="Human-readable service name") + + # Environment info + environment_id: str = Field(..., description="Railway environment ID") + environment_name: str = Field( + ..., description="Environment name (e.g., 'production', 'staging')" + ) + + # Deployment info + deploy_id: str = Field(..., description="Railway deployment ID") + status: str = Field( + ..., description="Deployment status (SUCCESS, FAILED, CRASHED, BUILDING)" + ) + + # Git info (if available) + commit_sha: Optional[str] = Field(None, description="Git commit SHA") + commit_message: Optional[str] = Field(None, description="Git commit message") + + # Error info (if failed/crashed) + error_message: Optional[str] = Field( + None, description="Error message if deployment failed" + ) + logs_url: Optional[str] = Field(None, description="URL to deployment logs") + + @property + def is_failure(self) -> bool: + """Check if this is a failure event.""" + return self.status in ("FAILED", "CRASHED") + + @property + def is_success(self) -> bool: + """Check if this is a success event.""" + return self.status == "SUCCESS" + + +class GitHubPROpenedEvent(Event): + """ + Event for newly opened or reopened PRs. + + Triggered when auto-review is enabled for a repository. 
+ """ + + source: str = "github" + + repo_full_name: str = Field(..., description="Full repo name (owner/repo)") + pr_number: int = Field(..., description="PR number") + pr_title: str = Field(..., description="PR title") + pr_author: str = Field(..., description="GitHub username of PR author") + pr_url: str = Field(..., description="URL to the PR") + is_draft: bool = Field(False, description="Whether the PR is a draft") + + # Auto-commands to run + auto_commands: list[str] = Field( + default_factory=list, + description="Commands to run automatically (e.g., ['/review', '/describe'])", + ) diff --git a/pr_agent/polling/github.py b/pr_agent/polling/github.py new file mode 100644 index 0000000000..af62778cc1 --- /dev/null +++ b/pr_agent/polling/github.py @@ -0,0 +1,222 @@ +""" +GitHub Notification Poller. + +Polls GitHub's Notifications API for @mentions on PRs and issues. +Adapted from pr_agent/servers/github_polling.py patterns. + +Implementation Status: STUB +Branch: feature/github-poller +""" + +import re +from typing import Optional +import logging + +import httpx + +from pr_agent.polling.base import BasePoller +from pr_agent.polling.events import ( + Event, + EventType, + GitHubMentionEvent, + GitHubPROpenedEvent, +) + +logger = logging.getLogger(__name__) + +# GitHub API endpoints +GITHUB_API_BASE = "https://api.github.com" +NOTIFICATIONS_URL = f"{GITHUB_API_BASE}/notifications" + + +class GitHubPoller(BasePoller): + """ + Poll GitHub notifications for @mentions. + + Uses If-Modified-Since header for efficient polling (304 responses). + Filters for mentions on PRs/issues in tracked repositories. 
+ + Usage: + poller = GitHubPoller( + token="ghp_xxx", + username="my-bot", + poll_interval=30, + ) + async for event in poller.run(): + if event.type == EventType.GITHUB_PR_MENTION: + await handle_pr_mention(event) + """ + + def __init__( + self, + token: str, + username: str, + poll_interval: int = 30, + tracked_repos: Optional[list[str]] = None, + ): + """ + Initialize the GitHub poller. + + Args: + token: GitHub personal access token or app token + username: Bot's GitHub username (for detecting @mentions) + poll_interval: Seconds between polls (default: 30) + tracked_repos: Optional list of repos to filter (e.g., ["owner/repo"]) + If None, processes all notifications + """ + super().__init__(poll_interval=poll_interval, name="github") + + self.token = token + self.username = username + self.user_tag = f"@{username}" + self.tracked_repos = set(tracked_repos) if tracked_repos else None + + # Caching for efficient polling (304 Not Modified) + self._last_modified: Optional[str] = None + self._etag: Optional[str] = None + + # Deduplication + self._handled_notification_ids: set[str] = set() + self._handled_comment_ids: set[int] = set() + + # HTTP client (created in setup) + self._client: Optional[httpx.AsyncClient] = None + + async def setup(self) -> None: + """Initialize HTTP client.""" + self._client = httpx.AsyncClient(timeout=30.0) + logger.info(f"[{self.name}] Initialized for user @{self.username}") + + async def teardown(self) -> None: + """Close HTTP client.""" + if self._client: + await self._client.aclose() + self._client = None + + @property + def _headers(self) -> dict[str, str]: + """Get headers for GitHub API requests.""" + headers = { + "Authorization": f"Bearer {self.token}", + "Accept": "application/vnd.github.v3+json", + "X-GitHub-Api-Version": "2022-11-28", + } + # Add caching headers for efficient polling + if self._last_modified: + headers["If-Modified-Since"] = self._last_modified + if self._etag: + headers["If-None-Match"] = self._etag + return 
headers + + async def poll(self) -> list[Event]: + """ + Poll GitHub notifications for new events. + + Returns: + List of GitHubMentionEvent objects + + TODO: Implement in feature/github-poller branch + - Fetch notifications with participating=true + - Handle 304 Not Modified responses + - Filter for mentions on PRs/issues + - Parse commands from comment body + - Mark notifications as read + """ + # STUB: Implementation goes in feature/github-poller branch + raise NotImplementedError( + "GitHubPoller.poll() not yet implemented. " + "See feature/github-poller branch." + ) + + async def _fetch_notifications(self) -> Optional[list[dict]]: + """ + Fetch notifications from GitHub API. + + Returns: + List of notification dicts, or None if not modified + + TODO: Implement + - GET /notifications?participating=true + - Update _last_modified and _etag from response headers + - Return None on 304 Not Modified + """ + raise NotImplementedError("See feature/github-poller branch") + + async def _process_notification(self, notification: dict) -> Optional[Event]: + """ + Process a single notification into an Event. + + Args: + notification: Raw notification from GitHub API + + Returns: + Event object if valid, None if should be skipped + + TODO: Implement + - Check if notification is a mention + - Fetch the comment that triggered it + - Check if our user_tag is in the comment + - Parse any commands (/review, /describe, etc.) + - Create and return GitHubMentionEvent + """ + raise NotImplementedError("See feature/github-poller branch") + + async def _mark_notification_read(self, notification_id: str) -> None: + """ + Mark a notification as read. + + Args: + notification_id: GitHub notification thread ID + + TODO: Implement + - PATCH /notifications/threads/{id} + """ + raise NotImplementedError("See feature/github-poller branch") + + def _parse_command(self, comment_body: str) -> tuple[Optional[str], list[str]]: + """ + Parse command and arguments from comment body. 
+ + Looks for patterns like: + - @bot /review + - @bot /describe --generate_ai_title=true + - @bot /ask What is this PR about? + + Args: + comment_body: Full comment text + + Returns: + Tuple of (command, args) or (None, []) if no command found + + Example: + >>> _parse_command("@bot /review --focus=security please review") + ("/review", ["--focus=security", "please", "review"]) + """ + # Find text after mention + pattern = rf"{re.escape(self.user_tag)}\s*(/\w+)?\s*(.*)" + match = re.search(pattern, comment_body, re.IGNORECASE | re.DOTALL) + + if not match: + return None, [] + + command = match.group(1) # e.g., "/review" or None + rest = match.group(2).strip() + + # Parse arguments (simple split for now) + args = rest.split() if rest else [] + + return command, args + + def _should_process_repo(self, repo_full_name: str) -> bool: + """ + Check if a repository should be processed. + + Args: + repo_full_name: Full repo name (owner/repo) + + Returns: + True if repo should be processed + """ + if self.tracked_repos is None: + return True # No filter, process all + return repo_full_name in self.tracked_repos diff --git a/pr_agent/polling/railway.py b/pr_agent/polling/railway.py new file mode 100644 index 0000000000..d363e53eb3 --- /dev/null +++ b/pr_agent/polling/railway.py @@ -0,0 +1,230 @@ +""" +Railway Deployment Poller. + +Polls Railway's GraphQL API for deployment status changes. + +Implementation Status: STUB +Branch: feature/railway-poller +""" + +from typing import Optional +import logging + +import httpx + +from pr_agent.polling.base import BasePoller +from pr_agent.polling.events import Event, EventType, RailwayDeployEvent + +logger = logging.getLogger(__name__) + +# Railway API endpoint +RAILWAY_API_URL = "https://backboard.railway.app/graphql/v2" + + +class RailwayPoller(BasePoller): + """ + Poll Railway API for deployment status changes. + + Tracks deployments across services and emits events when status changes + to SUCCESS, FAILED, or CRASHED. 
+ + Usage: + poller = RailwayPoller( + token="railway_xxx", + project_id="abc123", + poll_interval=60, + ) + async for event in poller.run(): + if event.is_failure: + await alert_team(event) + """ + + def __init__( + self, + token: str, + project_id: str, + poll_interval: int = 60, + service_ids: Optional[list[str]] = None, + environments: Optional[list[str]] = None, + ): + """ + Initialize the Railway poller. + + Args: + token: Railway API token + project_id: Railway project ID to monitor + poll_interval: Seconds between polls (default: 60) + service_ids: Optional list of service IDs to filter + If None, monitors all services in project + environments: Optional list of environment names to filter + (e.g., ["production", "staging"]) + """ + super().__init__(poll_interval=poll_interval, name="railway") + + self.token = token + self.project_id = project_id + self.service_ids = set(service_ids) if service_ids else None + self.environments = set(environments) if environments else None + + # Track deployment states for change detection + self._deployment_states: dict[str, str] = {} # deploy_id -> status + + # HTTP client (created in setup) + self._client: Optional[httpx.AsyncClient] = None + + async def setup(self) -> None: + """Initialize HTTP client.""" + self._client = httpx.AsyncClient(timeout=30.0) + logger.info(f"[{self.name}] Initialized for project {self.project_id}") + + async def teardown(self) -> None: + """Close HTTP client.""" + if self._client: + await self._client.aclose() + self._client = None + + @property + def _headers(self) -> dict[str, str]: + """Get headers for Railway API requests.""" + return { + "Authorization": f"Bearer {self.token}", + "Content-Type": "application/json", + } + + async def poll(self) -> list[Event]: + """ + Poll Railway API for deployment status changes. 
+ + Returns: + List of RailwayDeployEvent objects for deployments that changed + to terminal states (SUCCESS, FAILED, CRASHED) + + TODO: Implement in feature/railway-poller branch + - Query deployments via GraphQL + - Track state changes + - Emit events only for terminal state transitions + - Filter by service_ids and environments if specified + """ + # STUB: Implementation goes in feature/railway-poller branch + raise NotImplementedError( + "RailwayPoller.poll() not yet implemented. " + "See feature/railway-poller branch." + ) + + async def _fetch_deployments(self) -> list[dict]: + """ + Fetch recent deployments from Railway API. + + Returns: + List of deployment dicts from GraphQL response + + TODO: Implement + - GraphQL query for deployments + - Handle pagination if needed + - Return normalized deployment data + """ + raise NotImplementedError("See feature/railway-poller branch") + + def _build_deployments_query(self) -> str: + """ + Build GraphQL query for fetching deployments. + + Returns: + GraphQL query string + + Example query: + query GetDeployments($projectId: String!) { + deployments(input: { projectId: $projectId }, first: 20) { + edges { + node { + id + status + createdAt + service { id name } + environment { id name } + meta { commitHash commitMessage } + } + } + } + } + """ + return """ + query GetDeployments($projectId: String!) { + deployments(input: { projectId: $projectId }, first: 20) { + edges { + node { + id + status + createdAt + service { id name } + environment { id name } + meta { commitHash commitMessage } + } + } + } + } + """ + + def _process_deployment(self, deployment: dict) -> Optional[RailwayDeployEvent]: + """ + Process a deployment into an event if status changed to terminal state. 
+ + Args: + deployment: Deployment data from GraphQL + + Returns: + RailwayDeployEvent if should emit, None otherwise + + TODO: Implement + - Check if deployment matches filters (service_ids, environments) + - Check if status changed from previous poll + - Only emit for terminal states (SUCCESS, FAILED, CRASHED) + - Create and return RailwayDeployEvent + """ + raise NotImplementedError("See feature/railway-poller branch") + + def _should_process_service(self, service_id: str) -> bool: + """ + Check if a service should be processed. + + Args: + service_id: Railway service ID + + Returns: + True if service should be processed + """ + if self.service_ids is None: + return True # No filter, process all + return service_id in self.service_ids + + def _should_process_environment(self, env_name: str) -> bool: + """ + Check if an environment should be processed. + + Args: + env_name: Environment name + + Returns: + True if environment should be processed + """ + if self.environments is None: + return True # No filter, process all + return env_name in self.environments + + def _map_status_to_event_type(self, status: str) -> Optional[EventType]: + """ + Map Railway deployment status to EventType. 
+ + Args: + status: Railway status string + + Returns: + EventType or None if status should not trigger event + """ + status_map = { + "SUCCESS": EventType.RAILWAY_DEPLOY_SUCCESS, + "FAILED": EventType.RAILWAY_DEPLOY_FAILED, + "CRASHED": EventType.RAILWAY_SERVICE_CRASHED, + "BUILDING": EventType.RAILWAY_DEPLOY_BUILDING, + } + return status_map.get(status) diff --git a/requirements.txt b/requirements.txt index d303ea347a..deeceb7f6b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,6 +31,9 @@ gunicorn==23.0.0 pytest-cov==5.0.0 pydantic==2.8.2 html2text==2024.2.26 +typer>=0.12.0 +rich>=13.7.0 +httpx>=0.27.0 giteapy==1.0.8 # Uncomment the following lines to enable the 'similar issue' tool # pinecone-client From dc8d6889cdc66c74ab9710f0711028a0a00aecca Mon Sep 17 00:00:00 2001 From: research-developer Date: Thu, 18 Dec 2025 09:22:58 -0600 Subject: [PATCH 2/7] docs: Update README for local daemon fork MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace Qodo marketing content with documentation for the local polling-based daemon: - Quick start guide - CLI command reference - Configuration file format - Architecture diagram - Usage examples for PR mentions and Railway integration - Development setup instructions πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- README.md | 212 +++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 188 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index cf0f433443..b6875cb284 100644 --- a/README.md +++ b/README.md @@ -1,36 +1,200 @@ -> [!NOTE] -> PR Agent is an open-source project originally created by Qodo, the company behind next-generation AI code review. This repository represents the legacy version of the project and is provided as-is for the community to explore, learn from, and build upon. 
The project now has its first external maintainer, Naor (([@naorpeled](https://github.com/naorpeled))), and is currently in the process of being donated to an open-source foundation. +# PR-Agent Local Daemon -## πŸš€ About -PR Agent was the first AI assistant for pull requests, built by Qodo, and contributed to the open-source community. -It represents the first generation of intelligent code review - the project that started Qodo’s journey toward fully AI-driven development, Code Review. -If you enjoy this project, you’ll love the next-level PR Agent - Qodo free tier version, which is faster, smarter, and built for today’s workflows. +A local polling-based PR agent that monitors GitHub notifications and Railway deployments, running as a background service on your machine. -πŸš€ Qodo includes a free user trial, 250 tokens, bonus tokens for active contributors, and 50% more advanced features than this open-source version. +> **Fork Note**: This is a fork of [Qodo's PR-Agent](https://github.com/qodo-ai/pr-agent) adapted for local daemon operation with polling instead of webhooks. -If you have an open-source project, you can get the Qodo paid version for free for your project, powered by Google Gemini 2.5 Pro – [https://www.qodo.ai/solutions/open-source/](https://www.qodo.ai/solutions/open-source/) +## Features ---- +- **GitHub Notification Polling** - Responds to @mentions on PRs with AI-powered reviews, descriptions, and suggestions +- **Railway Deployment Monitoring** - Tracks deployment status and can analyze failures +- **Local Daemon** - Runs as a background service, no webhook infrastructure needed +- **Multi-Repo Support** - Monitor multiple repositories from a single daemon +- **Simple CLI** - Easy setup and management -## ✨ Advanced Features in Qodo +## Quick Start -### 🧭 PR β†’ Ticket Automation -Seamlessly links pull requests to your project tracking system for end-to-end visibility. +```bash +# Install +pip install -e . 
-### βœ… Auto Best Practices -Learns your team’s standards and automatically enforces them during code reviews. +# Initialize a repository +cd ~/projects/myapp +pr-agent init -### πŸ§ͺ Code Validation -Performs advanced static and semantic analysis to catch issues before merge. +# Configure credentials (or set environment variables) +pr-agent config set github_token ghp_xxx +pr-agent config set anthropic_api_key sk-ant-xxx -### πŸ’¬ PR Chat Interface -Lets you converse with your PR to explain, summarize, or suggest improvements instantly. +# Start the daemon +pr-agent start -### πŸ” Impact Evaluation -Analyzes the business and technical effect of each change before approval. +# Check status +pr-agent status +``` ---- +## CLI Commands -## ❀️ Community -This open-source release remains here as a community contribution from Qodo β€” the origin of modern AI-powered code collaboration. -We’re proud to share it and inspire developers worldwide. +| Command | Description | +|---------|-------------| +| `pr-agent init` | Add current repo to config | +| `pr-agent start` | Start polling daemon | +| `pr-agent stop` | Stop daemon | +| `pr-agent status` | Show daemon status and tracked repos | +| `pr-agent config` | View/edit configuration | +| `pr-agent remove` | Remove repo from config | + +## Configuration + +Configuration is stored at `~/.pr-agent/config.json`: + +```json +{ + "version": "1.0", + "credentials": { + "github_token": "ghp_xxx", + "anthropic_api_key": "sk-ant-xxx", + "railway_token": "railway_xxx" + }, + "settings": { + "poll_interval_github": 30, + "poll_interval_railway": 60, + "model": "claude-sonnet-4-20250514", + "log_level": "INFO", + "github_username": "my-bot" + }, + "repos": [ + { + "path": "/Users/you/projects/myapp", + "github_repo": "owner/myapp", + "railway_project_id": "abc123", + "auto_review": true, + "auto_describe": false + } + ] +} +``` + +### Environment Variables + +Credentials can also be set via environment variables (takes precedence 
over config file): + +- `GITHUB_TOKEN` - GitHub personal access token +- `ANTHROPIC_API_KEY` - Anthropic API key +- `RAILWAY_TOKEN` - Railway API token + +## Architecture + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ pr-agent CLI β”‚ +β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ +β”‚ init - Add repo to config β”‚ +β”‚ start - Start polling daemon β”‚ +β”‚ stop - Stop daemon β”‚ +β”‚ status - Show status β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ ~/.pr-agent/ β”‚ +β”‚ config.json - Repos, credentials, settings β”‚ +β”‚ daemon.pid - PID file β”‚ +β”‚ logs/ - Log files β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Polling Daemon β”‚ +β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ +β”‚ GitHubPoller - Poll notifications for @mentions β”‚ +β”‚ RailwayPoller - Poll deployment status changes β”‚ +β”‚ EventDispatcher - Route events to handlers β”‚ +β”‚ PRAgent - Generate AI responses β”‚ 
+β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
+```
+
+## Usage Examples
+
+### Respond to PR Mentions
+
+When someone mentions your bot on a PR:
+
+```
+@my-bot /review
+```
+
+The daemon will:
+1. Detect the notification via polling
+2. Parse the command (`/review`)
+3. Run PR-Agent's review tool
+4. Post the response as a comment
+
+### Available Commands
+
+When mentioned on a PR, the bot responds to:
+
+| Command | Description |
+|---------|-------------|
+| `/review` | Comprehensive code review |
+| `/describe` | Generate PR description |
+| `/improve` | Suggest code improvements |
+| `/ask <question>` | Answer questions about the PR |
+| `/update_changelog` | Update changelog |
+| `/add_docs` | Generate documentation |
+
+### Railway Integration
+
+Link a Railway project to get deployment notifications:
+
+```bash
+pr-agent init --railway-project abc123
+```
+
+The daemon will monitor deployments and can:
+- Alert on failed deployments
+- Analyze crash logs
+- Correlate deployments with recent PRs
+
+## Development
+
+### Project Structure
+
+```
+pr_agent/
+β”œβ”€β”€ cli/ # CLI commands
+β”‚ β”œβ”€β”€ config.py # Config management
+β”‚ └── main.py # Typer CLI
+β”œβ”€β”€ polling/ # Polling infrastructure
+β”‚ β”œβ”€β”€ base.py # Abstract BasePoller
+β”‚ β”œβ”€β”€ events.py # Event models
+β”‚ β”œβ”€β”€ github.py # GitHub notification poller
+β”‚ β”œβ”€β”€ railway.py # Railway deployment poller
+β”‚ └── dispatcher.py # Event routing
+└── ...
# Original PR-Agent tools +``` + +### Running in Development + +```bash +# Run in foreground with debug output +pr-agent start --foreground + +# Or run directly +python -m pr_agent.cli.main start -f +``` + +## Requirements + +- Python 3.12+ +- GitHub personal access token with `notifications` and `repo` scopes +- Anthropic API key (or OpenAI API key) +- Railway API token (optional, for deployment monitoring) + +## License + +Apache 2.0 - See [LICENSE](LICENSE) + +## Acknowledgments + +Based on [Qodo PR-Agent](https://github.com/qodo-ai/pr-agent), the original AI-powered PR assistant. From 3adebdd9be2acd34e69bb161e0c197fc1a606491 Mon Sep 17 00:00:00 2001 From: research-developer Date: Thu, 18 Dec 2025 09:57:26 -0600 Subject: [PATCH 3/7] feat(webhooks): Add universal webhook router with multi-platform detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements a modular webhook infrastructure that can: - Detect webhook source from headers and payload structure - Verify signatures for each platform - Parse into typed Pydantic event models - Route to registered handlers Supported platforms: - GitHub (X-GitHub-Event, HMAC SHA256) - Vercel (x-vercel-signature, HMAC SHA1) - Linear (Linear-Signature, HMAC SHA256) - Slack (X-Slack-Signature, HMAC SHA256) - Discord (X-Signature-Ed25519, Ed25519) - Gmail (Cloud Pub/Sub message structure) - Railway (User-Agent, payload structure) New files: - pr_agent/webhooks/models.py - WebhookSource, WebhookPayload, *Event models - pr_agent/webhooks/router.py - Detection, verification, routing logic Closes #8 πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pr_agent/webhooks/__init__.py | 20 ++ pr_agent/webhooks/models.py | 436 +++++++++++++++++++++++ pr_agent/webhooks/router.py | 633 ++++++++++++++++++++++++++++++++++ 3 files changed, 1089 insertions(+) create mode 100644 pr_agent/webhooks/__init__.py create mode 100644 
"""
Modular webhook models for multi-platform support.

Designed to identify webhook sources and normalize payloads across platforms.

NOTE(review): the sibling package ``__init__`` re-exports ``WebhookSource``,
``WebhookPayload``, ``WebhookEvent``, ``WebhookRouter`` and
``detect_webhook_source`` as the public API of ``pr_agent.webhooks``.
"""

from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from typing import Any, Optional

from pydantic import BaseModel, Field


def _as_dict(value: Any) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict.

    Webhook JSON routinely carries explicit ``null`` for optional objects
    (e.g. ``"assignee": null``); ``body.get("x", {})`` returns that ``None``
    and a chained ``.get()`` then raises AttributeError.  Routing every
    nested lookup through this helper makes payload parsing null-safe.
    """
    return value if isinstance(value, dict) else {}


class WebhookSource(str, Enum):
    """Supported webhook sources."""

    GITHUB = "github"
    VERCEL = "vercel"
    NOTION = "notion"
    LINEAR = "linear"
    SLACK = "slack"
    GMAIL = "gmail"  # Via Pub/Sub
    DISCORD = "discord"
    RAILWAY = "railway"
    UNKNOWN = "unknown"


class WebhookSignature(BaseModel):
    """Signature/verification info from webhook."""

    algorithm: Optional[str] = Field(None, description="Signing algorithm (sha256, ed25519, etc.)")
    signature: Optional[str] = Field(None, description="The signature value")
    timestamp: Optional[str] = Field(None, description="Timestamp used in signing")


class WebhookHeaders(BaseModel):
    """Normalized webhook headers for source detection."""

    # Raw headers (lowercased keys)
    raw: dict[str, str] = Field(default_factory=dict)

    # Common identifiers extracted from headers
    user_agent: Optional[str] = None
    content_type: Optional[str] = None

    # Platform-specific headers
    github_event: Optional[str] = Field(None, description="X-GitHub-Event")
    github_delivery: Optional[str] = Field(None, description="X-GitHub-Delivery")
    github_signature: Optional[str] = Field(None, description="X-Hub-Signature-256")

    vercel_signature: Optional[str] = Field(None, description="x-vercel-signature")

    linear_signature: Optional[str] = Field(None, description="Linear-Signature")
    linear_event: Optional[str] = Field(None, description="Linear-Event")

    slack_signature: Optional[str] = Field(None, description="X-Slack-Signature")
    slack_timestamp: Optional[str] = Field(None, description="X-Slack-Request-Timestamp")

    discord_signature: Optional[str] = Field(None, description="X-Signature-Ed25519")
    discord_timestamp: Optional[str] = Field(None, description="X-Signature-Timestamp")

    @classmethod
    def from_raw(cls, headers: dict[str, str]) -> "WebhookHeaders":
        """Create from raw headers dict, normalizing keys to lowercase."""
        normalized = {k.lower(): v for k, v in headers.items()}

        return cls(
            raw=normalized,
            user_agent=normalized.get("user-agent"),
            content_type=normalized.get("content-type"),
            # GitHub
            github_event=normalized.get("x-github-event"),
            github_delivery=normalized.get("x-github-delivery"),
            github_signature=normalized.get("x-hub-signature-256"),
            # Vercel
            vercel_signature=normalized.get("x-vercel-signature"),
            # Linear
            linear_signature=normalized.get("linear-signature"),
            linear_event=normalized.get("linear-event"),
            # Slack
            slack_signature=normalized.get("x-slack-signature"),
            slack_timestamp=normalized.get("x-slack-request-timestamp"),
            # Discord
            discord_signature=normalized.get("x-signature-ed25519"),
            discord_timestamp=normalized.get("x-signature-timestamp"),
        )


class WebhookPayload(BaseModel):
    """
    Universal webhook payload container.

    Wraps the raw payload with metadata for routing and verification.
    """

    # Detection results
    source: WebhookSource = Field(WebhookSource.UNKNOWN, description="Detected source")
    confidence: float = Field(0.0, description="Detection confidence 0-1")

    # Raw data
    headers: WebhookHeaders = Field(default_factory=WebhookHeaders)
    body: dict[str, Any] = Field(default_factory=dict, description="Parsed JSON body")
    raw_body: Optional[bytes] = Field(None, description="Raw body for signature verification")

    # Extracted common fields
    event_type: Optional[str] = Field(None, description="Normalized event type")
    event_id: Optional[str] = Field(None, description="Unique event/delivery ID")
    timestamp: Optional[datetime] = Field(None, description="Event timestamp")

    # Source-specific metadata
    metadata: dict[str, Any] = Field(default_factory=dict)

    class Config:
        # raw_body is bytes, which pydantic treats as an arbitrary type here.
        arbitrary_types_allowed = True


class WebhookEvent(BaseModel, ABC):
    """
    Base class for normalized webhook events.

    Each platform implements its own subclass with typed fields.
    """

    source: WebhookSource
    event_type: str
    event_id: str
    timestamp: datetime
    raw_payload: dict[str, Any] = Field(default_factory=dict)

    @classmethod
    @abstractmethod
    def from_payload(cls, payload: WebhookPayload) -> "WebhookEvent":
        """Parse a WebhookPayload into a typed event."""
        pass


# =============================================================================
# Platform-Specific Event Models
# =============================================================================

class GitHubWebhookEvent(WebhookEvent):
    """GitHub webhook event."""

    source: WebhookSource = WebhookSource.GITHUB

    # GitHub-specific fields
    action: Optional[str] = None
    repository: Optional[str] = None  # owner/repo
    sender: Optional[str] = None  # username

    # PR-specific
    pr_number: Optional[int] = None
    pr_url: Optional[str] = None

    # Issue-specific
    issue_number: Optional[int] = None

    # Comment-specific
    comment_id: Optional[int] = None
    comment_body: Optional[str] = None

    @classmethod
    def from_payload(cls, payload: WebhookPayload) -> "GitHubWebhookEvent":
        """Build a typed GitHub event; nested objects may be JSON null."""
        body = payload.body
        repo = _as_dict(body.get("repository"))
        sender = _as_dict(body.get("sender"))
        pr = _as_dict(body.get("pull_request"))
        issue = _as_dict(body.get("issue"))
        comment = _as_dict(body.get("comment"))

        return cls(
            event_type=payload.headers.github_event or "unknown",
            event_id=payload.headers.github_delivery or "",
            timestamp=payload.timestamp or datetime.now(),
            raw_payload=body,
            action=body.get("action"),
            repository=repo.get("full_name"),
            sender=sender.get("login"),
            # Issue comments on PRs expose the number under "issue".
            pr_number=pr.get("number") or issue.get("number"),
            pr_url=pr.get("html_url"),
            issue_number=issue.get("number"),
            comment_id=comment.get("id"),
            comment_body=comment.get("body"),
        )


class VercelWebhookEvent(WebhookEvent):
    """Vercel webhook event."""

    source: WebhookSource = WebhookSource.VERCEL

    # Vercel-specific fields
    deployment_id: Optional[str] = None
    deployment_url: Optional[str] = None
    project_id: Optional[str] = None
    project_name: Optional[str] = None
    team_id: Optional[str] = None

    # Deployment status
    status: Optional[str] = None  # BUILDING, READY, ERROR, CANCELED

    # Git info
    git_commit_sha: Optional[str] = None
    git_commit_message: Optional[str] = None
    git_branch: Optional[str] = None

    @classmethod
    def from_payload(cls, payload: WebhookPayload) -> "VercelWebhookEvent":
        """Build a typed Vercel event from either wrapped or flat payloads."""
        body = payload.body
        # Vercel wraps event data in a "payload" key; fall back to the body.
        wrapper = _as_dict(body.get("payload")) or body
        dep = _as_dict(wrapper.get("deployment"))
        meta = _as_dict(dep.get("meta"))
        project = _as_dict(wrapper.get("project"))

        return cls(
            event_type=body.get("type", "deployment"),
            event_id=body.get("id", ""),
            timestamp=payload.timestamp or datetime.now(),
            raw_payload=body,
            deployment_id=dep.get("id") or wrapper.get("id"),
            deployment_url=dep.get("url") or wrapper.get("url"),
            project_id=project.get("id") or wrapper.get("projectId"),
            project_name=project.get("name") or wrapper.get("name"),
            team_id=_as_dict(wrapper.get("team")).get("id"),
            status=dep.get("state") or wrapper.get("state"),
            git_commit_sha=meta.get("githubCommitSha"),
            git_commit_message=meta.get("githubCommitMessage"),
            git_branch=meta.get("githubCommitRef"),
        )


class LinearWebhookEvent(WebhookEvent):
    """Linear webhook event."""

    source: WebhookSource = WebhookSource.LINEAR

    # Linear-specific fields
    action: Optional[str] = None  # create, update, remove

    # Issue fields
    issue_id: Optional[str] = None
    issue_identifier: Optional[str] = None  # e.g., "ENG-123"
    issue_title: Optional[str] = None
    issue_url: Optional[str] = None

    # State
    state_name: Optional[str] = None
    priority: Optional[int] = None

    # Relations
    team_key: Optional[str] = None
    assignee: Optional[str] = None

    # Comment
    comment_id: Optional[str] = None
    comment_body: Optional[str] = None

    @classmethod
    def from_payload(cls, payload: WebhookPayload) -> "LinearWebhookEvent":
        """Build a typed Linear event; relations may be JSON null."""
        body = payload.body
        data = _as_dict(body.get("data"))
        # "Comment" events reuse data.body / comment fields.
        is_comment = "comment" in (body.get("type") or "").lower()

        return cls(
            event_type=body.get("type", "Issue"),
            event_id=body.get("webhookId", ""),
            timestamp=payload.timestamp or datetime.now(),
            raw_payload=body,
            action=body.get("action"),
            issue_id=data.get("id"),
            issue_identifier=data.get("identifier"),
            issue_title=data.get("title"),
            issue_url=data.get("url"),
            state_name=_as_dict(data.get("state")).get("name"),
            priority=data.get("priority"),
            team_key=_as_dict(data.get("team")).get("key"),
            assignee=_as_dict(data.get("assignee")).get("name"),
            comment_id=_as_dict(data.get("comment")).get("id") if is_comment else None,
            comment_body=data.get("body") if is_comment else None,
        )


class SlackWebhookEvent(WebhookEvent):
    """Slack webhook/event."""

    source: WebhookSource = WebhookSource.SLACK

    # Slack-specific fields
    team_id: Optional[str] = None
    channel_id: Optional[str] = None
    channel_name: Optional[str] = None
    user_id: Optional[str] = None
    user_name: Optional[str] = None

    # Message/event data
    text: Optional[str] = None
    thread_ts: Optional[str] = None

    # Slash command specific
    command: Optional[str] = None
    response_url: Optional[str] = None
    trigger_id: Optional[str] = None

    # Challenge (for URL verification)
    challenge: Optional[str] = None

    @classmethod
    def from_payload(cls, payload: WebhookPayload) -> "SlackWebhookEvent":
        """Build a typed Slack event (URL challenge, slash command, or Events API)."""
        body = payload.body
        event = _as_dict(body.get("event"))

        # Handle URL verification challenge
        if body.get("type") == "url_verification":
            return cls(
                event_type="url_verification",
                event_id=body.get("token", ""),
                timestamp=datetime.now(),
                raw_payload=body,
                challenge=body.get("challenge"),
            )

        # Handle slash commands (form-encoded, different structure)
        if "command" in body:
            return cls(
                event_type="slash_command",
                event_id=body.get("trigger_id", ""),
                timestamp=datetime.now(),
                raw_payload=body,
                team_id=body.get("team_id"),
                channel_id=body.get("channel_id"),
                channel_name=body.get("channel_name"),
                user_id=body.get("user_id"),
                user_name=body.get("user_name"),
                text=body.get("text"),
                command=body.get("command"),
                response_url=body.get("response_url"),
                trigger_id=body.get("trigger_id"),
            )

        # Handle Events API
        return cls(
            event_type=event.get("type", body.get("type", "unknown")),
            event_id=body.get("event_id", ""),
            timestamp=datetime.now(),
            raw_payload=body,
            team_id=body.get("team_id"),
            channel_id=event.get("channel"),
            user_id=event.get("user"),
            text=event.get("text"),
            thread_ts=event.get("thread_ts"),
        )


class DiscordWebhookEvent(WebhookEvent):
    """Discord interaction/webhook event."""

    source: WebhookSource = WebhookSource.DISCORD

    # Discord-specific fields
    interaction_type: Optional[int] = None  # 1=PING, 2=APPLICATION_COMMAND, 3=MESSAGE_COMPONENT
    guild_id: Optional[str] = None
    channel_id: Optional[str] = None
    user_id: Optional[str] = None
    user_name: Optional[str] = None

    # Command data
    command_name: Optional[str] = None
    command_options: list[dict] = Field(default_factory=list)

    # Message data
    message_id: Optional[str] = None
    message_content: Optional[str] = None

    @classmethod
    def from_payload(cls, payload: WebhookPayload) -> "DiscordWebhookEvent":
        """Build a typed Discord event; guild interactions nest user in member."""
        body = payload.body
        user = _as_dict(_as_dict(body.get("member")).get("user")) or _as_dict(body.get("user"))
        data = _as_dict(body.get("data"))
        message = _as_dict(body.get("message"))

        return cls(
            event_type=f"interaction_{body.get('type', 0)}",
            event_id=body.get("id", ""),
            timestamp=payload.timestamp or datetime.now(),
            raw_payload=body,
            interaction_type=body.get("type"),
            guild_id=body.get("guild_id"),
            channel_id=body.get("channel_id"),
            user_id=user.get("id"),
            user_name=user.get("username"),
            command_name=data.get("name"),
            command_options=data.get("options", []),
            message_id=message.get("id"),
            message_content=message.get("content"),
        )


class GmailPubSubEvent(WebhookEvent):
    """Gmail Pub/Sub push notification event."""

    source: WebhookSource = WebhookSource.GMAIL

    # Gmail-specific fields
    email_address: Optional[str] = None
    history_id: Optional[str] = None

    @classmethod
    def from_payload(cls, payload: WebhookPayload) -> "GmailPubSubEvent":
        """Decode the base64 Pub/Sub envelope into a Gmail notification."""
        body = payload.body

        # Pub/Sub wraps the message
        message = _as_dict(body.get("message"))
        import base64
        import json

        # Decode the data field; fall back to empty on malformed input.
        data_b64 = message.get("data", "")
        try:
            data = json.loads(base64.b64decode(data_b64).decode())
        except Exception:
            data = {}

        return cls(
            event_type="gmail_push",
            event_id=message.get("messageId", ""),
            timestamp=payload.timestamp or datetime.now(),
            raw_payload=body,
            email_address=data.get("emailAddress"),
            history_id=data.get("historyId"),
        )


# Registry for easy lookup
WEBHOOK_EVENT_CLASSES: dict[WebhookSource, type[WebhookEvent]] = {
    WebhookSource.GITHUB: GitHubWebhookEvent,
    WebhookSource.VERCEL: VercelWebhookEvent,
    WebhookSource.LINEAR: LinearWebhookEvent,
    WebhookSource.SLACK: SlackWebhookEvent,
    WebhookSource.DISCORD: DiscordWebhookEvent,
    WebhookSource.GMAIL: GmailPubSubEvent,
}
+""" + +import hashlib +import hmac +import logging +from datetime import datetime +from typing import Any, Callable, Optional + +from pr_agent.webhooks.models import ( + WebhookSource, + WebhookHeaders, + WebhookPayload, + WebhookEvent, + WEBHOOK_EVENT_CLASSES, +) + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Source Detection Functions +# ============================================================================= + +def _detect_github(headers: WebhookHeaders, body: dict) -> tuple[bool, float]: + """ + Detect if webhook is from GitHub. + + Indicators: + - X-GitHub-Event header + - X-GitHub-Delivery header + - X-Hub-Signature-256 header + - User-Agent starts with "GitHub-Hookshot/" + - Body contains "repository", "sender" with specific GitHub structure + """ + confidence = 0.0 + + # Strong indicators (headers) + if headers.github_event: + confidence += 0.4 + if headers.github_delivery: + confidence += 0.3 + if headers.github_signature: + confidence += 0.2 + + # User-Agent check + if headers.user_agent and "GitHub-Hookshot" in headers.user_agent: + confidence += 0.1 + + # Payload structure checks + if "repository" in body and isinstance(body.get("repository"), dict): + if "full_name" in body.get("repository", {}): + confidence += 0.1 + if "sender" in body and isinstance(body.get("sender"), dict): + if "login" in body.get("sender", {}): + confidence += 0.1 + + return confidence >= 0.5, min(confidence, 1.0) + + +def _detect_vercel(headers: WebhookHeaders, body: dict) -> tuple[bool, float]: + """ + Detect if webhook is from Vercel. 
+ + Indicators: + - x-vercel-signature header + - User-Agent contains "Vercel" + - Body has "type" field with deployment types + - Body has Vercel-specific fields (deploymentId, projectId) + """ + confidence = 0.0 + + # Strong indicators + if headers.vercel_signature: + confidence += 0.5 + + # User-Agent check + if headers.user_agent and "Vercel" in headers.user_agent: + confidence += 0.2 + + # Payload structure + vercel_types = ["deployment", "deployment.created", "deployment.succeeded", + "deployment.ready", "deployment.error", "deployment.canceled", + "project.created", "project.removed"] + if body.get("type") in vercel_types: + confidence += 0.3 + + # Vercel-specific fields + if "deploymentId" in body or ("payload" in body and "deployment" in body.get("payload", {})): + confidence += 0.2 + + return confidence >= 0.5, min(confidence, 1.0) + + +def _detect_linear(headers: WebhookHeaders, body: dict) -> tuple[bool, float]: + """ + Detect if webhook is from Linear. + + Indicators: + - Linear-Signature header + - Linear-Event header + - Body has "action", "data", "type" with Linear patterns + - Body contains Linear-specific fields (identifier like "ENG-123") + """ + confidence = 0.0 + + # Strong indicators + if headers.linear_signature: + confidence += 0.4 + if headers.linear_event: + confidence += 0.3 + + # Payload structure + linear_types = ["Issue", "Comment", "Project", "Cycle", "IssueLabel", "Reaction"] + if body.get("type") in linear_types: + confidence += 0.2 + + if "action" in body and body.get("action") in ["create", "update", "remove"]: + confidence += 0.1 + + # Linear identifier pattern (ABC-123) + data = body.get("data", {}) + identifier = data.get("identifier", "") + if isinstance(identifier, str) and "-" in identifier: + parts = identifier.split("-") + if len(parts) == 2 and parts[0].isalpha() and parts[1].isdigit(): + confidence += 0.2 + + # Linear-specific fields + if "webhookId" in body: + confidence += 0.1 + + return confidence >= 0.5, 
min(confidence, 1.0) + + +def _detect_slack(headers: WebhookHeaders, body: dict) -> tuple[bool, float]: + """ + Detect if webhook is from Slack. + + Indicators: + - X-Slack-Signature header + - X-Slack-Request-Timestamp header + - Body has Slack-specific fields (team_id, api_app_id) + - URL verification challenge + - Slash command fields (command, response_url) + """ + confidence = 0.0 + + # Strong indicators + if headers.slack_signature: + confidence += 0.4 + if headers.slack_timestamp: + confidence += 0.3 + + # URL verification + if body.get("type") == "url_verification" and "challenge" in body: + confidence += 0.5 + + # Slack-specific fields + if "team_id" in body: + confidence += 0.2 + if "api_app_id" in body: + confidence += 0.1 + + # Slash command detection + if "command" in body and body.get("command", "").startswith("/"): + confidence += 0.3 + if "response_url" in body and "slack.com" in body.get("response_url", ""): + confidence += 0.2 + + # Events API structure + if body.get("type") == "event_callback" and "event" in body: + confidence += 0.3 + + return confidence >= 0.5, min(confidence, 1.0) + + +def _detect_discord(headers: WebhookHeaders, body: dict) -> tuple[bool, float]: + """ + Detect if webhook is from Discord. + + Indicators: + - X-Signature-Ed25519 header + - X-Signature-Timestamp header + - Body has interaction structure (type, application_id) + - User-Agent contains "Discord" + """ + confidence = 0.0 + + # Strong indicators (Ed25519 signature headers) + if headers.discord_signature: + confidence += 0.4 + if headers.discord_timestamp: + confidence += 0.3 + + # User-Agent check + if headers.user_agent and "Discord" in headers.user_agent: + confidence += 0.1 + + # Interaction structure + if "type" in body and isinstance(body.get("type"), int): + # Discord interaction types are integers: 1=PING, 2=APP_COMMAND, etc. 
+ if body.get("type") in [1, 2, 3, 4, 5]: + confidence += 0.3 + + if "application_id" in body: + confidence += 0.2 + + # Discord-specific fields + if "guild_id" in body or "channel_id" in body: + confidence += 0.1 + + return confidence >= 0.5, min(confidence, 1.0) + + +def _detect_gmail_pubsub(headers: WebhookHeaders, body: dict) -> tuple[bool, float]: + """ + Detect if webhook is from Gmail via Pub/Sub. + + Indicators: + - Body has Pub/Sub message structure + - Message data decodes to Gmail notification + - User-Agent contains "Google" or "APIs-Google" + """ + confidence = 0.0 + + # User-Agent check + if headers.user_agent: + if "Google" in headers.user_agent or "APIs-Google" in headers.user_agent: + confidence += 0.2 + + # Pub/Sub structure + if "message" in body and isinstance(body.get("message"), dict): + message = body["message"] + if "data" in message and "messageId" in message: + confidence += 0.4 + + # Try to decode and check for Gmail fields + if "data" in message: + import base64 + import json + try: + data = json.loads(base64.b64decode(message["data"]).decode()) + if "emailAddress" in data or "historyId" in data: + confidence += 0.4 + except Exception: + pass + + if "subscription" in body and "projects/" in body.get("subscription", ""): + confidence += 0.2 + + return confidence >= 0.5, min(confidence, 1.0) + + +def _detect_railway(headers: WebhookHeaders, body: dict) -> tuple[bool, float]: + """ + Detect if webhook is from Railway. 
+ + Indicators: + - User-Agent contains "Railway" + - Body has Railway-specific fields + """ + confidence = 0.0 + + # User-Agent check + if headers.user_agent and "Railway" in headers.user_agent: + confidence += 0.4 + + # Railway-specific fields + if "type" in body: + railway_types = ["DEPLOY", "deployment.started", "deployment.completed", + "deployment.failed", "deployment.crashed"] + if body.get("type") in railway_types: + confidence += 0.4 + + # Railway payload structure + if "deployment" in body and "service" in body: + confidence += 0.3 + + return confidence >= 0.5, min(confidence, 1.0) + + +# Detection function registry +DETECTORS: list[tuple[WebhookSource, Callable[[WebhookHeaders, dict], tuple[bool, float]]]] = [ + (WebhookSource.GITHUB, _detect_github), + (WebhookSource.VERCEL, _detect_vercel), + (WebhookSource.LINEAR, _detect_linear), + (WebhookSource.SLACK, _detect_slack), + (WebhookSource.DISCORD, _detect_discord), + (WebhookSource.GMAIL, _detect_gmail_pubsub), + (WebhookSource.RAILWAY, _detect_railway), +] + + +def detect_webhook_source( + headers: dict[str, str], + body: dict[str, Any], +) -> tuple[WebhookSource, float]: + """ + Detect the source of a webhook based on headers and body. 
+ + Args: + headers: Raw HTTP headers + body: Parsed JSON body + + Returns: + Tuple of (WebhookSource, confidence_score) + """ + parsed_headers = WebhookHeaders.from_raw(headers) + + best_source = WebhookSource.UNKNOWN + best_confidence = 0.0 + + for source, detector in DETECTORS: + try: + is_match, confidence = detector(parsed_headers, body) + if is_match and confidence > best_confidence: + best_source = source + best_confidence = confidence + except Exception as e: + logger.warning(f"Detector for {source.value} failed: {e}") + + logger.info(f"Detected webhook source: {best_source.value} (confidence: {best_confidence:.2f})") + return best_source, best_confidence + + +# ============================================================================= +# Signature Verification +# ============================================================================= + +def verify_github_signature(payload: bytes, signature: str, secret: str) -> bool: + """Verify GitHub webhook signature (X-Hub-Signature-256).""" + if not signature.startswith("sha256="): + return False + + expected = "sha256=" + hmac.new( + secret.encode(), + payload, + hashlib.sha256 + ).hexdigest() + + return hmac.compare_digest(signature, expected) + + +def verify_slack_signature( + payload: bytes, + signature: str, + timestamp: str, + secret: str, +) -> bool: + """Verify Slack webhook signature.""" + sig_basestring = f"v0:{timestamp}:{payload.decode()}" + expected = "v0=" + hmac.new( + secret.encode(), + sig_basestring.encode(), + hashlib.sha256 + ).hexdigest() + + return hmac.compare_digest(signature, expected) + + +def verify_linear_signature(payload: bytes, signature: str, secret: str) -> bool: + """Verify Linear webhook signature.""" + expected = hmac.new( + secret.encode(), + payload, + hashlib.sha256 + ).hexdigest() + + return hmac.compare_digest(signature, expected) + + +def verify_vercel_signature(payload: bytes, signature: str, secret: str) -> bool: + """Verify Vercel webhook signature.""" + expected = 
hmac.new( + secret.encode(), + payload, + hashlib.sha1 + ).hexdigest() + + return hmac.compare_digest(signature, expected) + + +def verify_discord_signature( + payload: bytes, + signature: str, + timestamp: str, + public_key: str, +) -> bool: + """Verify Discord interaction signature (Ed25519).""" + try: + from nacl.signing import VerifyKey + from nacl.exceptions import BadSignature + + verify_key = VerifyKey(bytes.fromhex(public_key)) + message = timestamp.encode() + payload + verify_key.verify(message, bytes.fromhex(signature)) + return True + except ImportError: + logger.warning("PyNaCl not installed, cannot verify Discord signatures") + return False + except BadSignature: + return False + except Exception as e: + logger.error(f"Discord signature verification failed: {e}") + return False + + +# ============================================================================= +# Webhook Router +# ============================================================================= + +class WebhookRouter: + """ + Routes webhooks to appropriate handlers based on detected source. 
+ + Usage: + router = WebhookRouter() + + @router.on(WebhookSource.GITHUB) + async def handle_github(event: GitHubWebhookEvent): + print(f"GitHub event: {event.event_type}") + + # In your webhook endpoint: + event = router.parse(headers, body, raw_body) + await router.dispatch(event) + """ + + def __init__(self): + self._handlers: dict[WebhookSource, list[Callable]] = {} + self._secrets: dict[WebhookSource, str] = {} + + def set_secret(self, source: WebhookSource, secret: str) -> None: + """Set the webhook secret for a source (used for verification).""" + self._secrets[source] = secret + + def on(self, source: WebhookSource): + """Decorator to register a handler for a webhook source.""" + + def decorator(func: Callable) -> Callable: + if source not in self._handlers: + self._handlers[source] = [] + self._handlers[source].append(func) + return func + + return decorator + + def register_handler(self, source: WebhookSource, handler: Callable) -> None: + """Register a handler programmatically.""" + if source not in self._handlers: + self._handlers[source] = [] + self._handlers[source].append(handler) + + def parse( + self, + headers: dict[str, str], + body: dict[str, Any], + raw_body: Optional[bytes] = None, + ) -> WebhookPayload: + """ + Parse an incoming webhook into a WebhookPayload. 
+ + Args: + headers: Raw HTTP headers + body: Parsed JSON body + raw_body: Raw body bytes (for signature verification) + + Returns: + WebhookPayload with detected source + """ + source, confidence = detect_webhook_source(headers, body) + parsed_headers = WebhookHeaders.from_raw(headers) + + # Extract event ID and type based on source + event_type = None + event_id = None + + if source == WebhookSource.GITHUB: + event_type = parsed_headers.github_event + event_id = parsed_headers.github_delivery + elif source == WebhookSource.LINEAR: + event_type = body.get("type") + event_id = body.get("webhookId") + elif source == WebhookSource.SLACK: + event_type = body.get("event", {}).get("type") or body.get("type") + event_id = body.get("event_id") + elif source == WebhookSource.VERCEL: + event_type = body.get("type") + event_id = body.get("id") + elif source == WebhookSource.DISCORD: + event_type = f"interaction_{body.get('type')}" + event_id = body.get("id") + + return WebhookPayload( + source=source, + confidence=confidence, + headers=parsed_headers, + body=body, + raw_body=raw_body, + event_type=event_type, + event_id=event_id, + timestamp=datetime.now(), + ) + + def verify(self, payload: WebhookPayload) -> bool: + """ + Verify the webhook signature if a secret is configured. 
+ + Args: + payload: The parsed webhook payload + + Returns: + True if verified (or no secret configured), False if verification fails + """ + secret = self._secrets.get(payload.source) + if not secret or not payload.raw_body: + return True # No verification configured + + if payload.source == WebhookSource.GITHUB: + sig = payload.headers.github_signature + return sig and verify_github_signature(payload.raw_body, sig, secret) + + elif payload.source == WebhookSource.SLACK: + sig = payload.headers.slack_signature + ts = payload.headers.slack_timestamp + return sig and ts and verify_slack_signature(payload.raw_body, sig, ts, secret) + + elif payload.source == WebhookSource.LINEAR: + sig = payload.headers.linear_signature + return sig and verify_linear_signature(payload.raw_body, sig, secret) + + elif payload.source == WebhookSource.VERCEL: + sig = payload.headers.vercel_signature + return sig and verify_vercel_signature(payload.raw_body, sig, secret) + + elif payload.source == WebhookSource.DISCORD: + sig = payload.headers.discord_signature + ts = payload.headers.discord_timestamp + return sig and ts and verify_discord_signature(payload.raw_body, sig, ts, secret) + + return True # Unknown source, skip verification + + def to_event(self, payload: WebhookPayload) -> Optional[WebhookEvent]: + """ + Convert a WebhookPayload to a typed WebhookEvent. + + Args: + payload: The parsed webhook payload + + Returns: + Typed WebhookEvent subclass, or None if source unknown + """ + event_class = WEBHOOK_EVENT_CLASSES.get(payload.source) + if not event_class: + logger.warning(f"No event class for source: {payload.source}") + return None + + return event_class.from_payload(payload) + + async def dispatch(self, event: WebhookEvent) -> list[Any]: + """ + Dispatch an event to registered handlers. 
+ + Args: + event: The typed webhook event + + Returns: + List of handler results + """ + handlers = self._handlers.get(event.source, []) + if not handlers: + logger.debug(f"No handlers for source: {event.source.value}") + return [] + + import asyncio + import inspect + + results = [] + for handler in handlers: + try: + if inspect.iscoroutinefunction(handler): + result = await handler(event) + else: + result = handler(event) + results.append(result) + except Exception as e: + logger.error(f"Handler {handler.__name__} failed: {e}") + results.append(e) + + return results + + async def handle_webhook( + self, + headers: dict[str, str], + body: dict[str, Any], + raw_body: Optional[bytes] = None, + ) -> Optional[WebhookEvent]: + """ + Full webhook handling pipeline: parse -> verify -> convert -> dispatch. + + Args: + headers: Raw HTTP headers + body: Parsed JSON body + raw_body: Raw body bytes for verification + + Returns: + The processed WebhookEvent, or None if handling failed + """ + # Parse + payload = self.parse(headers, body, raw_body) + + if payload.source == WebhookSource.UNKNOWN: + logger.warning("Could not detect webhook source") + return None + + # Verify + if not self.verify(payload): + logger.error(f"Webhook signature verification failed for {payload.source.value}") + return None + + # Convert to typed event + event = self.to_event(payload) + if not event: + return None + + # Dispatch + await self.dispatch(event) + + return event From e0f7bc6563bd5d053535d5c47c92c8c9d2f1463b Mon Sep 17 00:00:00 2001 From: research-developer <115124732+research-developer@users.noreply.github.com> Date: Thu, 18 Dec 2025 20:50:52 -0600 Subject: [PATCH 4/7] Update pr_agent/polling/dispatcher.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pr_agent/polling/dispatcher.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pr_agent/polling/dispatcher.py b/pr_agent/polling/dispatcher.py index 823fc61b2d..6074904a7c 100644 --- 
a/pr_agent/polling/dispatcher.py +++ b/pr_agent/polling/dispatcher.py @@ -246,10 +246,12 @@ async def handle_github_mention(event: Event) -> None: if event.command_args: request += " " + " ".join(event.command_args) - await agent.handle_request( + result = agent.handle_request( pr_url=event.pr_url, request=request, ) + if asyncio.iscoroutine(result): + await result except Exception as e: logger.error(f"PR-Agent failed to process {event.command}: {e}") raise From 4e43b1f2434f99a3a2aa050df6c3e744ef67dbfb Mon Sep 17 00:00:00 2001 From: research-developer <115124732+research-developer@users.noreply.github.com> Date: Tue, 23 Dec 2025 04:43:44 -0600 Subject: [PATCH 5/7] Update pr_agent/cli/main.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pr_agent/cli/main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pr_agent/cli/main.py b/pr_agent/cli/main.py index a2c9491f39..32b36bb96e 100644 --- a/pr_agent/cli/main.py +++ b/pr_agent/cli/main.py @@ -257,8 +257,9 @@ async def main(): dispatcher.add_poller(github_poller) # Register handlers - handler = await create_pr_agent_handler() + handler = create_pr_agent_handler() dispatcher.register_handler(EventType.GITHUB_PR_MENTION, handler) + dispatcher.register_handler(EventType.GITHUB_PR_OPENED, handler) # Setup Railway pollers for repos that have it configured railway_token = get_credential("railway_token", config) From 29144dfe80a3b1d924734f99b7a8d7a2fbe4cd10 Mon Sep 17 00:00:00 2001 From: research-developer Date: Tue, 23 Dec 2025 04:51:54 -0600 Subject: [PATCH 6/7] fix: Address PR review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename pr_agent/cli/ to pr_agent/daemon/ to avoid shadowing the existing pr_agent/cli.py module (P1 fix) - Add pr-agent-daemon entry point in pyproject.toml - Fix GitHubPoller.poll() to return empty list instead of raising NotImplementedError - allows daemon to start without crashing (P1 fix) - Fix 
RailwayPoller.poll() same as above - Update README with pr-agent-daemon command references πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- README.md | 58 +++++++++++++++++------------- pr_agent/cli/__init__.py | 15 -------- pr_agent/daemon/__init__.py | 15 ++++++++ pr_agent/{cli => daemon}/config.py | 0 pr_agent/{cli => daemon}/main.py | 16 ++++----- pr_agent/polling/dispatcher.py | 2 +- pr_agent/polling/github.py | 10 +++--- pr_agent/polling/railway.py | 10 +++--- pyproject.toml | 1 + 9 files changed, 70 insertions(+), 57 deletions(-) delete mode 100644 pr_agent/cli/__init__.py create mode 100644 pr_agent/daemon/__init__.py rename pr_agent/{cli => daemon}/config.py (100%) rename pr_agent/{cli => daemon}/main.py (97%) diff --git a/README.md b/README.md index b6875cb284..4f4a3e3a11 100644 --- a/README.md +++ b/README.md @@ -20,29 +20,36 @@ pip install -e . # Initialize a repository cd ~/projects/myapp -pr-agent init +pr-agent-daemon init # Configure credentials (or set environment variables) -pr-agent config set github_token ghp_xxx -pr-agent config set anthropic_api_key sk-ant-xxx +pr-agent-daemon config set github_token ghp_xxx +pr-agent-daemon config set anthropic_api_key sk-ant-xxx # Start the daemon -pr-agent start +pr-agent-daemon start # Check status -pr-agent status +pr-agent-daemon status ``` ## CLI Commands +The daemon uses `pr-agent-daemon` command (separate from the original `pr-agent` CLI): + | Command | Description | |---------|-------------| -| `pr-agent init` | Add current repo to config | -| `pr-agent start` | Start polling daemon | -| `pr-agent stop` | Stop daemon | -| `pr-agent status` | Show daemon status and tracked repos | -| `pr-agent config` | View/edit configuration | -| `pr-agent remove` | Remove repo from config | +| `pr-agent-daemon init` | Add current repo to config | +| `pr-agent-daemon start` | Start polling daemon | +| `pr-agent-daemon stop` | Stop daemon | +| `pr-agent-daemon 
status` | Show daemon status and tracked repos | +| `pr-agent-daemon config` | View/edit configuration | +| `pr-agent-daemon remove` | Remove repo from config | + +The original `pr-agent` command remains available for direct PR reviews: +```bash +pr-agent --pr_url=<PR_URL> review +``` ## Configuration @@ -87,7 +94,7 @@ Credentials can also be set via environment variables (takes precedence over con ``` β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” -β”‚ pr-agent CLI β”‚ +β”‚ pr-agent-daemon CLI β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ β”‚ init - Add repo to config β”‚ β”‚ start - Start polling daemon β”‚ @@ -148,7 +155,7 @@ When mentioned on a PR, the bot responds to: Link a Railway project to get deployment notifications: ```bash -pr-agent init --railway-project abc123 +pr-agent-daemon init --railway-project abc123 ``` The daemon will monitor deployments and can: @@ -162,26 +169,27 @@ The daemon will monitor deployments and can: ``` pr_agent/ -β”œβ”€β”€ cli/ # CLI commands -β”‚ β”œβ”€β”€ config.py # Config management -β”‚ └── main.py # Typer CLI -β”œβ”€β”€ polling/ # Polling infrastructure -β”‚ β”œβ”€β”€ base.py # Abstract BasePoller -β”‚ β”œβ”€β”€ events.py # Event models -β”‚ β”œβ”€β”€ github.py # GitHub notification poller -β”‚ β”œβ”€β”€ railway.py # Railway deployment poller -β”‚ └── dispatcher.py # Event routing -└── ... 
# Original PR-Agent tools +β”œβ”€β”€ daemon/ # Daemon CLI commands +β”‚ β”œβ”€β”€ config.py # Config management +β”‚ └── main.py # Typer CLI +β”œβ”€β”€ polling/ # Polling infrastructure +β”‚ β”œβ”€β”€ base.py # Abstract BasePoller +β”‚ β”œβ”€β”€ events.py # Event models +β”‚ β”œβ”€β”€ github.py # GitHub notification poller +β”‚ β”œβ”€β”€ railway.py # Railway deployment poller +β”‚ └── dispatcher.py # Event routing +β”œβ”€β”€ cli.py # Original PR-Agent CLI +└── ... # Original PR-Agent tools ``` ### Running in Development ```bash # Run in foreground with debug output -pr-agent start --foreground +pr-agent-daemon start --foreground # Or run directly -python -m pr_agent.cli.main start -f +python -m pr_agent.daemon.main start -f ``` ## Requirements diff --git a/pr_agent/cli/__init__.py b/pr_agent/cli/__init__.py deleted file mode 100644 index d4fbe1c8e4..0000000000 --- a/pr_agent/cli/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -""" -PR-Agent CLI for local daemon management. - -Commands: - pr-agent init - Add current repo to config - pr-agent start - Start polling daemon - pr-agent stop - Stop daemon - pr-agent status - Show daemon status - pr-agent config - View/edit config -""" - -from pr_agent.cli.config import Config, load_config, save_config -from pr_agent.cli.main import main as run - -__all__ = ["Config", "load_config", "save_config", "run"] diff --git a/pr_agent/daemon/__init__.py b/pr_agent/daemon/__init__.py new file mode 100644 index 0000000000..bcdedeed3c --- /dev/null +++ b/pr_agent/daemon/__init__.py @@ -0,0 +1,15 @@ +""" +PR-Agent Daemon for local polling-based PR reviews. 
+ +Commands: + pr-agent-daemon init - Add current repo to config + pr-agent-daemon start - Start polling daemon + pr-agent-daemon stop - Stop daemon + pr-agent-daemon status - Show daemon status + pr-agent-daemon config - View/edit config +""" + +from pr_agent.daemon.config import Config, load_config, save_config +from pr_agent.daemon.main import main as run + +__all__ = ["Config", "load_config", "save_config", "run"] diff --git a/pr_agent/cli/config.py b/pr_agent/daemon/config.py similarity index 100% rename from pr_agent/cli/config.py rename to pr_agent/daemon/config.py diff --git a/pr_agent/cli/main.py b/pr_agent/daemon/main.py similarity index 97% rename from pr_agent/cli/main.py rename to pr_agent/daemon/main.py index 32b36bb96e..0e386051c6 100644 --- a/pr_agent/cli/main.py +++ b/pr_agent/daemon/main.py @@ -1,12 +1,12 @@ """ -PR-Agent CLI entry point. +PR-Agent Daemon entry point. Commands: - pr-agent init - Add current repo to config - pr-agent start - Start polling daemon - pr-agent stop - Stop daemon - pr-agent status - Show daemon status - pr-agent config - View/edit config + pr-agent-daemon init - Add current repo to config + pr-agent-daemon start - Start polling daemon + pr-agent-daemon stop - Stop daemon + pr-agent-daemon status - Show daemon status + pr-agent-daemon config - View/edit config """ import os @@ -19,7 +19,7 @@ from rich.console import Console from rich.table import Table -from pr_agent.cli.config import ( +from pr_agent.daemon.config import ( Config, RepoConfig, DEFAULT_CONFIG_FILE, @@ -33,7 +33,7 @@ ) app = typer.Typer( - name="pr-agent", + name="pr-agent-daemon", help="Local PR-Agent daemon for GitHub and Railway polling.", add_completion=False, ) diff --git a/pr_agent/polling/dispatcher.py b/pr_agent/polling/dispatcher.py index 6074904a7c..75d7626bea 100644 --- a/pr_agent/polling/dispatcher.py +++ b/pr_agent/polling/dispatcher.py @@ -214,7 +214,7 @@ def stats(self) -> dict: } -async def create_pr_agent_handler() -> EventHandler: +def 
create_pr_agent_handler() -> EventHandler: """ Create a handler that integrates with existing PR-Agent tools. diff --git a/pr_agent/polling/github.py b/pr_agent/polling/github.py index af62778cc1..047573735a 100644 --- a/pr_agent/polling/github.py +++ b/pr_agent/polling/github.py @@ -122,11 +122,13 @@ async def poll(self) -> list[Event]: - Parse commands from comment body - Mark notifications as read """ - # STUB: Implementation goes in feature/github-poller branch - raise NotImplementedError( - "GitHubPoller.poll() not yet implemented. " - "See feature/github-poller branch." + # STUB: Return empty list until implemented in feature/github-poller branch + # This allows the daemon to start without crashing + logger.debug( + f"[{self.name}] poll() not yet implemented - " + "see feature/github-poller branch" ) + return [] async def _fetch_notifications(self) -> Optional[list[dict]]: """ diff --git a/pr_agent/polling/railway.py b/pr_agent/polling/railway.py index d363e53eb3..6820441642 100644 --- a/pr_agent/polling/railway.py +++ b/pr_agent/polling/railway.py @@ -105,11 +105,13 @@ async def poll(self) -> list[Event]: - Emit events only for terminal state transitions - Filter by service_ids and environments if specified """ - # STUB: Implementation goes in feature/railway-poller branch - raise NotImplementedError( - "RailwayPoller.poll() not yet implemented. " - "See feature/railway-poller branch." 
+ # STUB: Return empty list until implemented in feature/railway-poller branch + # This allows the daemon to start without crashing + logger.debug( + f"[{self.name}] poll() not yet implemented - " + "see feature/railway-poller branch" ) + return [] async def _fetch_deployments(self) -> list[dict]: """ diff --git a/pyproject.toml b/pyproject.toml index 52a7f62b32..177eb65297 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ include = [ [project.scripts] pr-agent = "pr_agent.cli:run" +pr-agent-daemon = "pr_agent.daemon:run" [tool.ruff] line-length = 120 From 428cfe95281cb9648339c6957320784232bd04e6 Mon Sep 17 00:00:00 2001 From: research-developer Date: Tue, 23 Dec 2025 22:13:28 -0600 Subject: [PATCH 7/7] feat: Implement Railway and Cloudflare pollers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Railway Poller: - Full implementation of poll() and _fetch_deployments() - GraphQL query for project deployments - Status change detection (SUCCESS, FAILED, CRASHED) - Service and environment filtering - Logs URL construction Cloudflare Poller: - New CloudflarePoller for Pages and Workers - REST API polling for deployment status - Pages: tracks deployment stages (queued β†’ deploy) - Workers: tracks new deployments - Project and script name filtering Event Models: - CloudflarePagesEvent for Pages deployments - CloudflareWorkersEvent for Workers deployments - New EventTypes for Cloudflare states Config Updates: - cloudflare_token and cloudflare_account_id credentials - poll_interval_cloudflare setting - cloudflare_pages_project and cloudflare_workers in RepoConfig Closes #9 πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pr_agent/daemon/config.py | 11 + pr_agent/polling/__init__.py | 26 ++- pr_agent/polling/cloudflare.py | 401 +++++++++++++++++++++++++++++++++ pr_agent/polling/events.py | 88 ++++++++ pr_agent/polling/railway.py | 157 ++++++++++--- 5 files 
changed, 649 insertions(+), 34 deletions(-) create mode 100644 pr_agent/polling/cloudflare.py diff --git a/pr_agent/daemon/config.py b/pr_agent/daemon/config.py index 70a67f92b3..ab9e0b02ee 100644 --- a/pr_agent/daemon/config.py +++ b/pr_agent/daemon/config.py @@ -25,6 +25,8 @@ class Credentials(BaseModel): anthropic_api_key: Optional[str] = Field(None, description="Anthropic API key") openai_api_key: Optional[str] = Field(None, description="OpenAI API key") railway_token: Optional[str] = Field(None, description="Railway API token") + cloudflare_token: Optional[str] = Field(None, description="Cloudflare API token") + cloudflare_account_id: Optional[str] = Field(None, description="Cloudflare account ID") class Settings(BaseModel): @@ -32,6 +34,7 @@ class Settings(BaseModel): poll_interval_github: int = Field(30, description="GitHub poll interval in seconds") poll_interval_railway: int = Field(60, description="Railway poll interval in seconds") + poll_interval_cloudflare: int = Field(60, description="Cloudflare poll interval in seconds") model: str = Field("claude-sonnet-4-20250514", description="Default AI model") log_level: str = Field("INFO", description="Logging level") github_username: Optional[str] = Field(None, description="GitHub bot username") @@ -49,6 +52,14 @@ class RepoConfig(BaseModel): default_factory=list, description="Railway service IDs to monitor" ) + # Cloudflare integration (optional) + cloudflare_pages_project: Optional[str] = Field( + None, description="Cloudflare Pages project name" + ) + cloudflare_workers: list[str] = Field( + default_factory=list, description="Cloudflare Workers script names to monitor" + ) + # Auto-commands on PR open auto_review: bool = Field(True, description="Auto-run /review on new PRs") auto_describe: bool = Field(False, description="Auto-run /describe on new PRs") diff --git a/pr_agent/polling/__init__.py b/pr_agent/polling/__init__.py index aa0021b67f..7cc51eae0c 100644 --- a/pr_agent/polling/__init__.py +++ 
b/pr_agent/polling/__init__.py @@ -2,19 +2,37 @@ Polling infrastructure for local PR-Agent daemon. This module provides polling-based event sources as an alternative to webhooks, -enabling PR-Agent to run as a local service that monitors GitHub notifications -and Railway deployments. +enabling PR-Agent to run as a local service that monitors GitHub notifications, +Railway deployments, and Cloudflare deployments. """ from pr_agent.polling.base import BasePoller -from pr_agent.polling.events import Event, EventType, GitHubMentionEvent, RailwayDeployEvent +from pr_agent.polling.events import ( + Event, + EventType, + GitHubMentionEvent, + RailwayDeployEvent, + CloudflarePagesEvent, + CloudflareWorkersEvent, +) from pr_agent.polling.dispatcher import EventDispatcher +from pr_agent.polling.github import GitHubPoller +from pr_agent.polling.railway import RailwayPoller +from pr_agent.polling.cloudflare import CloudflarePoller __all__ = [ + # Base "BasePoller", + "EventDispatcher", + # Pollers + "GitHubPoller", + "RailwayPoller", + "CloudflarePoller", + # Events "Event", "EventType", "GitHubMentionEvent", "RailwayDeployEvent", - "EventDispatcher", + "CloudflarePagesEvent", + "CloudflareWorkersEvent", ] diff --git a/pr_agent/polling/cloudflare.py b/pr_agent/polling/cloudflare.py new file mode 100644 index 0000000000..438d9ed50b --- /dev/null +++ b/pr_agent/polling/cloudflare.py @@ -0,0 +1,401 @@ +""" +Cloudflare Deployment Poller. + +Polls Cloudflare's REST API for Pages and Workers deployment status changes. 
+""" + +from datetime import datetime +from typing import Optional +import logging + +import httpx + +from pr_agent.polling.base import BasePoller +from pr_agent.polling.events import ( + Event, + EventType, + CloudflarePagesEvent, + CloudflareWorkersEvent, +) + +logger = logging.getLogger(__name__) + +# Cloudflare API endpoint +CLOUDFLARE_API_URL = "https://api.cloudflare.com/client/v4" + + +class CloudflarePoller(BasePoller): + """ + Poll Cloudflare API for Pages and Workers deployment status changes. + + Tracks deployments and emits events when status changes to + success, failed, or active. + + Usage: + poller = CloudflarePoller( + token="cf_xxx", + account_id="abc123", + poll_interval=60, + ) + async for event in poller.run(): + if event.is_failure: + await alert_team(event) + """ + + def __init__( + self, + token: str, + account_id: str, + poll_interval: int = 60, + pages_projects: Optional[list[str]] = None, + workers_scripts: Optional[list[str]] = None, + environments: Optional[list[str]] = None, + ): + """ + Initialize the Cloudflare poller. 
+ + Args: + token: Cloudflare API token + account_id: Cloudflare account ID + poll_interval: Seconds between polls (default: 60) + pages_projects: Optional list of Pages project names to monitor + If None, monitors all projects + workers_scripts: Optional list of Worker script names to monitor + If None, monitors all workers + environments: Optional list of environment names to filter + (e.g., ["production", "preview"]) + """ + super().__init__(poll_interval=poll_interval, name="cloudflare") + + self.token = token + self.account_id = account_id + self.pages_projects = set(pages_projects) if pages_projects else None + self.workers_scripts = set(workers_scripts) if workers_scripts else None + self.environments = set(environments) if environments else None + + # Track deployment states for change detection + self._pages_states: dict[str, str] = {} # deploy_id -> status + self._workers_states: dict[str, str] = {} # version_id -> status + + # HTTP client (created in setup) + self._client: Optional[httpx.AsyncClient] = None + + async def setup(self) -> None: + """Initialize HTTP client.""" + self._client = httpx.AsyncClient(timeout=30.0) + logger.info(f"[{self.name}] Initialized for account {self.account_id}") + + async def teardown(self) -> None: + """Close HTTP client.""" + if self._client: + await self._client.aclose() + self._client = None + + @property + def _headers(self) -> dict[str, str]: + """Get headers for Cloudflare API requests.""" + return { + "Authorization": f"Bearer {self.token}", + "Content-Type": "application/json", + } + + async def poll(self) -> list[Event]: + """ + Poll Cloudflare API for deployment status changes. 
+ + Returns: + List of CloudflarePagesEvent/CloudflareWorkersEvent objects + """ + if not self._client: + logger.warning(f"[{self.name}] HTTP client not initialized") + return [] + + events: list[Event] = [] + + # Poll Pages deployments + if self.pages_projects is None or len(self.pages_projects) > 0: + try: + pages_events = await self._poll_pages() + events.extend(pages_events) + except Exception as e: + logger.error(f"[{self.name}] Failed to poll Pages: {e}") + + # Poll Workers deployments + if self.workers_scripts is None or len(self.workers_scripts) > 0: + try: + workers_events = await self._poll_workers() + events.extend(workers_events) + except Exception as e: + logger.error(f"[{self.name}] Failed to poll Workers: {e}") + + if events: + logger.info(f"[{self.name}] Emitting {len(events)} deployment event(s)") + + return events + + async def _poll_pages(self) -> list[Event]: + """Poll Cloudflare Pages for deployment changes.""" + events: list[Event] = [] + + # Get list of projects to poll + projects = await self._list_pages_projects() + + for project in projects: + project_name = project.get("name") + if not project_name: + continue + + # Filter by configured projects + if self.pages_projects and project_name not in self.pages_projects: + continue + + try: + deployments = await self._fetch_pages_deployments(project_name) + for deployment in deployments: + event = self._process_pages_deployment(project_name, deployment) + if event: + events.append(event) + except Exception as e: + logger.error(f"[{self.name}] Failed to fetch deployments for {project_name}: {e}") + + return events + + async def _poll_workers(self) -> list[Event]: + """Poll Cloudflare Workers for deployment changes.""" + events: list[Event] = [] + + # Get list of workers to poll + scripts = await self._list_workers_scripts() + + for script in scripts: + script_name = script.get("id") + if not script_name: + continue + + # Filter by configured scripts + if self.workers_scripts and script_name not 
in self.workers_scripts: + continue + + try: + deployments = await self._fetch_workers_deployments(script_name) + for deployment in deployments: + event = self._process_workers_deployment(script_name, deployment) + if event: + events.append(event) + except Exception as e: + logger.error(f"[{self.name}] Failed to fetch deployments for {script_name}: {e}") + + return events + + async def _list_pages_projects(self) -> list[dict]: + """List all Pages projects in the account.""" + url = f"{CLOUDFLARE_API_URL}/accounts/{self.account_id}/pages/projects" + + try: + response = await self._client.get(url, headers=self._headers) + response.raise_for_status() + data = response.json() + + if not data.get("success"): + logger.error(f"[{self.name}] Pages API error: {data.get('errors')}") + return [] + + return data.get("result", []) + except httpx.HTTPStatusError as e: + if e.response.status_code == 429: + logger.warning(f"[{self.name}] Rate limited on Pages projects") + else: + logger.error(f"[{self.name}] HTTP error listing projects: {e}") + return [] + + async def _list_workers_scripts(self) -> list[dict]: + """List all Workers scripts in the account.""" + url = f"{CLOUDFLARE_API_URL}/accounts/{self.account_id}/workers/scripts" + + try: + response = await self._client.get(url, headers=self._headers) + response.raise_for_status() + data = response.json() + + if not data.get("success"): + logger.error(f"[{self.name}] Workers API error: {data.get('errors')}") + return [] + + return data.get("result", []) + except httpx.HTTPStatusError as e: + if e.response.status_code == 429: + logger.warning(f"[{self.name}] Rate limited on Workers scripts") + else: + logger.error(f"[{self.name}] HTTP error listing scripts: {e}") + return [] + + async def _fetch_pages_deployments(self, project_name: str) -> list[dict]: + """Fetch recent deployments for a Pages project.""" + url = ( + f"{CLOUDFLARE_API_URL}/accounts/{self.account_id}" + f"/pages/projects/{project_name}/deployments" + ) + + 
response = await self._client.get(url, headers=self._headers) + response.raise_for_status() + data = response.json() + + if not data.get("success"): + logger.error(f"[{self.name}] Pages deployments error: {data.get('errors')}") + return [] + + return data.get("result", []) + + async def _fetch_workers_deployments(self, script_name: str) -> list[dict]: + """Fetch recent deployments for a Workers script.""" + url = ( + f"{CLOUDFLARE_API_URL}/accounts/{self.account_id}" + f"/workers/scripts/{script_name}/deployments" + ) + + response = await self._client.get(url, headers=self._headers) + response.raise_for_status() + data = response.json() + + if not data.get("success"): + logger.error(f"[{self.name}] Workers deployments error: {data.get('errors')}") + return [] + + # Workers API nests deployments + result = data.get("result", {}) + return result.get("deployments", []) if isinstance(result, dict) else [] + + def _process_pages_deployment( + self, project_name: str, deployment: dict + ) -> Optional[CloudflarePagesEvent]: + """Process a Pages deployment into an event.""" + deploy_id = deployment.get("id") + if not deploy_id: + return None + + # Get latest stage status + latest_stage = deployment.get("latest_stage", {}) + stage_name = latest_stage.get("name", "") + status = latest_stage.get("status", "") + + # Apply environment filter + environment = deployment.get("environment", "production") + if self.environments and environment not in self.environments: + return None + + # Check if status changed + previous_status = self._pages_states.get(deploy_id) + current_state = f"{stage_name}:{status}" + self._pages_states[deploy_id] = current_state + + # Only emit for terminal states and state changes + if stage_name != "deploy": + return None # Not in final stage yet + + terminal_statuses = {"success", "active", "failure", "failed"} + if status.lower() not in terminal_statuses: + return None + + # Skip if we already emitted for this state + if previous_status == 
current_state: + return None + + # Determine event type + if status.lower() in ("success", "active"): + event_type = EventType.CLOUDFLARE_PAGES_SUCCESS + else: + event_type = EventType.CLOUDFLARE_PAGES_FAILED + + # Extract git info + trigger = deployment.get("deployment_trigger", {}) + metadata = trigger.get("metadata", {}) + commit_sha = metadata.get("commit_hash") + commit_message = metadata.get("commit_message") + branch = metadata.get("branch") + + # Parse timestamp + created_on = deployment.get("created_on") + timestamp = datetime.utcnow() + if created_on: + try: + timestamp = datetime.fromisoformat(created_on.replace("Z", "+00:00")) + except (ValueError, AttributeError): + pass + + # Calculate build duration + started = latest_stage.get("started_on") + ended = latest_stage.get("ended_on") + build_duration_ms = None + if started and ended: + try: + start_dt = datetime.fromisoformat(started.replace("Z", "+00:00")) + end_dt = datetime.fromisoformat(ended.replace("Z", "+00:00")) + build_duration_ms = int((end_dt - start_dt).total_seconds() * 1000) + except (ValueError, AttributeError): + pass + + logger.info( + f"[{self.name}] Pages {project_name}/{deploy_id[:8]} -> {status} " + f"({environment})" + ) + + return CloudflarePagesEvent( + id=f"cf-pages-{deploy_id}", + type=event_type, + timestamp=timestamp, + raw_data=deployment, + project_name=project_name, + account_id=self.account_id, + deployment_id=deploy_id, + status=status, + url=deployment.get("url"), + preview_url=deployment.get("aliases", [None])[0] if deployment.get("aliases") else None, + environment=environment, + branch=branch, + commit_sha=commit_sha, + commit_message=commit_message, + build_duration_ms=build_duration_ms, + ) + + def _process_workers_deployment( + self, script_name: str, deployment: dict + ) -> Optional[CloudflareWorkersEvent]: + """Process a Workers deployment into an event.""" + deploy_id = deployment.get("id") + if not deploy_id: + return None + + # Check if this is a new 
deployment + previous = self._workers_states.get(deploy_id) + self._workers_states[deploy_id] = "deployed" + + # Skip if we already processed this deployment + if previous == "deployed": + return None + + # Parse timestamp + created_on = deployment.get("created_on") + timestamp = datetime.utcnow() + if created_on: + try: + timestamp = datetime.fromisoformat(created_on.replace("Z", "+00:00")) + except (ValueError, AttributeError): + pass + + # Get version info + versions = deployment.get("versions", []) + version_id = versions[0].get("version_id") if versions else None + + logger.info(f"[{self.name}] Workers {script_name} deployed ({deploy_id[:8]})") + + return CloudflareWorkersEvent( + id=f"cf-workers-{deploy_id}", + type=EventType.CLOUDFLARE_WORKERS_DEPLOYED, + timestamp=timestamp, + raw_data=deployment, + worker_name=script_name, + account_id=self.account_id, + deployment_id=deploy_id, + status="deployed", + version_id=version_id, + ) diff --git a/pr_agent/polling/events.py b/pr_agent/polling/events.py index 44f23cbad4..a07feb5ef1 100644 --- a/pr_agent/polling/events.py +++ b/pr_agent/polling/events.py @@ -27,6 +27,13 @@ class EventType(str, Enum): RAILWAY_DEPLOY_BUILDING = "railway.deploy.building" RAILWAY_SERVICE_CRASHED = "railway.service.crashed" + # Cloudflare events + CLOUDFLARE_PAGES_SUCCESS = "cloudflare.pages.success" + CLOUDFLARE_PAGES_FAILED = "cloudflare.pages.failed" + CLOUDFLARE_PAGES_BUILDING = "cloudflare.pages.building" + CLOUDFLARE_WORKERS_DEPLOYED = "cloudflare.workers.deployed" + CLOUDFLARE_WORKERS_FAILED = "cloudflare.workers.failed" + class Event(BaseModel): """ @@ -160,3 +167,84 @@ class GitHubPROpenedEvent(Event): default_factory=list, description="Commands to run automatically (e.g., ['/review', '/describe'])", ) + + +class CloudflarePagesEvent(Event): + """ + Event for Cloudflare Pages deployment status changes. + + Triggered when a Pages deployment succeeds, fails, or starts building. 
+ """ + + source: str = "cloudflare" + + # Project info + project_name: str = Field(..., description="Cloudflare Pages project name") + account_id: str = Field(..., description="Cloudflare account ID") + + # Deployment info + deployment_id: str = Field(..., description="Deployment ID") + status: str = Field( + ..., description="Deployment status (active, success, failed, building)" + ) + url: Optional[str] = Field(None, description="Deployment URL") + preview_url: Optional[str] = Field(None, description="Preview URL for branch deploys") + + # Environment + environment: str = Field( + "production", description="Environment (production, preview)" + ) + branch: Optional[str] = Field(None, description="Git branch name") + + # Git info + commit_sha: Optional[str] = Field(None, description="Git commit SHA") + commit_message: Optional[str] = Field(None, description="Git commit message") + + # Build info + build_duration_ms: Optional[int] = Field(None, description="Build duration in ms") + error_message: Optional[str] = Field(None, description="Error message if failed") + + @property + def is_failure(self) -> bool: + """Check if this is a failure event.""" + return self.status == "failed" + + @property + def is_success(self) -> bool: + """Check if this is a success event.""" + return self.status in ("active", "success") + + +class CloudflareWorkersEvent(Event): + """ + Event for Cloudflare Workers deployment status changes. + + Triggered when a Worker is deployed or fails to deploy. 
+ """ + + source: str = "cloudflare" + + # Worker info + worker_name: str = Field(..., description="Worker script name") + account_id: str = Field(..., description="Cloudflare account ID") + + # Deployment info + deployment_id: Optional[str] = Field(None, description="Deployment ID") + status: str = Field(..., description="Deployment status (deployed, failed)") + + # Version info + version_id: Optional[str] = Field(None, description="Worker version ID") + routes: list[str] = Field(default_factory=list, description="Worker routes") + + # Error info + error_message: Optional[str] = Field(None, description="Error message if failed") + + @property + def is_failure(self) -> bool: + """Check if this is a failure event.""" + return self.status == "failed" + + @property + def is_success(self) -> bool: + """Check if this is a success event.""" + return self.status == "deployed" diff --git a/pr_agent/polling/railway.py b/pr_agent/polling/railway.py index 6820441642..773f71bd31 100644 --- a/pr_agent/polling/railway.py +++ b/pr_agent/polling/railway.py @@ -2,11 +2,9 @@ Railway Deployment Poller. Polls Railway's GraphQL API for deployment status changes. 
- -Implementation Status: STUB -Branch: feature/railway-poller """ +from datetime import datetime from typing import Optional import logging @@ -98,20 +96,34 @@ async def poll(self) -> list[Event]: Returns: List of RailwayDeployEvent objects for deployments that changed to terminal states (SUCCESS, FAILED, CRASHED) - - TODO: Implement in feature/railway-poller branch - - Query deployments via GraphQL - - Track state changes - - Emit events only for terminal state transitions - - Filter by service_ids and environments if specified - """ - # STUB: Return empty list until implemented in feature/railway-poller branch - # This allows the daemon to start without crashing - logger.debug( - f"[{self.name}] poll() not yet implemented - " - "see feature/railway-poller branch" - ) - return [] + """ + if not self._client: + logger.warning(f"[{self.name}] HTTP client not initialized") + return [] + + try: + deployments = await self._fetch_deployments() + except httpx.HTTPStatusError as e: + if e.response.status_code == 429: + logger.warning(f"[{self.name}] Rate limited, backing off") + else: + logger.error(f"[{self.name}] HTTP error: {e}") + return [] + except Exception as e: + logger.error(f"[{self.name}] Failed to fetch deployments: {e}") + return [] + + events: list[Event] = [] + + for deployment in deployments: + event = self._process_deployment(deployment) + if event: + events.append(event) + + if events: + logger.info(f"[{self.name}] Emitting {len(events)} deployment event(s)") + + return events async def _fetch_deployments(self) -> list[dict]: """ @@ -119,13 +131,26 @@ async def _fetch_deployments(self) -> list[dict]: Returns: List of deployment dicts from GraphQL response - - TODO: Implement - - GraphQL query for deployments - - Handle pagination if needed - - Return normalized deployment data """ - raise NotImplementedError("See feature/railway-poller branch") + query = self._build_deployments_query() + variables = {"projectId": self.project_id} + + response = await 
self._client.post( + RAILWAY_API_URL, + json={"query": query, "variables": variables}, + headers=self._headers, + ) + response.raise_for_status() + + data = response.json() + + if "errors" in data: + errors = data["errors"] + logger.error(f"[{self.name}] GraphQL errors: {errors}") + return [] + + edges = data.get("data", {}).get("deployments", {}).get("edges", []) + return [edge["node"] for edge in edges] def _build_deployments_query(self) -> str: """ @@ -176,14 +201,86 @@ def _process_deployment(self, deployment: dict) -> Optional[RailwayDeployEvent]: Returns: RailwayDeployEvent if should emit, None otherwise - - TODO: Implement - - Check if deployment matches filters (service_ids, environments) - - Check if status changed from previous poll - - Only emit for terminal states (SUCCESS, FAILED, CRASHED) - - Create and return RailwayDeployEvent """ - raise NotImplementedError("See feature/railway-poller branch") + deploy_id = deployment.get("id") + status = deployment.get("status") + + if not deploy_id or not status: + return None + + # Get service and environment info + service = deployment.get("service", {}) + service_id = service.get("id", "") + service_name = service.get("name", "unknown") + + environment = deployment.get("environment", {}) + env_id = environment.get("id", "") + env_name = environment.get("name", "unknown") + + # Apply filters + if not self._should_process_service(service_id): + return None + if not self._should_process_environment(env_name): + return None + + # Check if status changed + previous_status = self._deployment_states.get(deploy_id) + self._deployment_states[deploy_id] = status + + # Only emit for terminal states and state changes + terminal_states = {"SUCCESS", "FAILED", "CRASHED"} + if status not in terminal_states: + return None + + # Skip if we already emitted for this terminal state + if previous_status == status: + return None + + # Get event type + event_type = self._map_status_to_event_type(status) + if not event_type: + return 
None + + # Extract metadata + meta = deployment.get("meta", {}) + commit_sha = meta.get("commitHash") + commit_message = meta.get("commitMessage") + + # Build logs URL + logs_url = ( + f"https://railway.app/project/{self.project_id}" + f"/service/{service_id}/deployment/{deploy_id}" + ) + + # Parse timestamp + created_at = deployment.get("createdAt") + timestamp = datetime.utcnow() + if created_at: + try: + timestamp = datetime.fromisoformat(created_at.replace("Z", "+00:00")) + except (ValueError, AttributeError): + pass + + logger.info( + f"[{self.name}] Deployment {deploy_id[:8]} -> {status} " + f"({service_name}/{env_name})" + ) + + return RailwayDeployEvent( + id=f"railway-{deploy_id}", + type=event_type, + timestamp=timestamp, + raw_data=deployment, + service_id=service_id, + service_name=service_name, + environment_id=env_id, + environment_name=env_name, + deploy_id=deploy_id, + status=status, + commit_sha=commit_sha, + commit_message=commit_message, + logs_url=logs_url, + ) def _should_process_service(self, service_id: str) -> bool: """