diff --git a/README.md b/README.md index d1f16fc..f9b734a 100644 --- a/README.md +++ b/README.md @@ -1,66 +1,76 @@ # Auto PR Bot -Automates a simple GitHub workflow: -- Ensure head branch exists (default `dev`) -- Overwrite `README.md` on head branch -- Create PR from head → base (default `dev` → `main`) -- Auto-merge the PR +Freshly structured toolkit for automating a lightweight GitHub workflow: + +- Keep a head branch (default `dev`) alive by cloning it from the base branch when missing +- Overwrite `README.md` on that branch with auto-generated content +- Create and (optionally) merge a PR from head → base, once diffs exist + +## Project Layout + +``` +autopr_bot/ + ├─ bot.py # GitHub automation logic + ├─ content.py # Random content generators + └─ ui/ # Tkinter desktop experience +main.py # Backwards-compatible script entry +ui.py # Thin launcher for the redesigned UI +``` ## Requirements -- Python 3.8+ + +- Python 3.9+ +- `requests` (install via `pip install -r requirements.txt` or `pip install requests`) - GitHub Personal Access Token (classic) with `repo` scope -- `requests` library (`pip install requests`) -## Quick Start (UI) +## Launch the UI + ```bash python ui.py ``` -Fill in: -- Token: your GitHub PAT -- Username/Owner: e.g., `octocat` -- Repository: e.g., `hello-world` -- Base Branch: e.g., `main` -- Head Branch: e.g., `dev` -- Uptime (seconds): loop interval - -Buttons: -- Run Once: single cycle (ensure branch → overwrite README → PR → merge) -- Start Loop: run every N seconds -- Stop Loop: stop background loop + +Highlights: +- Modern dark theme with status badges and live log stream +- Optional co-author section; enable it only when you want `Co-authored-by` lines +- Loop controls with interval spinbox and live indicators +- Preview generator to inspect the random PR/README copy before running anything ## Programmatic Usage + ```python -from main import AutoPRBot +from autopr_bot import AutoPRBot bot = AutoPRBot( token="", repo="owner/repo", base_branch="main", head_branch="dev", + # co_authors=[{"name": "Ada Lovelace", "email": "ada@example.com"}], ) -# Single cycle -bot.run_once() - -# Continuous loop every 60 seconds -# bot.run_loop(60) +bot.run_once() # Single cycle +# bot.run_loop(120) # Continuous automation ``` -## How It Works -1. If the head branch does not exist, it is created from the base branch -2. `README.md` on the head branch is overwritten using the Contents API -3. If there are diffs, a PR from head → base is created -4. The PR is merged automatically +## Automation Flow + +1. Ensure the head branch exists (creates it from base if missing). +2. Overwrite `README.md` on the head branch with new random content. +3. If commits exist, open a PR from head → base. +4. Merge the PR automatically if creation succeeds. -## Notes & Troubleshooting -- If you see "Validation Failed: No commits between", there were no diffs to PR. -- 404 errors usually indicate wrong `owner/repo`, branch names, or insufficient token permissions. -- Merge may fail if branch protection rules block auto-merge. -- API rate limits: increase the loop interval. +## Troubleshooting -## Security -- Use least-privilege tokens. For private repos, `repo` scope is required. -- Avoid hardcoding tokens; the UI uses in-memory token entry. +- **Validation Failed: No commits between** – nothing changed; either wait or tweak content. +- **404s / 403s** – usually incorrect `owner/repo`, branch names, or token permissions. +- **Merge blocked** – branch protection rules may prevent auto-merges. +- **Rate limits** – widen the loop interval or use fewer API calls. + +## Security Notes + +- Prefer environment variables or secret managers for tokens; never commit them. +- Limit PAT scopes to `repo`, or less if possible. ## License + MIT diff --git a/autopr_bot/__init__.py b/autopr_bot/__init__.py new file mode 100644 index 0000000..68aa0b0 --- /dev/null +++ b/autopr_bot/__init__.py @@ -0,0 +1,7 @@ +"""Auto PR Bot package.""" + +from .bot import AutoPRBot + +__all__ = ["AutoPRBot"] + + diff --git a/autopr_bot/bot.py b/autopr_bot/bot.py new file mode 100644 index 0000000..d32e857 --- /dev/null +++ b/autopr_bot/bot.py @@ -0,0 +1,228 @@ +from __future__ import annotations + +import base64 +import time +from typing import Callable, Dict, List, Optional + +import requests + +from .content import generate_pr_content, generate_readme_content + + +class AutoPRBot: + """High-level helper that keeps a GitHub PR flowing between branches.""" + + def __init__( + self, + token: str, + repo: str, + base_branch: str = "main", + head_branch: str = "dev", + logger: Optional[Callable[[str], None]] = None, + co_authors: Optional[List[Dict[str, str]]] = None, + ) -> None: + self.token = token + self.repo = repo + self.base_branch = base_branch + self.head_branch = head_branch + self.co_authors = co_authors or [] + self.headers = { + "Authorization": f"token {self.token}", + "Accept": "application/vnd.github+json", + "User-Agent": "auto-pr-bot", + } + self.log = logger if logger else print + + # ------------------------------------------------------------------ # + # Content helpers + # ------------------------------------------------------------------ # + def generate_random_content(self) -> Dict[str, object]: + return generate_pr_content() + + def generate_random_readme_content(self) -> str: + return generate_readme_content() + + # ------------------------------------------------------------------ # + # GitHub helpers + # ------------------------------------------------------------------ # + def branches_have_diffs(self) -> bool: + url = f"https://api.github.com/repos/{self.repo}/compare/{self.base_branch}...{self.head_branch}" + response = requests.get(url, headers=self.headers, timeout=30) + if not response.ok: + self.log(f"Failed to compare branches: {response.text}") + return False + data = response.json() + return data.get("ahead_by", 0) > 0 + + def ensure_branch_exists(self, branch: str) -> bool: + url_ref = f"https://api.github.com/repos/{self.repo}/git/ref/heads/{branch}" + response = requests.get(url_ref, headers=self.headers, timeout=30) + if response.ok: + return True + if response.status_code != 404: + self.log(f"Failed to check branch: {response.text}") + return False + + base_ref_url = f"https://api.github.com/repos/{self.repo}/git/ref/heads/{self.base_branch}" + base_ref = requests.get(base_ref_url, headers=self.headers, timeout=30) + if not base_ref.ok: + self.log(f"Failed to get base branch ref: {base_ref.text}") + return False + base_sha = base_ref.json().get("object", {}).get("sha") + if not base_sha: + self.log("Base branch SHA not found") + return False + + create_ref_url = f"https://api.github.com/repos/{self.repo}/git/refs" + payload = {"ref": f"refs/heads/{branch}", "sha": base_sha} + created = requests.post(create_ref_url, headers=self.headers, json=payload, timeout=30) + if not created.ok: + self.log(f"Failed to create branch: {created.text}") + return False + + self.log(f"Created branch {branch} from {self.base_branch}") + return True + + def update_readme_on_branch(self, branch: str) -> bool: + path = "README.md" + new_content = self.generate_random_readme_content() + get_url = f"https://api.github.com/repos/{self.repo}/contents/{path}?ref={branch}" + response = requests.get(get_url, headers=self.headers, timeout=30) + + sha = None + if response.status_code == 200: + data = response.json() + sha = data.get("sha") + elif response.status_code != 404 and not response.ok: + self.log(f"Failed to fetch README: {response.text}") + return False + + encoded_content = base64.b64encode(new_content.encode()).decode() + put_url = f"https://api.github.com/repos/{self.repo}/contents/{path}" + + commit_message = "chore: overwrite README via auto-update" + if self.co_authors: + co_author_lines = [ + f"Co-authored-by: {author['name']} <{author['email']}>" + for author in self.co_authors + if author.get("name") and author.get("email") + ] + if co_author_lines: + commit_message += "\n\n" + "\n".join(co_author_lines) + + payload = { + "message": commit_message, + "content": encoded_content, + "branch": branch, + } + if sha: + payload["sha"] = sha + + put_response = requests.put(put_url, headers=self.headers, json=payload, timeout=30) + if not put_response.ok: + self.log(f"Failed to update README: {put_response.text}") + return False + + self.log(f"Updated {path} on {branch}") + return True + + def create_pull_request(self) -> Optional[int]: + url = f"https://api.github.com/repos/{self.repo}/pulls" + random_content = self.generate_random_content() + data = { + "title": random_content["title"], + "head": self.head_branch, + "base": self.base_branch, + } + response = requests.post(url, headers=self.headers, json=data, timeout=30) + if response.status_code == 422: + text = response.text + if "A pull request already exists" in text: + self.log("PR already exists, skipping.") + return None + if "No commits between" in text: + self.log("No commits between branches, skipping PR creation.") + return None + + if response.ok: + pr = response.json() + self.log(f"Created PR #{pr['number']}") + return pr["number"] + + self.log(f"Failed to create PR: {response.text}") + return None + + def merge_pull_request(self, pr_number: int) -> None: + url = f"https://api.github.com/repos/{self.repo}/pulls/{pr_number}/merge" + data = {"merge_method": "merge"} + response = requests.put(url, headers=self.headers, json=data, timeout=30) + if response.ok: + self.log(f"Merged PR #{pr_number}") + else: + self.log(f"Failed to merge PR #{pr_number}: {response.text}") + + def get_open_pr(self) -> Optional[int]: + owner = self.repo.split("/")[0] + url = f"https://api.github.com/repos/{self.repo}/pulls?head={owner}:{self.head_branch}&base={self.base_branch}&state=open" + response = requests.get(url, headers=self.headers, timeout=30) + if response.ok: + prs = response.json() + if prs: + return prs[0]["number"] + return None + + # ------------------------------------------------------------------ # + # Workflows + # ------------------------------------------------------------------ # + def run_once(self) -> None: + if not self.token: + raise ValueError("❌ Please provide a GITHUB_TOKEN") + if not self.ensure_branch_exists(self.head_branch): + raise SystemExit(1) + if not self.update_readme_on_branch(self.head_branch): + raise SystemExit(1) + + self.log("\n=== Checking for PRs ===") + pr_number = self.get_open_pr() + if pr_number: + self.log(f"Found existing PR #{pr_number}, merging...") + self.merge_pull_request(pr_number) + return + + if not self.branches_have_diffs(): + self.log("No diffs between dev and main. Skipping PR.") + return + + self.log("No PR found, creating one...") + new_pr = self.create_pull_request() + if new_pr: + self.merge_pull_request(new_pr) + + def run_loop(self, interval_seconds: int) -> None: + while True: + try: + self.run_once() + except Exception as exc: + self.log(f"Error: {exc}") + time.sleep(interval_seconds) + + # ------------------------------------------------------------------ # + # Debug helpers + # ------------------------------------------------------------------ # + def display_random_content(self) -> None: + self.log("\n" + "=" * 80) + self.log("RANDOM PR CONTENT GENERATED") + self.log("=" * 80) + random_content = self.generate_random_content() + self.log(f"\nTitle: {random_content['title']}") + self.log(f"\nBody:\n{random_content['body']}") + self.log(f"\nWords Used: {random_content['words_used']}") + self.log(f"\nAll Words: {', '.join(random_content['all_words'])}") + + self.log("\n" + "=" * 80) + self.log("RANDOM README CONTENT GENERATED") + self.log("=" * 80) + readme_content = self.generate_random_readme_content() + self.log(f"\n{readme_content}") + + diff --git a/autopr_bot/content.py b/autopr_bot/content.py new file mode 100644 index 0000000..ab879ef --- /dev/null +++ b/autopr_bot/content.py @@ -0,0 +1,143 @@ +import random +from datetime import datetime, timezone +from typing import Dict, List + + +CODING_WORDS: List[str] = [ + "algorithm", "array", "boolean", "class", "compiler", "debugger", "function", + "variable", "string", "integer", "object", "method", "parameter", "return", + "loop", "condition", "statement", "expression", "operator", "syntax", "semantic", + "recursion", "iteration", "inheritance", "polymorphism", "encapsulation", "abstraction", + "interface", "implementation", "constructor", "destructor", "pointer", "reference", + "memory", "allocation", "deallocation", "garbage", "collection", "optimization", + "performance", "efficiency", "complexity", "asymptotic", "big", "notation", + "data", "structure", "stack", "queue", "tree", "graph", "hash", "table", + "binary", "search", "sorting", "bubble", "merge", "quick", "heap", "radix", + "database", "query", "sql", "schema", "table", "index", "transaction", "commit", + "rollback", "concurrency", "threading", "synchronization", "mutex", "semaphore", + "deadlock", "race", "condition", "parallel", "distributed", "microservice", + "api", "rest", "json", "xml", "http", "https", "endpoint", "request", "response", + "authentication", "authorization", "encryption", "decryption", "security", "vulnerability", + "testing", "unit", "integration", "regression", "coverage", "mock", "stub", "fixture", + "deployment", "ci", "cd", "pipeline", "docker", "kubernetes", "container", "orchestration", +] + + +def _select_words(sample_size: int) -> List[str]: + return random.sample(CODING_WORDS, min(sample_size, len(CODING_WORDS))) + + +def generate_pr_content() -> Dict[str, object]: + """Generate a pseudo-random pull request title and body.""" + selected_words = _select_words(50) + title_words = random.sample(selected_words, 3) + title = f"feat: implement {title_words[0]} {title_words[1]} {title_words[2]} optimization" + + def _section(name: str, lines: List[str]) -> str: + return f"\n## {name}\n" + "\n".join(f"- {line}" for line in lines) + "\n" + + what_changed = _section( + "What Changed", + [ + f"Enhanced {random.choice(selected_words)} {random.choice(selected_words)} processing", + f"Improved {random.choice(selected_words)} {random.choice(selected_words)} performance", + f"Added {random.choice(selected_words)} {random.choice(selected_words)} validation", + f"Refactored {random.choice(selected_words)} {random.choice(selected_words)} logic", + f"Updated {random.choice(selected_words)} {random.choice(selected_words)} configuration", + ], + ) + + technical_details = """ +## Technical Details +This PR introduces significant improvements to the {0} {1} system: + +- **{2} {3}**: Implemented advanced {4} {5} algorithms +- **{6} {7}**: Enhanced {8} {9} processing capabilities +- **{10} {11}**: Optimized {12} {13} performance metrics +- **{14} {15}**: Added robust {16} {17} error handling +- **{18} {19}**: Improved {20} {21} security protocols + +The implementation leverages modern {22} {23} patterns and follows best practices for {24} {25} development. +""".format(*(random.choice(selected_words) for _ in range(26))) + + performance = _section( + "Performance Improvements", + [ + f"Reduced {random.choice(selected_words)} {random.choice(selected_words)} latency by 40%", + f"Optimized {random.choice(selected_words)} {random.choice(selected_words)} memory usage", + f"Enhanced {random.choice(selected_words)} {random.choice(selected_words)} throughput", + f"Improved {random.choice(selected_words)} {random.choice(selected_words)} scalability", + f"Streamlined {random.choice(selected_words)} {random.choice(selected_words)} operations", + ], + ) + + testing = _section( + "Testing", + [ + f"Added comprehensive {random.choice(selected_words)} {random.choice(selected_words)} unit tests", + f"Implemented {random.choice(selected_words)} {random.choice(selected_words)} integration tests", + f"Enhanced {random.choice(selected_words)} {random.choice(selected_words)} regression testing", + f"Improved {random.choice(selected_words)} {random.choice(selected_words)} test coverage", + f"Added {random.choice(selected_words)} {random.choice(selected_words)} performance benchmarks", + ], + ) + + code_quality = _section( + "Code Quality", + [ + f"Applied {random.choice(selected_words)} {random.choice(selected_words)} design patterns", + f"Implemented {random.choice(selected_words)} {random.choice(selected_words)} best practices", + f"Enhanced {random.choice(selected_words)} {random.choice(selected_words)} documentation", + f"Improved {random.choice(selected_words)} {random.choice(selected_words)} maintainability", + f"Added {random.choice(selected_words)} {random.choice(selected_words)} type safety", + ], + ) + + footer = f""" +--- +**Generated**: {datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")} +""" + + return { + "title": title, + "body": what_changed + technical_details + performance + testing + code_quality + footer, + "words_used": len(selected_words), + "all_words": selected_words, + } + + +def generate_readme_content() -> str: + """Generate random README content.""" + selected_words = _select_words(30) + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") + + content = f"""# Generated README + +This file was updated on {timestamp}. + +## Project Overview +This repository demonstrates automated {random.choice(selected_words)} {random.choice(selected_words)} workflows using advanced {random.choice(selected_words)} {random.choice(selected_words)} techniques. + +## Features +- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Implements robust {random.choice(selected_words)} {random.choice(selected_words)} processing +- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Enhanced {random.choice(selected_words)} {random.choice(selected_words)} performance optimization +- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Advanced {random.choice(selected_words)} {random.choice(selected_words)} error handling +- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Secure {random.choice(selected_words)} {random.choice(selected_words)} authentication +- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Efficient {random.choice(selected_words)} {random.choice(selected_words)} data structures + +## Technical Implementation +The system utilizes modern {random.choice(selected_words)} {random.choice(selected_words)} patterns and follows industry best practices for {random.choice(selected_words)} {random.choice(selected_words)} development. + +## Performance Metrics +- Optimized {random.choice(selected_words)} {random.choice(selected_words)} algorithms +- Enhanced {random.choice(selected_words)} {random.choice(selected_words)} memory management +- Improved {random.choice(selected_words)} {random.choice(selected_words)} scalability +- Streamlined {random.choice(selected_words)} {random.choice(selected_words)} operations + +""" + return content + + +__all__ = ["CODING_WORDS", "generate_pr_content", "generate_readme_content"] + + diff --git a/autopr_bot/ui/__init__.py b/autopr_bot/ui/__init__.py new file mode 100644 index 0000000..dc9e8cc --- /dev/null +++ b/autopr_bot/ui/__init__.py @@ -0,0 +1,7 @@ +"""UI helpers for Auto PR Bot.""" + +from .app import App, launch + +__all__ = ["App", "launch"] + + diff --git a/autopr_bot/ui/app.py b/autopr_bot/ui/app.py new file mode 100644 index 0000000..b4fac78 --- /dev/null +++ b/autopr_bot/ui/app.py @@ -0,0 +1,499 @@ +from __future__ import annotations + +import threading +import tkinter as tk +import tkinter.font as tkfont +from datetime import datetime +from tkinter import messagebox, ttk +from typing import Optional + +from autopr_bot import AutoPRBot +from autopr_bot.content import generate_pr_content, generate_readme_content + +from .coauthors import CoAuthorManager +from .logger import TextLogger +from .styles import PALETTE, apply_style + + +class App(tk.Tk): + def __init__(self) -> None: + super().__init__() + self.title("Auto PR Bot Studio") + self.geometry("1120x720") + self.resizable(False, False) + self.columnconfigure(0, weight=1) + self.rowconfigure(0, weight=1) + + apply_style(self) + self._set_base_fonts() + self._icon_image = self._build_icon() + self.iconphoto(True, self._icon_image) + + # Async helpers + self._bg_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + + # Tk variables + self.token_var = tk.StringVar() + self.username_var = tk.StringVar() + self.repo_var = tk.StringVar() + self.base_branch_var = tk.StringVar(value="main") + self.head_branch_var = tk.StringVar(value="dev") + self.uptime_var = tk.IntVar(value=60) + self.use_co_authors_var = tk.BooleanVar(value=True) + self.status_var = tk.StringVar(value="Idle • Waiting for configuration") + self.loop_var = tk.StringVar(value="Loop stopped.") + + self._build_ui() + self.protocol("WM_DELETE_WINDOW", self._on_close) + + # ------------------------------------------------------------------ # + def _build_ui(self) -> None: + container = ttk.Frame(self, style="Root.TFrame", padding=12) + container.grid(row=0, column=0, sticky="nsew") + container.columnconfigure(0, weight=1) + container.rowconfigure(1, weight=1) + + header = ttk.Frame(container, style="CardAccent.TFrame", padding=(14, 12)) + header.grid(row=0, column=0, sticky="ew", pady=(0, 10)) + header.columnconfigure(0, weight=1) + header.columnconfigure(1, weight=0) + header.columnconfigure(2, weight=0) + + title_frame = ttk.Frame(header, style="CardAccent.TFrame") + title_frame.grid(row=0, column=0, sticky="w") + + ttk.Label(title_frame, text="Auto PR Bot Studio", style="HeroHeading.TLabel").pack(anchor="w") + ttk.Label( + title_frame, + text="Automate README refreshes, PR creation, and merges — all inside a live dashboard.", + style="HeroSubheading.TLabel", + ).pack(anchor="w", pady=(4, 0)) + + controls_frame = ttk.Frame(header, style="CardAccent.TFrame") + controls_frame.grid(row=0, column=2, sticky="e", padx=(0, 0)) + + self.status_label = ttk.Label(controls_frame, textvariable=self.status_var, style="StatusInfo.TLabel") + self.status_label.pack(anchor="e", pady=(0, 6)) + + ttk.Button(controls_frame, text="Preview Sample Content", command=self.on_preview_content).pack(anchor="e") + + metrics_frame = ttk.Frame(header, style="CardAccent.TFrame") + metrics_frame.grid(row=1, column=0, columnspan=2, sticky="ew", pady=(10, 0)) + metrics = [ + ("Base Branch", self.base_branch_var), + ("Head Branch", self.head_branch_var), + ("Loop Interval (s)", self.uptime_var), + ] + for idx, (label, var) in enumerate(metrics): + col = ttk.Frame(metrics_frame, style="CardAccent.TFrame") + col.grid(row=0, column=idx, sticky="ew", padx=(0, 12 if idx < len(metrics) - 1 else 0)) + metrics_frame.columnconfigure(idx, weight=1) + ttk.Label(col, text=label, style="HeroSubheading.TLabel").pack(anchor="w") + ttk.Label(col, textvariable=var, style="HeroMetric.TLabel").pack(anchor="w", pady=(1, 0)) + + paned = ttk.Panedwindow(container, orient=tk.HORIZONTAL) + paned.grid(row=1, column=0, sticky="nsew") + + forms_panel = ttk.Frame(paned, style="Root.TFrame") + forms_panel.columnconfigure(0, weight=1) + forms_panel.rowconfigure(0, weight=1) + paned.add(forms_panel, weight=3) + + # Create scrollable frame for cards + canvas = tk.Canvas(forms_panel, bg=PALETTE["bg"], highlightthickness=0) + scrollbar_forms = ttk.Scrollbar(forms_panel, orient=tk.VERTICAL, command=canvas.yview, style="Vertical.TScrollbar") + scrollable_frame = ttk.Frame(canvas, style="Root.TFrame") + + def update_scroll_region(event=None): + canvas.update_idletasks() + canvas.configure(scrollregion=canvas.bbox("all")) + + scrollable_frame.bind("", update_scroll_region) + + def on_canvas_configure(event): + canvas_width = event.width + canvas.itemconfig(canvas_window, width=canvas_width) + + canvas_window = canvas.create_window((0, 0), window=scrollable_frame, anchor="nw") + canvas.configure(yscrollcommand=scrollbar_forms.set) + canvas.bind("", on_canvas_configure) + + # Mousewheel scrolling + def on_mousewheel(event): + canvas.yview_scroll(int(-1 * (event.delta / 120)), "units") + + canvas.bind("", on_mousewheel) + scrollable_frame.bind("", on_mousewheel) + + # Also bind to child widgets + def bind_to_children(parent): + for child in parent.winfo_children(): + try: + child.bind("", on_mousewheel) + bind_to_children(child) + except: + pass + + bind_to_children(scrollable_frame) + + canvas.grid(row=0, column=0, sticky="nsew") + scrollbar_forms.grid(row=0, column=1, sticky="ns") + + cards_container = ttk.Frame(scrollable_frame, style="Root.TFrame") + cards_container.pack(fill=tk.BOTH, expand=True) + cards_container.columnconfigure(0, weight=1) + + # Store canvas reference for later updates + self._cards_canvas = canvas + self._scrollable_frame = scrollable_frame + + self._build_config_card(cards_container, row=0) + self._build_automation_card(cards_container, row=1) + self._build_collaboration_card(cards_container, row=2) + + log_panel = ttk.Frame(paned, style="Root.TFrame") + log_panel.columnconfigure(0, weight=1) + log_panel.rowconfigure(0, weight=1) + paned.add(log_panel, weight=2) + + log_card = ttk.LabelFrame(log_panel, text="Activity Stream", style="Card.TLabelframe") + log_card.grid(row=0, column=0, sticky="nsew") + log_card.columnconfigure(0, weight=1) + log_card.rowconfigure(1, weight=1) + + ttk.Label( + log_card, + text="Live output from GitHub operations, loop events, and previews.", + style="Muted.TLabel", + ).grid(row=0, column=0, sticky="w", pady=(0, 6)) + + log_container = ttk.Frame(log_card, style="Card.TFrame") + log_container.grid(row=1, column=0, sticky="nsew") + log_container.columnconfigure(0, weight=1) + log_container.rowconfigure(0, weight=1) + + self.log_text = tk.Text( + log_container, + height=20, + bg=PALETTE["surface"], + fg=PALETTE["text"], + insertbackground=PALETTE["accent"], + relief=tk.FLAT, + font=("Consolas", 8), + wrap=tk.WORD, + padx=4, + pady=4, + selectbackground=PALETTE["accent"], + selectforeground=PALETTE["bg"], + ) + self.log_text.grid(row=0, column=0, sticky="nsew") + scrollbar = ttk.Scrollbar(log_container, orient=tk.VERTICAL, command=self.log_text.yview) + scrollbar.grid(row=0, column=1, sticky="ns") + self.log_text["yscrollcommand"] = scrollbar.set + + log_footer = ttk.Frame(log_card, style="Card.TFrame") + log_footer.grid(row=2, column=0, sticky="ew", pady=(12, 0)) + log_footer.columnconfigure(0, weight=1) + ttk.Label(log_footer, text="Logs auto-scroll during runs.", style="Muted.TLabel").grid(row=0, column=0, sticky="w", padx=(0, 12)) + ttk.Button(log_footer, text="Clear Logs", command=lambda: self.log_text.delete("1.0", tk.END)).grid( + row=0, column=1, sticky="e" + ) + + sizegrip = ttk.Sizegrip(container, style="TSizegrip") + sizegrip.grid(row=2, column=0, sticky="se", pady=(12, 0)) + + self.logger = TextLogger(self.log_text) + + def _build_config_card(self, parent: ttk.Frame, row: int) -> None: + card = ttk.LabelFrame(parent, text="Repository Settings", style="Card.TLabelframe") + card.grid(row=row, column=0, sticky="nsew", pady=(0, 8)) + card.columnconfigure(1, weight=1) + + fields = [ + ("GitHub Token", self.token_var, {"show": "*"}), + ("Username / Owner", self.username_var, {}), + ("Repository", self.repo_var, {}), + ("Base Branch", self.base_branch_var, {}), + ("Head Branch", self.head_branch_var, {}), + ] + + for idx, (label, var, extra) in enumerate(fields): + ttk.Label(card, text=label, style="Card.TLabel").grid(row=idx, column=0, sticky="w", pady=3, padx=(0, 6)) + entry = ttk.Entry(card, textvariable=var, **extra) + entry.grid(row=idx, column=1, sticky="ew", pady=3) + + def _build_automation_card(self, parent: ttk.Frame, row: int) -> None: + card = ttk.LabelFrame(parent, text="Automation", style="Card.TLabelframe") + card.grid(row=row, column=0, sticky="nsew", pady=(0, 8)) + card.columnconfigure(0, weight=1) + + interval_row = ttk.Frame(card, style="Card.TFrame") + interval_row.grid(row=0, column=0, sticky="ew") + ttk.Label(interval_row, text="Loop Interval (seconds)", style="Card.TLabel").pack(side=tk.LEFT) + ttk.Spinbox( + interval_row, + textvariable=self.uptime_var, + from_=10, + to=3600, + increment=10, + width=10, + ).pack(side=tk.RIGHT) + + self.loop_label = ttk.Label(card, textvariable=self.loop_var, style="Muted.TLabel") + self.loop_label.grid(row=1, column=0, sticky="w", pady=(6, 8)) + + buttons = ttk.Frame(card, style="Card.TFrame") + buttons.grid(row=2, column=0, sticky="ew", pady=(2, 0)) + buttons.columnconfigure(0, weight=1) + buttons.columnconfigure(1, weight=1) + buttons.columnconfigure(2, weight=1) + + self.run_button = ttk.Button(buttons, text="Run Once", style="Accent.TButton", command=self.on_run_once) + self.run_button.grid(row=0, column=0, sticky="ew", padx=(0, 4)) + + self.loop_button = ttk.Button(buttons, text="Start Loop", command=self.on_start_loop) + self.loop_button.grid(row=0, column=1, sticky="ew", padx=(0, 4)) + + self.stop_button = ttk.Button( + buttons, + text="Stop Loop", + style="Danger.TButton", + command=self.on_stop_loop, + state=tk.DISABLED, + ) + self.stop_button.grid(row=0, column=2, sticky="ew") + + def _build_collaboration_card(self, parent: ttk.Frame, row: int) -> None: + card = ttk.LabelFrame(parent, text="Collaboration", style="Card.TLabelframe") + card.grid(row=row, column=0, sticky="new") + card.columnconfigure(0, weight=1) + + ttk.Checkbutton( + card, + text="Include co-authors in README commits", + variable=self.use_co_authors_var, + command=self._toggle_co_authors, + ).grid(row=0, column=0, sticky="w", pady=(0, 4)) + + ttk.Label( + card, + text="When enabled, commits will include the selected collaborators.", + style="Muted.TLabel", + ).grid(row=1, column=0, sticky="w", pady=(0, 6)) + + self.co_author_manager = CoAuthorManager(card) + self.co_author_manager.frame.grid(row=2, column=0, sticky="new") + if not self.co_author_manager.has_rows(): + self.co_author_manager.add_row() + self._toggle_co_authors() + + # ------------------------------------------------------------------ # + def _make_bot(self) -> AutoPRBot: + token = self.token_var.get().strip() + if not token: + raise ValueError("Token is required.") + + owner = self.username_var.get().strip() + repo = self.repo_var.get().strip() + if not owner or not repo: + raise ValueError("Provide both the repository owner and name.") + + repo_full = f"{owner}/{repo}" + co_authors = self.co_author_manager.get_co_authors() if self.use_co_authors_var.get() else [] + + return AutoPRBot( + token=token, + repo=repo_full, + base_branch=self.base_branch_var.get().strip() or "main", + head_branch=self.head_branch_var.get().strip() or "dev", + logger=self.logger, + co_authors=co_authors, + ) + + def on_run_once(self) -> None: + try: + bot = self._make_bot() + except Exception as exc: + messagebox.showerror("Configuration Error", str(exc)) + return + + self.run_button.config(state=tk.DISABLED) + thread = threading.Thread(target=self._run_once_thread, args=(bot,), daemon=True) + thread.start() + + def _run_once_thread(self, bot: AutoPRBot) -> None: + self._set_status("Running single sync…", "info") + try: + bot.run_once() + self._set_status("Run completed successfully.", "success") + except Exception as exc: + bot.log(f"Error: {exc}") + self._set_status("Run failed. Check logs for details.", "danger") + finally: + self.after(0, lambda: self.run_button.config(state=tk.NORMAL)) + + def on_start_loop(self) -> None: + if self._bg_thread and self._bg_thread.is_alive(): + messagebox.showinfo("Loop Running", "The loop is already running.") + return + + try: + bot = self._make_bot() + except Exception as exc: + messagebox.showerror("Configuration Error", str(exc)) + return + + interval = max(10, int(self.uptime_var.get() or 60)) + self._stop_event.clear() + self.loop_var.set(f"Loop running every {interval}s.") + self._set_status("Loop running…", "success") + + self.loop_button.config(state=tk.DISABLED) + self.stop_button.config(state=tk.NORMAL) + self.run_button.config(state=tk.DISABLED) + + self._bg_thread = threading.Thread(target=self._loop_worker, args=(bot, interval), daemon=True) + self._bg_thread.start() + + def _loop_worker(self, bot: AutoPRBot, interval: int) -> None: + while not self._stop_event.is_set(): + try: + bot.run_once() + timestamp = datetime.now().strftime("%H:%M:%S") + self._set_loop_message(f"Last run completed at {timestamp}.") + except Exception as exc: + bot.log(f"Error: {exc}") + self._set_status("Loop encountered errors. See logs.", "danger") + if self._stop_event.wait(interval): + break + self._set_loop_message("Loop stopped.") + self._set_status("Loop stopped.", "warning") + self.after(0, self._reset_controls_after_loop) + + def on_stop_loop(self) -> None: + if self._bg_thread and self._bg_thread.is_alive(): + self._stop_event.set() + self._set_status("Stopping loop…", "warning") + self.stop_button.config(state=tk.DISABLED) + + def _reset_controls_after_loop(self) -> None: + self.loop_button.config(state=tk.NORMAL) + self.run_button.config(state=tk.NORMAL) + self.stop_button.config(state=tk.DISABLED) + + def _toggle_co_authors(self) -> None: + if self.use_co_authors_var.get(): + self.co_author_manager.frame.grid() + if not self.co_author_manager.has_rows(): + self.co_author_manager.add_row() + else: + self.co_author_manager.frame.grid_remove() + # Update scroll region when toggling + if hasattr(self, '_cards_canvas'): + self.after(10, lambda: self._cards_canvas.configure(scrollregion=self._cards_canvas.bbox("all"))) + + def on_preview_content(self) -> None: + pr = generate_pr_content() + readme = generate_readme_content() + self.logger("\n--- SAMPLE PR CONTENT ---") + self.logger(f"Title: {pr['title']}") + self.logger(pr["body"]) + self.logger("--- SAMPLE README CONTENT ---") + self.logger(readme) + + def _set_status(self, message: str, level: str = "info") -> None: + style_map = { + "info": "StatusInfo.TLabel", + "success": "StatusSuccess.TLabel", + "warning": "StatusWarning.TLabel", + "danger": "StatusDanger.TLabel", + } + style = style_map.get(level, "StatusInfo.TLabel") + + def update() -> None: + self.status_var.set(message) + self.status_label.configure(style=style) + + self.after(0, update) + + def _set_loop_message(self, message: str) -> None: + self.after(0, lambda: self.loop_var.set(message)) + + def _on_close(self) -> None: + self._stop_event.set() + self.destroy() + + def run(self) -> None: + self.mainloop() + + # ------------------------------------------------------------------ # + # Window helpers + # ------------------------------------------------------------------ # + def _set_base_fonts(self) -> None: + base_fonts = [ + "TkDefaultFont", + "TkMenuFont", + "TkTextFont", + "TkFixedFont", + "TkHeadingFont", + "TkTooltipFont", + "TkIconFont", + "TkCaptionFont", + "TkSmallCaptionFont", + ] + for font_name in base_fonts: + try: + tkfont.nametofont(font_name).configure(size=8) + except tk.TclError: + continue + + def _build_icon(self) -> tk.PhotoImage: + size = 64 + icon = tk.PhotoImage(width=size, height=size) + + # Gradient background + for y in range(size): + ratio = y / size + r1, g1, b1 = 56, 189, 248 # accent + r2, g2, b2 = 14, 165, 233 # accent_dark + r = int(r1 + (r2 - r1) * ratio) + g = int(g1 + (g2 - g1) * ratio) + b = int(b1 + (b2 - b1) * ratio) + color = f"#{r:02x}{g:02x}{b:02x}" + icon.put(color, to=(0, y, size, y + 1)) + + # Draw stylized "PR" letters + # Letter P + for y in range(14, 50): + icon.put(PALETTE["bg"], to=(14, y, 18, y + 1)) + for x in range(14, 30): + icon.put(PALETTE["bg"], to=(x, 14, x + 1, 18)) + icon.put(PALETTE["bg"], to=(x, 28, x + 1, 32)) + for y in range(14, 32): + icon.put(PALETTE["bg"], to=(26, y, 30, y + 1)) + + # Letter R + for y in range(14, 50): + icon.put(PALETTE["bg"], to=(34, y, 38, y + 1)) + for x in range(34, 50): + icon.put(PALETTE["bg"], to=(x, 14, x + 1, 18)) + icon.put(PALETTE["bg"], to=(x, 28, x + 1, 32)) + for y in range(14, 32): + icon.put(PALETTE["bg"], to=(46, y, 50, y + 1)) + # R diagonal + for i in range(12): + icon.put(PALETTE["bg"], to=(38 + i, 32 + i, 42 + i, 36 + i)) + + return icon + + +def launch() -> None: + app = App() + app.run() + + +if __name__ == "__main__": + launch() + + diff --git a/autopr_bot/ui/coauthors.py b/autopr_bot/ui/coauthors.py new file mode 100644 index 0000000..a864ecd --- /dev/null +++ b/autopr_bot/ui/coauthors.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import List +import tkinter as tk +from tkinter import ttk + + +@dataclass +class CoAuthorRow: + frame: ttk.Frame + name_var: tk.StringVar + email_var: tk.StringVar + + +class CoAuthorManager: + """Manage dynamic co-author inputs.""" + + def __init__(self, master: ttk.Widget): + self.master = master + self.frame = ttk.Frame(master, style="Card.TFrame") + self.frame.columnconfigure(0, weight=1) + self.rows: List[CoAuthorRow] = [] + + controls = ttk.Frame(self.frame, style="Card.TFrame") + controls.grid(row=0, column=0, sticky="ew") + ttk.Label( + controls, + text="Add collaborators who should appear in commit metadata.", + style="Muted.TLabel", + ).pack(side=tk.LEFT) + + ttk.Button( + controls, + text="+ Add Co-Author", + style="Accent.TButton", + command=self.add_row, + ).pack(side=tk.RIGHT, padx=(4, 0)) + + self.list_frame = ttk.Frame(self.frame, style="Card.TFrame") + self.list_frame.grid(row=1, column=0, sticky="nsew", pady=(6, 0)) + self.list_frame.columnconfigure(0, weight=1) + self.list_frame.columnconfigure(1, weight=2) + + header = ttk.Frame(self.list_frame, style="Card.TFrame") + header.grid(row=0, column=0, columnspan=3, sticky="ew", pady=(0, 4)) + header.columnconfigure(0, weight=1) + header.columnconfigure(1, weight=2) + ttk.Label(header, text="Name", style="Card.TLabel", font=("Segoe UI", 8, "bold")).grid(row=0, column=0, sticky="w") + ttk.Label(header, text="Email", style="Card.TLabel", font=("Segoe UI", 8, "bold")).grid(row=0, column=1, sticky="w") + + # ------------------------------------------------------------------ # + def add_row(self) -> None: + row_index = len(self.rows) + 1 # +1 because row 0 is the header + row_frame = ttk.Frame(self.list_frame, style="Card.TFrame") + row_frame.grid(row=row_index, column=0, columnspan=3, sticky="ew", pady=2) + row_frame.columnconfigure(0, weight=1) + row_frame.columnconfigure(1, weight=2) + + name_var = tk.StringVar() + email_var = tk.StringVar() + + name_entry = ttk.Entry(row_frame, textvariable=name_var) + name_entry.grid(row=0, column=0, sticky="ew", padx=(0, 6)) + + email_entry = ttk.Entry(row_frame, textvariable=email_var) + email_entry.grid(row=0, column=1, sticky="ew", padx=(0, 6)) + + ttk.Button( + row_frame, + text="Remove", + command=lambda: self._remove_row(row_frame), + ).grid(row=0, column=2, sticky="e") + + self.rows.append(CoAuthorRow(row_frame, name_var, email_var)) + + def _remove_row(self, frame: ttk.Frame) -> None: + frame.destroy() + self.rows = [row for row in self.rows if row.frame != frame] + # Re-grid remaining rows to maintain proper order + for idx, row in enumerate(self.rows, start=1): + row.frame.grid(row=idx, column=0, columnspan=3, sticky="ew", pady=2) + + def clear(self) -> None: + for row in list(self.rows): + row.frame.destroy() + self.rows.clear() + + def get_co_authors(self) -> List[dict]: + authors: List[dict] = [] + for row in self.rows: + name = row.name_var.get().strip() + email = row.email_var.get().strip() + if name and email: + authors.append({"name": name, "email": email}) + return authors + + def has_rows(self) -> bool: + return bool(self.rows) + + # Convenience for toggling visibility + def show(self) -> None: + self.frame.grid() + + def hide(self) -> None: + self.frame.grid_remove() + + diff --git a/autopr_bot/ui/logger.py b/autopr_bot/ui/logger.py new file mode 100644 index 0000000..62f00a7 --- /dev/null +++ b/autopr_bot/ui/logger.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from datetime import datetime +import tkinter as tk + + +class TextLogger: + """Thread-safe logger that streams messages into a text widget.""" + + def __init__(self, text_widget: tk.Text, max_lines: int = 500) -> None: + self.text_widget = text_widget + self.max_lines = max_lines + + def __call__(self, message: str) -> None: + timestamp = datetime.now().strftime("%H:%M:%S") + formatted = f"[{timestamp}] {message}" + self.text_widget.after(0, self._append, formatted) + + def _append(self, message: str) -> None: + self.text_widget.insert(tk.END, message + "\n") + self.text_widget.see(tk.END) + self._trim_lines() + + def _trim_lines(self) -> None: + lines = int(self.text_widget.index("end-1c").split(".")[0]) + if lines > self.max_lines: + self.text_widget.delete("1.0", f"{lines - self.max_lines}.0") + + diff --git a/autopr_bot/ui/styles.py b/autopr_bot/ui/styles.py new file mode 100644 index 0000000..c0d3317 --- /dev/null +++ b/autopr_bot/ui/styles.py @@ -0,0 +1,255 @@ +from __future__ import annotations + +import tkinter as tk +from tkinter import ttk + + +PALETTE = { + "bg": "#0f172a", + "surface": "#1e293b", + "card": "#16223a", + "muted": "#94a3b8", + "text": "#f8fafc", + "accent": "#38bdf8", + "accent_dark": "#0ea5e9", + "success": "#22c55e", + "warning": "#fbbf24", + "danger": "#ef4444", +} + + +def apply_style(root: tk.Tk) -> None: + """Configure ttk styles for a modern dark interface.""" + style = ttk.Style(root) + try: + style.theme_use("clam") + except tk.TclError: + pass + + root.configure(bg=PALETTE["bg"]) + + # Base styles + style.configure("Root.TFrame", background=PALETTE["bg"]) + style.configure("TFrame", background=PALETTE["surface"]) + style.configure("Card.TFrame", background=PALETTE["card"], relief="flat") + style.configure( + "CardAccent.TFrame", + background=PALETTE["accent"], + relief="flat", + borderwidth=0, + ) + style.configure("Card.TLabel", background=PALETTE["card"], foreground=PALETTE["text"]) + style.configure( + "Card.TLabelframe", + background=PALETTE["card"], + foreground=PALETTE["text"], + borderwidth=1, + bordercolor=PALETTE["surface"], + relief="flat", + padding=5, + ) + style.configure( + "Card.TLabelframe.Label", + background=PALETTE["card"], + foreground=PALETTE["accent"], + font=("Segoe UI Semibold", 9), + ) + + style.configure( + "Header.TLabel", + background=PALETTE["card"], + foreground=PALETTE["text"], + font=("Segoe UI", 12, "bold"), + ) + style.configure( + "Subheader.TLabel", + background=PALETTE["card"], + foreground=PALETTE["muted"], + font=("Segoe UI", 9), + ) + style.configure( + "Muted.TLabel", + background=PALETTE["card"], + foreground=PALETTE["muted"], + font=("Segoe UI", 8), + ) + style.configure( + "HeroHeading.TLabel", + background=PALETTE["accent"], + foreground=PALETTE["bg"], + font=("Segoe UI Semibold", 12), + ) + style.configure( + "HeroSubheading.TLabel", + background=PALETTE["accent"], + foreground=PALETTE["bg"], + font=("Segoe UI", 9), + ) + style.configure( + "HeroMetric.TLabel", + background=PALETTE["accent"], + foreground=PALETTE["bg"], + font=("Segoe UI", 12, "bold"), + ) + style.configure( + "Metric.TLabel", + background=PALETTE["card"], + foreground=PALETTE["text"], + font=("Segoe UI", 9, "bold"), + padding=(12, 6), + ) + style.configure( + "MetricValue.TLabel", + background=PALETTE["card"], + foreground=PALETTE["muted"], + font=("Segoe UI", 8), + ) + + # Buttons + style.configure( + "TButton", + background=PALETTE["surface"], + foreground=PALETTE["text"], + font=("Segoe UI", 8), + padding=(8, 4), + borderwidth=0, + relief="flat", + ) + style.map( + "TButton", + background=[("active", PALETTE["card"]), ("disabled", "#1f2937")], + foreground=[("disabled", PALETTE["muted"])], + ) + + style.configure( + "Accent.TButton", + background=PALETTE["accent"], + foreground=PALETTE["bg"], + font=("Segoe UI Semibold", 8), + padding=(8, 4), + borderwidth=0, + relief="flat", + ) + style.map( + "Accent.TButton", + background=[("active", PALETTE["accent_dark"]), ("disabled", "#1f2937")], + foreground=[("disabled", PALETTE["muted"])], + ) + + style.configure( + "Danger.TButton", + background=PALETTE["danger"], + foreground=PALETTE["text"], + font=("Segoe UI Semibold", 8), + padding=(8, 4), + borderwidth=0, + relief="flat", + ) + style.map( + "Danger.TButton", + background=[("active", "#dc2626"), ("disabled", "#1f2937")], + foreground=[("disabled", PALETTE["muted"])], + ) + + # Inputs + entry_style = { + "background": PALETTE["surface"], + "foreground": PALETTE["text"], + "fieldbackground": PALETTE["surface"], + "bordercolor": PALETTE["accent"], + "lightcolor": PALETTE["accent"], + "darkcolor": PALETTE["surface"], + "insertcolor": PALETTE["accent"], + "selectbackground": PALETTE["accent"], + "selectforeground": PALETTE["bg"], + } + style.configure("TEntry", **entry_style, padding=4) + style.configure("TCombobox", **entry_style, padding=4) + style.configure("TSpinbox", **entry_style, padding=4) + style.map( + "TEntry", + bordercolor=[("focus", PALETTE["accent"]), ("!focus", PALETTE["surface"])], + ) + style.configure( + "TCheckbutton", + background=PALETTE["card"], + foreground=PALETTE["text"], + font=("Segoe UI", 8), + ) + style.map( + "TCheckbutton", + background=[("active", PALETTE["card"]), ("selected", PALETTE["card"])], + indicatorcolor=[("selected", PALETTE["accent"])], + ) + + # Status labels + style.configure( + "StatusInfo.TLabel", + background=PALETTE["accent_dark"], + foreground=PALETTE["bg"], + padding=(6, 2), + font=("Segoe UI", 8, "bold"), + ) + + # Misc elements + style.configure( + "TPanedwindow", + background=PALETTE["bg"], + borderwidth=0, + sashwidth=16, + ) + style.map( + "TPanedwindow", + background=[("background", PALETTE["bg"])], + ) + # Note: Sash styling may not work on all platforms + try: + style.configure( + "Sash", + sashthickness=10, + background=PALETTE["muted"], + ) + style.map( + "Sash", + background=[("active", PALETTE["accent"])], + ) + except tk.TclError: + pass + style.configure( + "Vertical.TScrollbar", + background=PALETTE["surface"], + troughcolor=PALETTE["card"], + bordercolor=PALETTE["card"], + arrowcolor=PALETTE["text"], + ) + style.map( + "Vertical.TScrollbar", + background=[("active", PALETTE["accent"])], + ) + style.configure( + "TSizegrip", + background=PALETTE["bg"], + ) + style.configure( + "StatusSuccess.TLabel", + background=PALETTE["success"], + foreground=PALETTE["bg"], + padding=(6, 2), + font=("Segoe UI", 8, "bold"), + ) + style.configure( + "StatusWarning.TLabel", + background=PALETTE["warning"], + foreground=PALETTE["bg"], + padding=(6, 2), + font=("Segoe UI", 8, "bold"), + ) + style.configure( + "StatusDanger.TLabel", + background=PALETTE["danger"], + foreground=PALETTE["bg"], + padding=(6, 2), + font=("Segoe UI", 8, "bold"), + ) + + diff --git a/main.py b/main.py index e00f375..0398e7e 100644 --- a/main.py +++ b/main.py @@ -1,345 +1,18 @@ -import time -import requests -import base64 -import random -from datetime import datetime, timezone -from typing import Callable, Optional # seconds +"""Backwards-compatible entry point for the Auto PR Bot.""" -# === CODING WORDS FOR RANDOM CONTENT === -CODING_WORDS = [ - "algorithm", "array", "boolean", "class", "compiler", "debugger", "function", - "variable", "string", "integer", "object", "method", "parameter", "return", - "loop", "condition", "statement", "expression", "operator", "syntax", "semantic", - "recursion", "iteration", "inheritance", "polymorphism", "encapsulation", "abstraction", - "interface", "implementation", "constructor", "destructor", "pointer", "reference", - "memory", "allocation", "deallocation", "garbage", "collection", "optimization", - "performance", "efficiency", "complexity", "asymptotic", "big", "notation", - "data", "structure", "stack", "queue", "tree", "graph", "hash", "table", - "binary", "search", "sorting", "bubble", "merge", "quick", "heap", "radix", - "database", "query", "sql", "schema", "table", "index", "transaction", "commit", - "rollback", "concurrency", "threading", "synchronization", "mutex", "semaphore", - "deadlock", "race", "condition", "parallel", "distributed", "microservice", - "api", "rest", "json", "xml", "http", "https", "endpoint", "request", "response", - "authentication", "authorization", "encryption", "decryption", "security", "vulnerability", - "testing", "unit", "integration", "regression", "coverage", "mock", "stub", "fixture", - "deployment", "ci", "cd", "pipeline", "docker", "kubernetes", "container", "orchestration" -] +from autopr_bot import AutoPRBot -# === HEADERS === -class AutoPRBot: - def __init__(self, token: str, repo: str, base_branch: str = "main", head_branch: str = "dev", logger: Optional[Callable[[str], None]] = None, co_authors: Optional[list] = None): - self.token = token - self.repo = repo - self.base_branch = base_branch - self.head_branch = head_branch - self.co_authors = co_authors or [] # List of co-author info: [{"name": "John Doe", "email": "john@example.com"}] - self.headers = { - "Authorization": f"token {self.token}", - "Accept": "application/vnd.github+json", - "User-Agent": "auto-pr-bot" - } - self.log = logger if logger else print - def generate_random_content(self) -> dict: - """Generate random PR content using coding words""" - selected_words = random.sample(CODING_WORDS, min(50, len(CODING_WORDS))) - - # Generate title - title_words = random.sample(selected_words, 3) - title = f"feat: implement {title_words[0]} {title_words[1]} {title_words[2]} optimization" - - # Generate body content - what_changed = f""" -## What Changed -- Enhanced {random.choice(selected_words)} {random.choice(selected_words)} processing -- Improved {random.choice(selected_words)} {random.choice(selected_words)} performance -- Added {random.choice(selected_words)} {random.choice(selected_words)} validation -- Refactored {random.choice(selected_words)} {random.choice(selected_words)} logic -- Updated {random.choice(selected_words)} {random.choice(selected_words)} configuration -""" - - tech_details = f""" -## Technical Details -This PR introduces significant improvements to the {random.choice(selected_words)} {random.choice(selected_words)} system: - -- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Implemented advanced {random.choice(selected_words)} {random.choice(selected_words)} algorithms -- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Enhanced {random.choice(selected_words)} {random.choice(selected_words)} processing capabilities -- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Optimized {random.choice(selected_words)} {random.choice(selected_words)} performance metrics -- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Added robust {random.choice(selected_words)} {random.choice(selected_words)} error handling -- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Improved {random.choice(selected_words)} {random.choice(selected_words)} security protocols - -The implementation leverages modern {random.choice(selected_words)} {random.choice(selected_words)} patterns and follows best practices for {random.choice(selected_words)} {random.choice(selected_words)} development. -""" - - performance = f""" -## Performance Improvements -- Reduced {random.choice(selected_words)} {random.choice(selected_words)} latency by 40% -- Optimized {random.choice(selected_words)} {random.choice(selected_words)} memory usage -- Enhanced {random.choice(selected_words)} {random.choice(selected_words)} throughput -- Improved {random.choice(selected_words)} {random.choice(selected_words)} scalability -- Streamlined {random.choice(selected_words)} {random.choice(selected_words)} operations -""" - - testing = f""" -## Testing -- Added comprehensive {random.choice(selected_words)} {random.choice(selected_words)} unit tests -- Implemented {random.choice(selected_words)} {random.choice(selected_words)} integration tests -- Enhanced {random.choice(selected_words)} {random.choice(selected_words)} regression testing -- Improved {random.choice(selected_words)} {random.choice(selected_words)} test coverage -- Added {random.choice(selected_words)} {random.choice(selected_words)} performance benchmarks -""" - - code_quality = f""" -## Code Quality -- Applied {random.choice(selected_words)} {random.choice(selected_words)} design patterns -- Implemented {random.choice(selected_words)} {random.choice(selected_words)} best practices -- Enhanced {random.choice(selected_words)} {random.choice(selected_words)} documentation -- Improved {random.choice(selected_words)} {random.choice(selected_words)} maintainability -- Added {random.choice(selected_words)} {random.choice(selected_words)} type safety -""" - - footer_words = random.sample(selected_words, 10) - footer = f""" ---- -**Generated**: {datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")} -""" - - body = what_changed + tech_details + performance + testing + code_quality + footer - - return { - "title": title, - "body": body, - "words_used": len(selected_words), - "all_words": selected_words - } - - def generate_random_readme_content(self) -> str: - """Generate random README content using coding words""" - selected_words = random.sample(CODING_WORDS, min(30, len(CODING_WORDS))) - timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") - - content = f"""# Auto-Generated README - -This file was auto-updated on {timestamp}. - -## Project Overview -This repository demonstrates automated {random.choice(selected_words)} {random.choice(selected_words)} workflows using advanced {random.choice(selected_words)} {random.choice(selected_words)} techniques. - -## Features -- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Implements robust {random.choice(selected_words)} {random.choice(selected_words)} processing -- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Enhanced {random.choice(selected_words)} {random.choice(selected_words)} performance optimization -- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Advanced {random.choice(selected_words)} {random.choice(selected_words)} error handling -- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Secure {random.choice(selected_words)} {random.choice(selected_words)} authentication -- **{random.choice(selected_words).title()} {random.choice(selected_words).title()}**: Efficient {random.choice(selected_words)} {random.choice(selected_words)} data structures - -## Technical Implementation -The system utilizes modern {random.choice(selected_words)} {random.choice(selected_words)} patterns and follows industry best practices for {random.choice(selected_words)} {random.choice(selected_words)} development. - -## Performance Metrics -- Optimized {random.choice(selected_words)} {random.choice(selected_words)} algorithms -- Enhanced {random.choice(selected_words)} {random.choice(selected_words)} memory management -- Improved {random.choice(selected_words)} {random.choice(selected_words)} scalability -- Streamlined {random.choice(selected_words)} {random.choice(selected_words)} operations - -## Random Coding Terms -{', '.join(random.sample(selected_words, 15))} - ---- -*This content was generated using 100 random coding-related words* -""" - return content - - def branches_have_diffs(self) -> bool: - url = f"https://api.github.com/repos/{self.repo}/compare/{self.base_branch}...{self.head_branch}" - r = requests.get(url, headers=self.headers) - if not r.ok: - self.log(f"Failed to compare branches: {r.text}") - return False - data = r.json() - return data.get("ahead_by", 0) > 0 - - def ensure_branch_exists(self, branch: str) -> bool: - url_ref = f"https://api.github.com/repos/{self.repo}/git/ref/heads/{branch}" - r = requests.get(url_ref, headers=self.headers) - if r.ok: - return True - if r.status_code != 404: - self.log(f"Failed to check branch: {r.text}") - return False - base_ref_url = f"https://api.github.com/repos/{self.repo}/git/ref/heads/{self.base_branch}" - base_ref = requests.get(base_ref_url, headers=self.headers) - if not base_ref.ok: - self.log(f"Failed to get base branch ref: {base_ref.text}") - return False - base_sha = base_ref.json().get("object", {}).get("sha") - if not base_sha: - self.log("Base branch SHA not found") - return False - create_ref_url = f"https://api.github.com/repos/{self.repo}/git/refs" - payload = {"ref": f"refs/heads/{branch}", "sha": base_sha} - created = requests.post(create_ref_url, headers=self.headers, json=payload) - if not created.ok: - self.log(f"Failed to create branch: {created.text}") - return False - self.log(f"Created branch {branch} from {self.base_branch}") - return True - - def update_readme_on_branch(self, branch: str) -> bool: - path = "README.md" - # Generate random content using coding words - new_content = self.generate_random_readme_content() - get_url = f"https://api.github.com/repos/{self.repo}/contents/{path}?ref={branch}" - r = requests.get(get_url, headers=self.headers) - existing_content_decoded = "" - sha = None - if r.status_code == 200: - data = r.json() - sha = data.get("sha") - # We ignore previous content on purpose; we only need SHA to update - elif r.status_code != 404 and not r.ok: - self.log(f"Failed to fetch README: {r.text}") - return False - encoded = base64.b64encode(new_content.encode()).decode() - put_url = f"https://api.github.com/repos/{self.repo}/contents/{path}" - - # Build commit message with co-authors - commit_message = "chore: overwrite README via auto-update" - if self.co_authors: - co_author_lines = [] - for co_author in self.co_authors: - co_author_lines.append(f"Co-authored-by: {co_author['name']} <{co_author['email']}>") - commit_message += "\n\n" + "\n".join(co_author_lines) - - payload = { - "message": commit_message, - "content": encoded, - "branch": branch - } - if sha: - payload["sha"] = sha - put = requests.put(put_url, headers=self.headers, json=payload) - if not put.ok: - self.log(f"Failed to update README: {put.text}") - return False - self.log(f"Updated {path} on {branch}") - return True - - def create_pull_request(self) -> Optional[int]: - url = f"https://api.github.com/repos/{self.repo}/pulls" - - # Generate random PR content - random_content = self.generate_random_content() - - data = { - "title": random_content["title"], - "head": self.head_branch, - "base": self.base_branch, - # "body": pr_body - } - response = requests.post(url, headers=self.headers, json=data) - if response.status_code == 422: - text = response.text - if "A pull request already exists" in text: - self.log("PR already exists, skipping.") - return None - if "No commits between" in text: - self.log("No commits between branches, skipping PR creation.") - return None - if response.ok: - pr = response.json() - self.log(f"Created PR #{pr['number']}") - return pr["number"] - else: - self.log(f"Failed to create PR: {response.text}") - return None - - def merge_pull_request(self, pr_number: int) -> None: - url = f"https://api.github.com/repos/{self.repo}/pulls/{pr_number}/merge" - data = {"merge_method": "merge"} - response = requests.put(url, headers=self.headers, json=data) - if response.ok: - self.log(f"Merged PR #{pr_number}") - else: - self.log(f"Failed to merge PR #{pr_number}: {response.text}") - - def get_open_pr(self) -> Optional[int]: - owner = self.repo.split('/')[0] - url = f"https://api.github.com/repos/{self.repo}/pulls?head={owner}:{self.head_branch}&base={self.base_branch}&state=open" - r = requests.get(url, headers=self.headers) - if r.ok and len(r.json()) > 0: - return r.json()[0]["number"] - return None - - def run_once(self) -> None: - if not self.token: - raise ValueError("❌ Please provide a GITHUB_TOKEN") - if not self.ensure_branch_exists(self.head_branch): - raise SystemExit(1) - if not self.update_readme_on_branch(self.head_branch): - raise SystemExit(1) - self.log("\n=== Checking for PRs ===") - pr_number = self.get_open_pr() - if pr_number: - self.log(f"Found existing PR #{pr_number}, merging...") - self.merge_pull_request(pr_number) - else: - if not self.branches_have_diffs(): - self.log("No diffs between dev and main. Skipping PR.") - else: - self.log("No PR found, creating one...") - new_pr = self.create_pull_request() - if new_pr: - self.merge_pull_request(new_pr) - - def display_random_content(self) -> None: - """Display generated random content for preview""" - self.log("\n" + "="*80) - self.log("RANDOM PR CONTENT GENERATED") - self.log("="*80) - - random_content = self.generate_random_content() - self.log(f"\nTitle: {random_content['title']}") - self.log(f"\nBody:\n{random_content['body']}") - self.log(f"\nWords Used: {random_content['words_used']}") - self.log(f"\nAll Words: {', '.join(random_content['all_words'])}") - - self.log("\n" + "="*80) - self.log("RANDOM README CONTENT GENERATED") - self.log("="*80) - readme_content = self.generate_random_readme_content() - self.log(f"\n{readme_content}") - - def run_loop(self, interval_seconds: int) -> None: - while True: - try: - self.run_once() - except Exception as e: - self.log(f"Error: {e}") - time.sleep(interval_seconds) - -if __name__ == "__main__": - # Example with co-authors - co_authors = [ - {"name": "John Doe", "email": "john@example.com"}, - {"name": "Jane Smith", "email": "jane@example.com"} - ] - - # Set your GitHub token and repository here - GITHUB_TOKEN = "your_token_here" - REPO = "owner/repo" - BASE_BRANCH = "main" - HEAD_BRANCH = "dev" - +def main() -> None: + """Demonstrate the bot with placeholder values.""" bot = AutoPRBot( - token=GITHUB_TOKEN, - repo=REPO, - base_branch=BASE_BRANCH, - head_branch=HEAD_BRANCH, - co_authors=co_authors + token="your_token_here", + repo="owner/repo", + base_branch="main", + head_branch="dev", ) - - # Display random content generation bot.display_random_content() - # Uncomment the line below to run the actual bot - # bot.run_once() + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f84158e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +requests>=2.31.0 + + diff --git a/ui.py b/ui.py index 3850a05..d1ebdfc 100644 --- a/ui.py +++ b/ui.py @@ -1,239 +1,6 @@ -import threading -import tkinter as tk -from tkinter import ttk, messagebox -from typing import List, Dict - -from main import AutoPRBot - - -class TextLogger: - def __init__(self, text_widget: tk.Text): - self.text_widget = text_widget - - def __call__(self, message: str) -> None: - self.text_widget.after(0, self._append, message) - - def _append(self, message: str) -> None: - self.text_widget.insert(tk.END, message + "\n") - self.text_widget.see(tk.END) - - -class CoAuthorManager: - def __init__(self, parent_frame: ttk.Frame): - self.parent_frame = parent_frame - self.co_authors: List[Dict[str, str]] = [] - self.co_author_frames: List[ttk.Frame] = [] - self._build_co_author_section() - - def _build_co_author_section(self): - # Co-authors section - co_authors_frame = ttk.LabelFrame(self.parent_frame, text="Co-Authors", padding=10) - co_authors_frame.grid(row=6, column=0, columnspan=2, sticky=tk.EW, padx=8, pady=6) - - # Add co-author button - add_btn = ttk.Button(co_authors_frame, text="+ Add Co-Author", command=self._add_co_author) - add_btn.pack(anchor=tk.W, pady=(0, 10)) - - # Co-authors list frame - self.co_authors_list_frame = ttk.Frame(co_authors_frame) - self.co_authors_list_frame.pack(fill=tk.X) - - def _add_co_author(self): - co_author_frame = ttk.Frame(self.co_authors_list_frame) - co_author_frame.pack(fill=tk.X, pady=2) - - # Name entry - name_var = tk.StringVar() - name_entry = ttk.Entry(co_author_frame, textvariable=name_var, width=20) - name_entry.pack(side=tk.LEFT, padx=(0, 5)) - - # Email entry - email_var = tk.StringVar() - email_entry = ttk.Entry(co_author_frame, textvariable=email_var, width=25) - email_entry.pack(side=tk.LEFT, padx=(0, 5)) - - # Remove button - remove_btn = ttk.Button(co_author_frame, text="Remove", - command=lambda: self._remove_co_author(co_author_frame)) - remove_btn.pack(side=tk.LEFT) - - # Store references - co_author_data = { - 'frame': co_author_frame, - 'name_var': name_var, - 'email_var': email_var - } - self.co_author_frames.append(co_author_data) - - # Add labels for first entry - if len(self.co_author_frames) == 1: - labels_frame = ttk.Frame(self.co_authors_list_frame) - labels_frame.pack(fill=tk.X, pady=(0, 5)) - ttk.Label(labels_frame, text="Name", font=('TkDefaultFont', 8, 'bold')).pack(side=tk.LEFT, padx=(0, 5)) - ttk.Label(labels_frame, text="Email", font=('TkDefaultFont', 8, 'bold')).pack(side=tk.LEFT, padx=(0, 5)) - - def _remove_co_author(self, frame_to_remove: ttk.Frame): - # Find and remove the co-author data - for i, co_author_data in enumerate(self.co_author_frames): - if co_author_data['frame'] == frame_to_remove: - self.co_author_frames.pop(i) - break - - # Destroy the frame - frame_to_remove.destroy() - - # Remove labels if no co-authors left - if not self.co_author_frames: - for child in self.co_authors_list_frame.winfo_children(): - if isinstance(child, ttk.Frame) and len(child.winfo_children()) == 2: - child.destroy() - - def get_co_authors(self) -> List[Dict[str, str]]: - co_authors = [] - for co_author_data in self.co_author_frames: - name = co_author_data['name_var'].get().strip() - email = co_author_data['email_var'].get().strip() - if name and email: - co_authors.append({"name": name, "email": email}) - return co_authors - - -class App(tk.Tk): - def __init__(self): - super().__init__() - self.title("Auto PR Bot") - self.geometry("700x600") - self._bg_thread = None - self._stop_event = threading.Event() - - # Inputs - self.token_var = tk.StringVar() - self.username_var = tk.StringVar() - self.repo_var = tk.StringVar() - self.uptime_var = tk.IntVar(value=60) - self.base_branch_var = tk.StringVar(value="main") - self.head_branch_var = tk.StringVar(value="dev") - - self._build_ui() - - def _build_ui(self): - pad = {"padx": 8, "pady": 6} - - frm = ttk.Frame(self) - frm.pack(fill=tk.BOTH, expand=True) - - # Basic configuration section - config_frame = ttk.LabelFrame(frm, text="Configuration", padding=10) - config_frame.grid(row=0, column=0, columnspan=2, sticky=tk.EW, **pad) - - ttk.Label(config_frame, text="GitHub Token").grid(row=0, column=0, sticky=tk.W, **pad) - ttk.Entry(config_frame, textvariable=self.token_var, show="*").grid(row=0, column=1, sticky=tk.EW, **pad) - - ttk.Label(config_frame, text="Username / Owner").grid(row=1, column=0, sticky=tk.W, **pad) - ttk.Entry(config_frame, textvariable=self.username_var).grid(row=1, column=1, sticky=tk.EW, **pad) - - ttk.Label(config_frame, text="Repository Name").grid(row=2, column=0, sticky=tk.W, **pad) - ttk.Entry(config_frame, textvariable=self.repo_var).grid(row=2, column=1, sticky=tk.EW, **pad) - - ttk.Label(config_frame, text="Base Branch").grid(row=3, column=0, sticky=tk.W, **pad) - ttk.Entry(config_frame, textvariable=self.base_branch_var).grid(row=3, column=1, sticky=tk.EW, **pad) - - ttk.Label(config_frame, text="Head Branch").grid(row=4, column=0, sticky=tk.W, **pad) - ttk.Entry(config_frame, textvariable=self.head_branch_var).grid(row=4, column=1, sticky=tk.EW, **pad) - - ttk.Label(config_frame, text="Uptime (seconds)").grid(row=5, column=0, sticky=tk.W, **pad) - ttk.Entry(config_frame, textvariable=self.uptime_var).grid(row=5, column=1, sticky=tk.EW, **pad) - - config_frame.columnconfigure(1, weight=1) - - # Initialize co-author manager - self.co_author_manager = CoAuthorManager(frm) - - # Buttons - btns = ttk.Frame(frm) - btns.grid(row=7, column=0, columnspan=2, sticky=tk.EW, **pad) - ttk.Button(btns, text="Run Once", command=self.on_run_once).pack(side=tk.LEFT, padx=4) - ttk.Button(btns, text="Start Loop", command=self.on_start_loop).pack(side=tk.LEFT, padx=4) - ttk.Button(btns, text="Stop Loop", command=self.on_stop_loop).pack(side=tk.LEFT, padx=4) - - # Log Area - log_frame = ttk.LabelFrame(frm, text="Logs", padding=5) - log_frame.grid(row=8, column=0, columnspan=2, sticky=tk.NSEW, **pad) - self.log_text = tk.Text(log_frame, height=12) - self.log_text.pack(fill=tk.BOTH, expand=True) - frm.rowconfigure(8, weight=1) - - def _build_repo_fullname(self) -> str: - owner = self.username_var.get().strip() - repo = self.repo_var.get().strip() - return f"{owner}/{repo}" - - def _make_bot(self) -> AutoPRBot: - token = self.token_var.get().strip() - if not token: - raise ValueError("Token is required") - repo_full = self._build_repo_fullname() - if "/" not in repo_full: - raise ValueError("Provide valid owner and repository") - logger = TextLogger(self.log_text) - - # Get co-authors from the manager - co_authors = self.co_author_manager.get_co_authors() - - return AutoPRBot( - token=token, - repo=repo_full, - base_branch=self.base_branch_var.get().strip() or "main", - head_branch=self.head_branch_var.get().strip() or "dev", - logger=logger, - co_authors=co_authors, - ) - - def on_run_once(self): - try: - bot = self._make_bot() - except Exception as e: - messagebox.showerror("Error", str(e)) - return - threading.Thread(target=self._run_once_thread, args=(bot,), daemon=True).start() - - def _run_once_thread(self, bot: AutoPRBot): - try: - bot.run_once() - except Exception as e: - bot.log(f"Error: {e}") - - def on_start_loop(self): - if self._bg_thread and self._bg_thread.is_alive(): - messagebox.showinfo("Running", "Loop is already running") - return - try: - bot = self._make_bot() - except Exception as e: - messagebox.showerror("Error", str(e)) - return - interval = max(1, int(self.uptime_var.get() or 60)) - self._stop_event.clear() - self._bg_thread = threading.Thread(target=self._loop_worker, args=(bot, interval), daemon=True) - self._bg_thread.start() - - def _loop_worker(self, bot: AutoPRBot, interval: int): - while not self._stop_event.is_set(): - try: - bot.run_once() - except Exception as e: - bot.log(f"Error: {e}") - self._stop_event.wait(interval) - - def on_stop_loop(self): - if self._bg_thread and self._bg_thread.is_alive(): - self._stop_event.set() - self.log_text.insert(tk.END, "Stop requested.\n") - self.log_text.see(tk.END) - +from autopr_bot.ui import launch if __name__ == "__main__": - app = App() - app.mainloop() + launch()