From 45dd740b830eedf387fa5a90812bb1ef54828b50 Mon Sep 17 00:00:00 2001 From: Roman Marek~ Date: Mon, 9 Mar 2026 13:59:20 +0100 Subject: [PATCH 01/10] fix: harden monitor behavior and raise coverage --- ptop3/monitor.py | 1132 ++++++++++++++++++++----------------- pyproject.toml | 13 + tests/conftest.py | 11 +- tests/test_drop_caches.py | 21 +- tests/test_monitor.py | 1001 +++++++++++++++++++++++++++++++- tests/test_sudo_config.py | 6 +- tests/test_swap_clean.py | 91 ++- 7 files changed, 1742 insertions(+), 533 deletions(-) diff --git a/ptop3/monitor.py b/ptop3/monitor.py index ac63667..36a16e1 100644 --- a/ptop3/monitor.py +++ b/ptop3/monitor.py @@ -5,7 +5,7 @@ ↑/↓/PgUp/PgDn/Home/End Move selection Enter or l Expand group / back with h t Toggle process tree view [detail view] - s Cycle sort: mem -> cpu -> rss -> swap -> io -> net -> count + s Cycle sort: mem -> cpu -> rss -> swap -> io -> count f / r Filter regex / reset + / - Refresh interval up/down k / K Kill (TERM / KILL) — process in detail, group in group view @@ -27,51 +27,43 @@ import subprocess import sys import time -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path -# --- Performance Caching & Modes --- -PID_CACHE: dict = {} -PID_CACHE_TTL = 30.0 -LAST_CACHE_CLEAN = 0.0 -SWAP_CACHE: dict = {} -SWAP_CACHE_TTL = 2.0 - -LITE_MODE = False - try: import psutil except ImportError: print("Install dependency first: pip install psutil", file=sys.stderr) sys.exit(1) + DEFAULT_REFRESH = 2.0 MIN_REFRESH, MAX_REFRESH = 1.0, 10.0 CPU_HOT = 100.0 RSS_HOT_MB = 800.0 SWAP_HOT_MB = 500.0 SWAP_CLEAN_SUGGEST_PCT = 80.0 -SUDOERS_PATH = "/etc/sudoers.d/ptop3" DISK_USAGE_CRITICAL = 95.0 DISK_USAGE_HIGH = 85.0 -DISK_IO_HIGH_MBPS = 50.0 -NETWORK_HIGH_MBPS = 100.0 -LOAD_CRITICAL = 8.0 -LOAD_HIGH = 4.0 -TEMP_CRITICAL = 85.0 -TEMP_HIGH = 70.0 - -SORT_KEYS = ("mem", "cpu", "rss", "swap", "io", "net", "count") + +SORT_KEYS = ("mem", "cpu", "rss", "swap", "io", "count") MAX_APP_NAME = 24 ALIASES = { - "code-insiders": "vscode-insiders", "code": "vscode", - "chromium": "chrome", "chromium-browse": "chrome", "chrome": "chrome", - "firefox": "firefox", "web content": "firefox", - "python3": "python", "python": "python", - "java": "java", "cursor": "cursor", - "gnome-shell": "gnome-shell", "xwayland": "Xwayland", + "code-insiders": "vscode-insiders", + "code": "vscode", + "chromium": "chrome", + "chromium-browse": "chrome", + "chrome": "chrome", + "firefox": "firefox", + "web content": "firefox", + "python3": "python", + "python": "python", + "java": "java", + "cursor": "cursor", + "gnome-shell": "gnome-shell", + "xwayland": "Xwayland", } _ALIASES_BY_LEN = sorted(ALIASES.items(), key=lambda x: len(x[0]), reverse=True) _CMD_WORDLIKE = r"[a-z0-9_-]" @@ -87,30 +79,7 @@ (".mount_cursor", "cursor"), ] - -def _find_subscript(name: str) -> str | None: - """Find an installed subscript by console_script name, fall back to dev path.""" - path = shutil.which(name) - if path: - return path - # Development fallback: run via sys.executable - stem = name.replace("ptop3-", "").replace("-", "_") - dev_path = Path(__file__).parent / "scripts" / f"{stem}.py" - if dev_path.exists(): - return str(dev_path) - return None - - -def _subscript_cmd(name: str) -> list[str] | None: - """Return the command list to invoke a subscript, or None if not found.""" - path = shutil.which(name) - if path: - return [path] - stem = name.replace("ptop3-", "").replace("-", "_") - dev_path = Path(__file__).parent / "scripts" / f"{stem}.py" - if dev_path.exists(): - return [sys.executable, str(dev_path)] - return None +LITE_MODE = False @dataclass(slots=True) @@ -126,8 +95,6 @@ class ProcRow: app: str io_read_mb: float = 0.0 io_write_mb: float = 0.0 - net_sent_mb: float = 0.0 - net_recv_mb: float = 0.0 status: str = "running" @@ -141,8 +108,210 @@ class GroupRow: swap_mb: float io_read_mb: float = 0.0 io_write_mb: float = 0.0 - net_sent_mb: float = 0.0 - net_recv_mb: float = 0.0 + + +@dataclass(slots=True) +class SampleResult: + rows: list[ProcRow] + error: str = "" + + +@dataclass(slots=True) +class ProcessSampler: + pid_cache_ttl: float = 30.0 + swap_cache_ttl: float = 2.0 + pid_cache: dict[int, tuple[float, str, str, str]] = field(default_factory=dict) + swap_cache: dict[int, tuple[float, float]] = field(default_factory=dict) + last_cache_clean: float = 0.0 + + def read_vmswap_mb(self, pid: int) -> float: + now = time.time() + cached = self.swap_cache.get(pid) + if cached and (now - cached[0] < self.swap_cache_ttl): + return cached[1] + try: + with open(f"/proc/{pid}/status") as f: + for line in f: + if not line.startswith("VmSwap:"): + continue + parts = line.split() + if len(parts) >= 2: + swap_mb = float(parts[1]) / 1024.0 + self.swap_cache[pid] = (now, swap_mb) + return swap_mb + except OSError: + pass + self.swap_cache[pid] = (now, 0.0) + return 0.0 + + def _cleanup_cache(self, now: float) -> None: + if now - self.last_cache_clean <= 10.0: + return + stale_pids = [pid for pid, (ts, *_rest) in self.pid_cache.items() if now - ts > self.pid_cache_ttl] + for pid in stale_pids: + self.pid_cache.pop(pid, None) + stale_swaps = [ + pid for pid, (ts, _swap) in self.swap_cache.items() + if now - ts > self.swap_cache_ttl * 5 + ] + for pid in stale_swaps: + self.swap_cache.pop(pid, None) + self.last_cache_clean = now + + def sample(self, filter_re: re.Pattern[str] | None) -> SampleResult: + rows: list[ProcRow] = [] + errors: list[str] = [] + now = time.time() + self._cleanup_cache(now) + + try: + mem = psutil.virtual_memory() + inv_mem_total = 100.0 / mem.total + except Exception as exc: # pragma: no cover - defensive fallback + return SampleResult([], f"sampling failed: {exc}") + + lite = LITE_MODE + filter_search = filter_re.search if filter_re else None + attrs = ["pid", "ppid", "name"] + + try: + proc_iter = psutil.process_iter(attrs=attrs) + except Exception as exc: + return SampleResult([], f"sampling failed: {exc}") + + for proc in proc_iter: + try: + row = self._sample_process(proc, now, inv_mem_total, lite, filter_search) + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + continue + except Exception as exc: + if not errors: + errors.append(f"sampling warning: {exc}") + continue + if row is not None: + rows.append(row) + + return SampleResult(rows, errors[0] if errors else "") + + def _sample_process( + self, + proc: psutil.Process, + now: float, + inv_mem_total: float, + lite: bool, + filter_search, + ) -> ProcRow | None: + with proc.oneshot(): + pid = proc.info["pid"] + ppid = proc.info.get("ppid") or 0 + + cache_entry = self.pid_cache.get(pid) + if cache_entry and (now - cache_entry[0] < self.pid_cache_ttl): + _, name, cmdline, app = cache_entry + if filter_search and not cmdline: + cmdline = _safe_cmdline(proc) + else: + name = proc.info.get("name") or "?" + cmdline = _safe_cmdline(proc) if (not lite) or filter_search else "" + app = normalize_app_name(name, cmdline) + self.pid_cache[pid] = (now, name, cmdline, app) + + if filter_search and not _matches_filter(filter_search, app, name, cmdline): + return None + + try: + rss_bytes = proc.memory_info().rss + except (psutil.AccessDenied, psutil.NoSuchProcess): + return None + + rss_mb = rss_bytes / (1024 * 1024) + cpu = 0.0 if lite and rss_mb < 2.0 else proc.cpu_percent() + swap_mb = _swap_value(proc.pid, rss_mb, lite) + io_read_mb, io_write_mb = _io_values(proc, rss_mb, lite) + + try: + status = proc.status() if not lite else "running" + except (psutil.AccessDenied, psutil.NoSuchProcess): + status = "unknown" + + return ProcRow( + pid=pid, + ppid=ppid, + name=name, + rss_mb=rss_mb, + cpu=cpu, + mem_pct=rss_bytes * inv_mem_total, + swap_mb=swap_mb, + cmdline=cmdline, + app=app, + io_read_mb=io_read_mb, + io_write_mb=io_write_mb, + status=status, + ) + + +DEFAULT_SAMPLER = ProcessSampler() +PID_CACHE = DEFAULT_SAMPLER.pid_cache +SWAP_CACHE = DEFAULT_SAMPLER.swap_cache + + +def _find_subscript(name: str) -> str | None: + """Find an installed subscript by console_script name, fall back to dev path.""" + path = shutil.which(name) + if path: + return path + stem = name.replace("ptop3-", "").replace("-", "_") + dev_path = Path(__file__).parent / "scripts" / f"{stem}.py" + if dev_path.exists(): + return str(dev_path) + return None + + +def _subscript_cmd(name: str) -> list[str] | None: + """Return the command list to invoke a subscript, or None if not found.""" + path = shutil.which(name) + if path: + return [path] + stem = name.replace("ptop3-", "").replace("-", "_") + dev_path = Path(__file__).parent / "scripts" / f"{stem}.py" + if dev_path.exists(): + return [sys.executable, str(dev_path)] + return None + + +def _safe_cmdline(proc: psutil.Process) -> str: + try: + cmd_list = proc.cmdline() + except (psutil.AccessDenied, psutil.NoSuchProcess): + return "" + return " ".join(cmd_list[:10]) if cmd_list else "" + + +def _matches_filter(filter_search, app: str, name: str, cmdline: str) -> bool: + return bool(filter_search(app) or filter_search(name) or (cmdline and filter_search(cmdline))) + + +def _swap_value(pid: int, rss_mb: float, lite: bool) -> float: + if not lite and rss_mb > 50: + return read_vmswap_mb(pid) + if lite and rss_mb > 200: + return read_vmswap_mb(pid) + return 0.0 + + +def _io_values(proc: psutil.Process, rss_mb: float, lite: bool) -> tuple[float, float]: + if lite or rss_mb <= 25: + return 0.0, 0.0 + try: + io_counters = proc.io_counters() + except (psutil.AccessDenied, AttributeError, OSError): + return 0.0, 0.0 + if not io_counters: + return 0.0, 0.0 + return ( + io_counters.read_bytes / (1024 * 1024), + io_counters.write_bytes / (1024 * 1024), + ) def normalize_app_name(name_hint: str, cmd_hint: str) -> str: @@ -150,243 +319,159 @@ def normalize_app_name(name_hint: str, cmd_hint: str) -> str: cmd_low = (cmd_hint or "").lower() if app == "python3": app = "python" - for k, v in ALIASES.items(): - if app.startswith(k): - return v - for pattern, v in _CMD_ALIASES: + for key, value in ALIASES.items(): + if app.startswith(key): + return value + for pattern, value in _CMD_ALIASES: if pattern in cmd_low: - return v - for pattern, v in _ALIASES_CMD_REGEX: + return value + for pattern, value in _ALIASES_CMD_REGEX: if pattern.search(cmd_low): - return v + return value return app or "unknown" def read_vmswap_mb(pid: int) -> float: - now = time.time() - cached = SWAP_CACHE.get(pid) - if cached and (now - cached[0] < SWAP_CACHE_TTL): - return cached[1] - try: - with open(f"/proc/{pid}/status") as f: - for line in f: - if line.startswith("VmSwap:"): - parts = line.split() - if len(parts) >= 2: - swap_mb = float(parts[1]) / 1024.0 - SWAP_CACHE[pid] = (now, swap_mb) - return swap_mb - except Exception: - pass - SWAP_CACHE[pid] = (now, 0.0) - return 0.0 + return DEFAULT_SAMPLER.read_vmswap_mb(pid) -def get_proc_rows(filter_re: re.Pattern | None) -> list[ProcRow]: - global LAST_CACHE_CLEAN - rows: list[ProcRow] = [] - now = time.time() - mem = psutil.virtual_memory() - mem_total = mem.total - inv_mem_total = 100.0 / mem_total - - if now - LAST_CACHE_CLEAN > 10.0: - stale = [pid for pid, (ts, _, _, _) in PID_CACHE.items() if now - ts > PID_CACHE_TTL] - for pid in stale: - PID_CACHE.pop(pid, None) - swap_stale = [pid for pid, (ts, _) in SWAP_CACHE.items() if now - ts > SWAP_CACHE_TTL * 5] - for pid in swap_stale: - SWAP_CACHE.pop(pid, None) - LAST_CACHE_CLEAN = now - - lite = LITE_MODE - attrs = ["pid", "ppid", "name"] - filter_search = filter_re.search if filter_re else None - read_swap = read_vmswap_mb - - try: - for p in psutil.process_iter(attrs=attrs): - try: - with p.oneshot(): - pid = p.info["pid"] - ppid = p.info.get("ppid") or 0 - - cache_entry = PID_CACHE.get(pid) - if cache_entry and (now - cache_entry[0] < PID_CACHE_TTL): - _, name, cmdline, app = cache_entry - if filter_search and not cmdline: - try: - cmd_list = p.cmdline() - cmdline = " ".join(cmd_list[:10]) if cmd_list else "" - except (psutil.AccessDenied, psutil.NoSuchProcess): - cmdline = "" - else: - name = p.info.get("name") or "?" - cmdline = "" - if (not lite) or filter_search: - try: - cmd_list = p.cmdline() - cmdline = " ".join(cmd_list[:10]) if cmd_list else "" - except (psutil.AccessDenied, psutil.NoSuchProcess): - cmdline = "" - app = normalize_app_name(name, cmdline) - PID_CACHE[pid] = (now, name, cmdline, app) - - if filter_search and not ( - filter_search(app) or filter_search(name) or (cmdline and filter_search(cmdline)) - ): - continue - - try: - mi = p.memory_info() - rss_bytes = mi.rss - rss_mb = rss_bytes / (1024 * 1024) - except (psutil.AccessDenied, psutil.NoSuchProcess): - continue +def sample_processes(filter_re: re.Pattern[str] | None) -> SampleResult: + return DEFAULT_SAMPLER.sample(filter_re) - if lite and rss_mb < 2.0: - cpu = 0.0 - else: - cpu = p.cpu_percent() - - if not lite and rss_mb > 50: - swap_mb = read_swap(pid) - elif lite and rss_mb > 200: - swap_mb = read_swap(pid) - else: - swap_mb = 0.0 - - io_read_mb = io_write_mb = 0.0 - if not lite and rss_mb > 25: - try: - io_counters = p.io_counters() - if io_counters: - io_read_mb = io_counters.read_bytes / (1024 * 1024) - io_write_mb = io_counters.write_bytes / (1024 * 1024) - except (psutil.AccessDenied, AttributeError): - pass - - net_sent_mb = net_recv_mb = 0.0 - - try: - status = p.status() if not lite else "running" - except (psutil.AccessDenied, psutil.NoSuchProcess): - status = "unknown" - - mem_pct = rss_bytes * inv_mem_total - - rows.append( - ProcRow( - pid, ppid, name, rss_mb, cpu, mem_pct, swap_mb, cmdline, app, - io_read_mb, io_write_mb, net_sent_mb, net_recv_mb, status, - ) - ) - except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): - continue - except Exception: - pass - return rows +def get_proc_rows(filter_re: re.Pattern[str] | None) -> list[ProcRow]: + return sample_processes(filter_re).rows def aggregate(rows: list[ProcRow]) -> list[GroupRow]: groups: dict[str, GroupRow] = {} - for r in rows: - g = groups.get(r.app) - if g is None: - groups[r.app] = GroupRow( - app=r.app, + for row in rows: + group = groups.get(row.app) + if group is None: + groups[row.app] = GroupRow( + app=row.app, procs=1, - rss_mb=r.rss_mb, - mem_pct=r.mem_pct, - cpu=r.cpu, - swap_mb=r.swap_mb, - io_read_mb=r.io_read_mb, - io_write_mb=r.io_write_mb, - net_sent_mb=r.net_sent_mb, - net_recv_mb=r.net_recv_mb, + rss_mb=row.rss_mb, + mem_pct=row.mem_pct, + cpu=row.cpu, + swap_mb=row.swap_mb, + io_read_mb=row.io_read_mb, + io_write_mb=row.io_write_mb, ) continue - g.procs += 1 - g.rss_mb += r.rss_mb - g.mem_pct += r.mem_pct - g.cpu += r.cpu - g.swap_mb += r.swap_mb - g.io_read_mb += r.io_read_mb - g.io_write_mb += r.io_write_mb - g.net_sent_mb += r.net_sent_mb - g.net_recv_mb += r.net_recv_mb + group.procs += 1 + group.rss_mb += row.rss_mb + group.mem_pct += row.mem_pct + group.cpu += row.cpu + group.swap_mb += row.swap_mb + group.io_read_mb += row.io_read_mb + group.io_write_mb += row.io_write_mb return list(groups.values()) -def print_once(filter_re: re.Pattern | None, sort_key: str, top: int): - rows = get_proc_rows(filter_re) - groups = aggregate(rows) +def _group_sort_value(group: GroupRow, sort_key: str) -> float: + if sort_key == "mem": + return group.mem_pct + if sort_key == "cpu": + return group.cpu + if sort_key == "rss": + return group.rss_mb + if sort_key == "swap": + return group.swap_mb + if sort_key == "io": + return group.io_read_mb + group.io_write_mb + return float(group.procs) + + +def sort_groups(groups: list[GroupRow], sort_key: str) -> list[GroupRow]: + groups.sort(key=lambda group: _group_sort_value(group, sort_key), reverse=True) + return groups + + +def _proc_sort_value(row: ProcRow, sort_key: str) -> float: + if sort_key == "mem": + return row.mem_pct + if sort_key == "cpu": + return row.cpu + if sort_key == "rss": + return row.rss_mb + if sort_key == "swap": + return row.swap_mb if sort_key == "io": - groups.sort(key=lambda g: (g.io_read_mb + g.io_write_mb), reverse=True) - else: - key = { - "mem": "mem_pct", "cpu": "cpu", "rss": "rss_mb", - "swap": "swap_mb", "net": "net_sent_mb", "count": "procs", - }[sort_key] - groups.sort(key=lambda g: getattr(g, key), reverse=True) + return row.io_read_mb + row.io_write_mb + return float(row.pid) + + +def sort_processes(rows: list[ProcRow], sort_key: str) -> list[ProcRow]: + rows.sort(key=lambda row: _proc_sort_value(row, sort_key), reverse=True) + return rows + + +def print_once(filter_re: re.Pattern[str] | None, sort_key: str, top: int) -> None: + sample = sample_processes(filter_re) + groups = sort_groups(aggregate(sample.rows), sort_key) + if sample.error: + print(sample.error, file=sys.stderr) print( f"{'APP':{MAX_APP_NAME}} {'PROCS':>5} {'RSS(MiB)':>10} {'SWAP(MiB)':>10}" f" {'%MEM':>7} {'%CPU':>7} {'IO_R(MB)':>10} {'IO_W(MB)':>10}" ) - for g in groups[:top]: + for group in groups[:top]: print( - f"{g.app[:MAX_APP_NAME]:{MAX_APP_NAME}} {g.procs:>5} {g.rss_mb:>10.1f}" - f" {g.swap_mb:>10.1f} {g.mem_pct:>7.1f} {g.cpu:>7.1f}" - f" {g.io_read_mb:>10.1f} {g.io_write_mb:>10.1f}" + f"{group.app[:MAX_APP_NAME]:{MAX_APP_NAME}} {group.procs:>5} {group.rss_mb:>10.1f}" + f" {group.swap_mb:>10.1f} {group.mem_pct:>7.1f} {group.cpu:>7.1f}" + f" {group.io_read_mb:>10.1f} {group.io_write_mb:>10.1f}" ) -def clamp(v, lo, hi): - return max(lo, min(hi, v)) +def clamp(value, low, high): + return max(low, min(high, value)) + + +def visible_window_start(selected: int, current_start: int, window_size: int, total: int) -> int: + if window_size <= 0 or total <= window_size: + return 0 + if selected < current_start: + current_start = selected + elif selected >= current_start + window_size: + current_start = selected - window_size + 1 + return clamp(current_start, 0, max(0, total - window_size)) def _tree_sort_key(sort_key: str): if sort_key == "mem": - return lambda r: r.mem_pct + return lambda row: row.mem_pct if sort_key == "cpu": - return lambda r: r.cpu + return lambda row: row.cpu if sort_key == "rss": - return lambda r: r.rss_mb + return lambda row: row.rss_mb if sort_key == "swap": - return lambda r: r.swap_mb + return lambda row: row.swap_mb if sort_key == "io": - return lambda r: r.io_read_mb + r.io_write_mb - if sort_key == "net": - return lambda r: r.net_sent_mb - return lambda r: r.rss_mb - + return lambda row: row.io_read_mb + row.io_write_mb + return lambda row: row.rss_mb -def build_process_tree(procs: list[ProcRow], sort_key: str = "rss") -> list[tuple]: - """Build a flattened tree list from processes using ppid. - Returns list of (ProcRow, indent_level, tree_prefix_str) tuples. - Roots and siblings are sorted by sort_key descending. - """ - by_pid = {r.pid: r for r in procs} +def build_process_tree(procs: list[ProcRow], sort_key: str = "rss") -> list[tuple[ProcRow, int, str]]: + """Build a flattened tree list from processes using ppid.""" + by_pid = {row.pid: row for row in procs} children: dict[int, list[int]] = {} - roots = [] - pids_in_group = set(r.pid for r in procs) + roots: list[int] = [] + pids_in_group = {row.pid for row in procs} keyfn = _tree_sort_key(sort_key) - for r in procs: - if r.ppid in pids_in_group and r.ppid != r.pid: - children.setdefault(r.ppid, []).append(r.pid) + for row in procs: + if row.ppid in pids_in_group and row.ppid != row.pid: + children.setdefault(row.ppid, []).append(row.pid) else: - roots.append(r.pid) + roots.append(row.pid) roots.sort(key=lambda pid: keyfn(by_pid[pid]), reverse=True) + result: list[tuple[ProcRow, int, str]] = [] - result = [] - - def walk(pid, depth, continuation): + def walk(pid: int, depth: int, continuation: list[bool]) -> None: proc = by_pid.get(pid) - if not proc: + if proc is None: return prefix = "" if depth > 0: @@ -395,10 +480,9 @@ def walk(pid, depth, continuation): prefix += "├── " if continuation[-1] else "└── " result.append((proc, depth, prefix)) kids = children.get(pid, []) - kids.sort(key=lambda p: keyfn(by_pid[p]) if p in by_pid else 0, reverse=True) - for i, child_pid in enumerate(kids): - has_more_siblings = i < len(kids) - 1 - walk(child_pid, depth + 1, continuation + [has_more_siblings]) + kids.sort(key=lambda child_pid: keyfn(by_pid[child_pid]), reverse=True) + for idx, child_pid in enumerate(kids): + walk(child_pid, depth + 1, continuation + [idx < len(kids) - 1]) for root_pid in roots: walk(root_pid, 0, []) @@ -425,13 +509,9 @@ def __init__(self, stdscr, filter_re, sort_key, refresh): curses.init_pair(3, curses.COLOR_YELLOW, -1) curses.init_pair(4, curses.COLOR_RED, -1) curses.init_pair(5, curses.COLOR_GREEN, -1) - curses.init_pair(6, curses.COLOR_MAGENTA, -1) curses.init_pair(7, curses.COLOR_WHITE, curses.COLOR_BLUE) curses.init_pair(10, curses.COLOR_WHITE, curses.COLOR_MAGENTA) curses.init_pair(11, curses.COLOR_WHITE, curses.COLOR_BLUE) - curses.init_pair(12, curses.COLOR_WHITE, curses.COLOR_GREEN) - curses.init_pair(13, curses.COLOR_WHITE, curses.COLOR_YELLOW) - curses.init_pair(14, curses.COLOR_WHITE, curses.COLOR_RED) curses.init_pair(15, curses.COLOR_WHITE, curses.COLOR_CYAN) curses.init_pair(16, curses.COLOR_BLACK, curses.COLOR_WHITE) curses.init_pair(17, curses.COLOR_BLACK, curses.COLOR_GREEN) @@ -450,25 +530,50 @@ def __init__(self, stdscr, filter_re, sort_key, refresh): self.refresh = refresh self.view = "groups" self.sel = 0 + self.scroll = 0 self.groups: list[GroupRow] = [] self.detail_list: list[ProcRow] = [] - self.detail_tree: list[tuple] = [] + self.detail_tree: list[tuple[ProcRow, int, str]] = [] self.tree_mode = False self.detail_app: str | None = None self.last = 0.0 self.last_proc_rows: list[ProcRow] = [] - self.last_sample_time = 0.0 self.alerts_cache: list[str] = [] self.alerts_cache_time = 0.0 - self.sample_skip_counter = 0 self.status_msg = "" self.status_time = 0.0 + self.sample_error = "" self.sel_attr = curses.A_REVERSE if curses.has_colors(): self.sel_attr = curses.color_pair(7) | curses.A_BOLD - def _set_timeout(self, r): - self.stdscr.timeout(int(r * 1000)) + def _set_timeout(self, refresh): + self.stdscr.timeout(int(refresh * 1000)) + + def _content_rows(self, maxy: int) -> int: + return max(1, maxy - 15) + + def _reset_scroll(self) -> None: + self.scroll = 0 + + def _sync_scroll(self, maxy: int) -> None: + self.scroll = visible_window_start(self.sel, self.scroll, self._content_rows(maxy), self.length()) + + def _selected_group(self) -> GroupRow | None: + if not self.groups: + return None + return self.groups[self.sel] + + def _selected_proc(self) -> ProcRow | None: + if self.view != "detail": + return None + if self.tree_mode: + if not self.detail_tree: + return None + return self.detail_tree[self.sel][0] + if not self.detail_list: + return None + return self.detail_list[self.sel] def run(self): while True: @@ -488,17 +593,17 @@ def run(self): ch = {65: curses.KEY_UP, 66: curses.KEY_DOWN, 67: ord("l"), 68: ord("h")}.get(b, ch) if ch in (ord("q"), 3): break - elif ch in (curses.KEY_DOWN, ord("j")): + if ch in (curses.KEY_DOWN, ord("j")): self.sel = clamp(self.sel + 1, 0, max(0, self.length() - 1)) elif ch == curses.KEY_UP: self.sel = clamp(self.sel - 1, 0, max(0, self.length() - 1)) - elif ch in (curses.KEY_NPAGE,): + elif ch == curses.KEY_NPAGE: self.sel = clamp(self.sel + 10, 0, max(0, self.length() - 1)) - elif ch in (curses.KEY_PPAGE,): + elif ch == curses.KEY_PPAGE: self.sel = clamp(self.sel - 10, 0, max(0, self.length() - 1)) - elif ch in (curses.KEY_HOME,): + elif ch == curses.KEY_HOME: self.sel = 0 - elif ch in (curses.KEY_END,): + elif ch == curses.KEY_END: self.sel = max(0, self.length() - 1) elif ch in (10, 13, curses.KEY_ENTER, ord("l")): self.toggle_view() @@ -506,9 +611,10 @@ def run(self): if self.view == "detail": self.toggle_view() elif ch == ord("s"): - i = (SORT_KEYS.index(self.sort_key) + 1) % len(SORT_KEYS) - self.sort_key = SORT_KEYS[i] + idx = (SORT_KEYS.index(self.sort_key) + 1) % len(SORT_KEYS) + self.sort_key = SORT_KEYS[idx] self.sel = 0 + self._reset_scroll() self.status(f"Sort: {self.sort_key}") elif ch == ord("+"): self.refresh = clamp(self.refresh + 0.1, MIN_REFRESH, MAX_REFRESH) @@ -522,22 +628,26 @@ def run(self): self.filter_text = "" self.filter_re = None self.sel = 0 + self._reset_scroll() self.status("Filter cleared") elif ch == ord("f"): self.prompt_filter() self.sel = 0 + self._reset_scroll() elif ch == ord("k"): if self.view == "detail": self.kill_selected(signal.SIGTERM) elif self.view == "groups": - self.kill_group(sig=signal.SIGTERM) + self.kill_group(signal.SIGTERM) elif ch == ord("K"): if self.view == "detail": self.kill_selected(signal.SIGKILL) elif self.view == "groups": - self.kill_group(sig=signal.SIGKILL) + self.kill_group(signal.SIGKILL) elif ch == ord("t") and self.view == "detail": self.tree_mode = not self.tree_mode + self.sel = 0 + self._reset_scroll() self.status(f"Tree view: {'ON' if self.tree_mode else 'OFF'}") elif ch == ord("g") and self.view == "groups": self.kill_group() @@ -554,136 +664,122 @@ def length(self): return len(self.detail_list) def sample(self): - self.sample_skip_counter += 1 - rows = get_proc_rows(self.filter_re) - groups = aggregate(rows) - keymap = { - "mem": "mem_pct", "cpu": "cpu", "rss": "rss_mb", - "swap": "swap_mb", "net": "net_sent_mb", "count": "procs", - } - if self.sort_key == "io": - groups.sort(key=lambda g: (g.io_read_mb + g.io_write_mb), reverse=True) - else: - groups.sort(key=lambda g: getattr(g, keymap[self.sort_key]), reverse=True) + sample = sample_processes(self.filter_re) + groups = sort_groups(aggregate(sample.rows), self.sort_key) self.groups = groups - self.last_proc_rows = rows + self.last_proc_rows = sample.rows + self.sample_error = sample.error if self.view == "detail" and self.detail_app: - lst = [r for r in rows if r.app == self.detail_app] - if self.tree_mode: - self.detail_tree = build_process_tree(lst, self.sort_key) - self.detail_list = lst - else: - if self.sort_key == "mem": - lst.sort(key=lambda x: x.mem_pct, reverse=True) - elif self.sort_key == "cpu": - lst.sort(key=lambda x: x.cpu, reverse=True) - elif self.sort_key == "rss": - lst.sort(key=lambda x: x.rss_mb, reverse=True) - elif self.sort_key == "swap": - lst.sort(key=lambda x: x.swap_mb, reverse=True) - elif self.sort_key == "io": - lst.sort(key=lambda x: (x.io_read_mb + x.io_write_mb), reverse=True) - elif self.sort_key == "net": - lst.sort(key=lambda x: x.net_sent_mb, reverse=True) - else: - lst.sort(key=lambda x: x.pid, reverse=True) - self.detail_list = lst + detail_list = [row for row in sample.rows if row.app == self.detail_app] + self.detail_list = sort_processes(detail_list, self.sort_key) + self.detail_tree = build_process_tree(detail_list, self.sort_key) + else: + self.detail_list = [] + self.detail_tree = [] + self.sel = clamp(self.sel, 0, max(0, self.length() - 1)) def toggle_view(self): if self.view == "groups": - if not self.groups: + group = self._selected_group() + if group is None: return - self.detail_app = self.groups[self.sel].app + self.detail_app = group.app self.view = "detail" - self.sel = 0 else: self.view = "groups" self.detail_app = None - self.sel = 0 self.tree_mode = False + self.detail_list = [] + self.detail_tree = [] + self.sel = 0 + self._reset_scroll() def draw(self): self.stdscr.erase() maxy, maxx = self.stdscr.getmaxyx() - vm, sm = psutil.virtual_memory(), psutil.swap_memory() + self._sync_scroll(maxy) + + vm = psutil.virtual_memory() + sm = psutil.swap_memory() gib = 1024 ** 3 try: l1, l5, l15 = os.getloadavg() except OSError: l1 = l5 = l15 = 0.0 - mem_used = vm.used / gib - mem_total = vm.total / gib - mem_avail = getattr(vm, "available", 0) / gib - mem_free = getattr(vm, "free", 0) / gib - mem_buf = getattr(vm, "buffers", 0) / gib - mem_cache = getattr(vm, "cached", 0) / gib cpu_count = os.cpu_count() or 1 - BG_TITLE = curses.color_pair(10) | curses.A_BOLD - BG_LABEL = curses.color_pair(11) | curses.A_BOLD - BG_OK = curses.color_pair(17) | curses.A_BOLD - BG_WARN = curses.color_pair(18) | curses.A_BOLD - BG_CRIT = curses.color_pair(19) | curses.A_BOLD - BG_NEUT = curses.color_pair(16) - BG_BUF = curses.color_pair(15) | curses.A_BOLD - BG_SWAP_L = curses.color_pair(20) | curses.A_BOLD - BG_SWAP = curses.color_pair(21) | curses.A_BOLD - BG_GAP = curses.color_pair(1) - - def _mem_bg(pct): - if pct > 90: - return BG_CRIT - if pct > 70: - return BG_WARN - return BG_OK - - def _swap_bg(pct): - if pct > 80: - return BG_CRIT - if pct > 50: - return BG_WARN - return BG_SWAP - - def _load_bg(load_val): + + bg_title = curses.color_pair(10) | curses.A_BOLD + bg_label = curses.color_pair(11) | curses.A_BOLD + bg_ok = curses.color_pair(17) | curses.A_BOLD + bg_warn = curses.color_pair(18) | curses.A_BOLD + bg_crit = curses.color_pair(19) | curses.A_BOLD + bg_neut = curses.color_pair(16) + bg_buf = curses.color_pair(15) | curses.A_BOLD + bg_swap_l = curses.color_pair(20) | curses.A_BOLD + bg_swap = curses.color_pair(21) | curses.A_BOLD + bg_gap = curses.color_pair(1) + + def mem_bg(percent): + if percent > 90: + return bg_crit + if percent > 70: + return bg_warn + return bg_ok + + def swap_bg(percent): + if percent > 80: + return bg_crit + if percent > 50: + return bg_warn + return bg_swap + + def load_bg(load_val): per_core = load_val / cpu_count if per_core > 2.0: - return BG_CRIT + return bg_crit if per_core > 1.0: - return BG_WARN - return BG_OK + return bg_warn + return bg_ok - def _avail_bg(gib_val, thresh): - return BG_OK if gib_val > thresh else BG_WARN + def avail_bg(gib_val, threshold): + return bg_ok if gib_val > threshold else bg_warn + mem_used = vm.used / gib + mem_total = vm.total / gib + mem_avail = getattr(vm, "available", 0) / gib + mem_free = getattr(vm, "free", 0) / gib + mem_buf = getattr(vm, "buffers", 0) / gib + mem_cache = getattr(vm, "cached", 0) / gib badges = [ - ("", "ptop3", BG_TITLE, BG_TITLE), - ("mem", f"{mem_used:.1f}/{mem_total:.1f}G", BG_LABEL, _mem_bg(vm.percent)), - ("avl", f"{mem_avail:.1f}G", BG_LABEL, _avail_bg(mem_avail, 2.0)), - ("free", f"{mem_free:.1f}G", BG_LABEL, _avail_bg(mem_free, 1.0)), - ("buf", f"{mem_buf:.1f}G", BG_BUF, BG_NEUT), - ("cache", f"{mem_cache:.1f}G", BG_BUF, BG_NEUT), - ("swap", f"{sm.used/gib:.1f}/{sm.total/gib:.1f}G", BG_SWAP_L, _swap_bg(sm.percent)), - ("load", f"{l1:.2f} {l5:.2f} {l15:.2f}", BG_LABEL, _load_bg(l1)), - ("sort", self.sort_key, BG_LABEL, BG_NEUT), - ("ref", f"{self.refresh:.1f}s", BG_LABEL, BG_NEUT), - ("filt", self.filter_text or "-", BG_LABEL, BG_NEUT), + ("", "ptop3", bg_title, bg_title), + ("mem", f"{mem_used:.1f}/{mem_total:.1f}G", bg_label, mem_bg(vm.percent)), + ("avl", f"{mem_avail:.1f}G", bg_label, avail_bg(mem_avail, 2.0)), + ("free", f"{mem_free:.1f}G", bg_label, avail_bg(mem_free, 1.0)), + ("buf", f"{mem_buf:.1f}G", bg_buf, bg_neut), + ("cache", f"{mem_cache:.1f}G", bg_buf, bg_neut), + ("swap", f"{sm.used/gib:.1f}/{sm.total/gib:.1f}G", bg_swap_l, swap_bg(sm.percent)), + ("load", f"{l1:.2f} {l5:.2f} {l15:.2f}", bg_label, load_bg(l1)), + ("sort", self.sort_key, bg_label, bg_neut), + ("ref", f"{self.refresh:.1f}s", bg_label, bg_neut), + ("filt", self.filter_text or "-", bg_label, bg_neut), ] col = 0 - for label, value, lattr, vattr in badges: + for label, value, label_attr, value_attr in badges: if col >= maxx - 1: break if label: - ltxt = f" {label} " - self.addstr(0, col, ltxt[: maxx - col - 1], lattr) - col += len(ltxt) - vtxt = f" {value} " + label_text = f" {label} " + self.addstr(0, col, label_text[: maxx - col - 1], label_attr) + col += len(label_text) + value_text = f" {value} " if col < maxx - 1: - self.addstr(0, col, vtxt[: maxx - col - 1], vattr) - col += len(vtxt) + self.addstr(0, col, value_text[: maxx - col - 1], value_attr) + col += len(value_text) if col < maxx - 1: - self.addstr(0, col, " ", BG_GAP) + self.addstr(0, col, " ", bg_gap) col += 1 if self.view == "groups": @@ -694,16 +790,18 @@ def _avail_bg(gib_val, thresh): alerts = self.collect_alerts() if alerts: alert_start_y = maxy - len(alerts) - 2 - for i, alert in enumerate(alerts): - y = alert_start_y + i - if y >= 2: - color = curses.color_pair(3) if "critical" in alert.lower() else curses.color_pair(1) - if "critical" in alert.lower(): - color |= curses.A_BOLD - self.addstr(y, 0, alert[: maxx - 1], color) - - if self.status_msg and (time.time() - self.status_time) < 8.0: - self.addstr(maxy - 2, 0, self.status_msg[: maxx - 1], curses.color_pair(2)) + for idx, alert in enumerate(alerts): + y = alert_start_y + idx + if y < 2: + continue + color = curses.color_pair(3) if "critical" in alert.lower() else curses.color_pair(1) + if "critical" in alert.lower(): + color |= curses.A_BOLD + self.addstr(y, 0, alert[: maxx - 1], color) + + status_line = self._status_line() + if status_line: + self.addstr(maxy - 2, 0, status_line[: maxx - 1], curses.color_pair(2)) help_line = ( "↑/↓ PgUp/PgDn Home/End Enter/l expand h back s sort f filter" @@ -712,6 +810,11 @@ def _avail_bg(gib_val, thresh): self.addstr(maxy - 1, 0, help_line[: maxx - 1], curses.color_pair(5)) self.stdscr.refresh() + def _status_line(self) -> str: + if self.status_msg and (time.time() - self.status_time) < 8.0: + return self.status_msg + return self.sample_error + def _make_bar(self, pct, width=10): pct = max(0.0, min(100.0, pct)) filled = int(round(pct / 100.0 * width)) @@ -719,82 +822,90 @@ def _make_bar(self, pct, width=10): def draw_groups(self, maxx, maxy): bar_w = 10 - hdr = ( + header = ( f"{'APP':{MAX_APP_NAME}} {'PROCS':>5} {'RSS(MiB)':>10} {'SWAP(MiB)':>10}" f" {'%MEM':>7} {'MEM':>{bar_w}} {'%CPU':>7} {'CPU':>{bar_w}}" f" {'IO_R':>7} {'IO_W':>7}" ) - self.addstr(2, 0, hdr[: maxx - 1], curses.A_BOLD) - available_rows = maxy - 14 - for i, g in enumerate(self.groups[:available_rows]): - y = 3 + i - mem_bar = self._make_bar(g.mem_pct, bar_w) - cpu_bar = self._make_bar(min(g.cpu, 100.0), bar_w) + self.addstr(2, 0, header[: maxx - 1], curses.A_BOLD) + + available_rows = self._content_rows(maxy) + visible_groups = self.groups[self.scroll:self.scroll + available_rows] + for offset, group in enumerate(visible_groups): + idx = self.scroll + offset + y = 3 + offset + mem_bar = self._make_bar(group.mem_pct, bar_w) + cpu_bar = self._make_bar(min(group.cpu, 100.0), bar_w) line = ( - f"{g.app[:MAX_APP_NAME]:{MAX_APP_NAME}} {g.procs:>5} {g.rss_mb:>10.1f} {g.swap_mb:>10.1f}" - f" {g.mem_pct:>7.1f} {mem_bar} {g.cpu:>7.1f} {cpu_bar}" - f" {g.io_read_mb:>7.1f} {g.io_write_mb:>7.1f}" + f"{group.app[:MAX_APP_NAME]:{MAX_APP_NAME}} {group.procs:>5} {group.rss_mb:>10.1f} {group.swap_mb:>10.1f}" + f" {group.mem_pct:>7.1f} {mem_bar} {group.cpu:>7.1f} {cpu_bar}" + f" {group.io_read_mb:>7.1f} {group.io_write_mb:>7.1f}" ) - color = self.color_group(g) - if i == self.sel: - color = self.sel_attr + color = self.sel_attr if idx == self.sel else self.color_group(group) self.addstr(y, 0, line[: maxx - 1], color) def draw_detail(self, maxx, maxy): mode_label = " [TREE]" if self.tree_mode else "" self.addstr(2, 0, f"App: {self.detail_app}{mode_label}", curses.A_BOLD) + available_rows = self._content_rows(maxy) + if self.tree_mode: - stats_w = 40 self.addstr( - 3, 0, + 3, + 0, f"{'PID':>7} {'RSS(MiB)':>9} {'%CPU':>6} {'%MEM':>6} {'SWAP':>6} TREE", curses.A_BOLD, ) - available_rows = maxy - 15 - for i, (r, depth, prefix) in enumerate(self.detail_tree[:available_rows]): - y = 4 + i - cmd_full = r.cmdline or r.name - tree_str = cmd_full if depth == 0 else prefix + cmd_full + visible_rows = self.detail_tree[self.scroll:self.scroll + available_rows] + for offset, (row, depth, prefix) in enumerate(visible_rows): + idx = self.scroll + offset + y = 4 + offset + stats_w = 40 remaining = maxx - stats_w - 1 + tree_str = row.cmdline or row.name + if depth > 0: + tree_str = prefix + tree_str tree_str = tree_str[: max(remaining, 10)] - line = f"{r.pid:>7} {r.rss_mb:>9.1f} {r.cpu:>6.1f} {r.mem_pct:>6.1f} {r.swap_mb:>6.1f} {tree_str}" - color = self.color_proc(r) - if i == self.sel: - color = self.sel_attr - self.addstr(y, 0, line[: maxx - 1], color) - else: - stats_w = 66 - self.addstr( - 3, 0, - f"{'PID':>7} {'PPID':>7} {'RSS(MiB)':>9} {'%CPU':>6} {'%MEM':>6}" - f" {'SWAP(MiB)':>9} {'IO_R':>7} {'IO_W':>7} CMD", - curses.A_BOLD, - ) - available_rows = maxy - 15 - for i, r in enumerate(self.detail_list[:available_rows]): - y = 4 + i - remaining = maxx - stats_w - 1 - cmd = (r.cmdline or r.name)[: max(remaining, 10)] line = ( - f"{r.pid:>7} {r.ppid:>7} {r.rss_mb:>9.1f} {r.cpu:>6.1f} {r.mem_pct:>6.1f}" - f" {r.swap_mb:>9.1f} {r.io_read_mb:>7.1f} {r.io_write_mb:>7.1f} {cmd}" + f"{row.pid:>7} {row.rss_mb:>9.1f} {row.cpu:>6.1f} " + f"{row.mem_pct:>6.1f} {row.swap_mb:>6.1f} {tree_str}" ) - color = self.color_proc(r) - if i == self.sel: - color = self.sel_attr + color = self.sel_attr if idx == self.sel else self.color_proc(row) self.addstr(y, 0, line[: maxx - 1], color) + return - def color_group(self, g: GroupRow): - if g.cpu >= CPU_HOT * 2 or g.rss_mb >= RSS_HOT_MB * 2 or g.swap_mb >= SWAP_HOT_MB * 2: + self.addstr( + 3, + 0, + f"{'PID':>7} {'PPID':>7} {'RSS(MiB)':>9} {'%CPU':>6} {'%MEM':>6}" + f" {'SWAP(MiB)':>9} {'IO_R':>7} {'IO_W':>7} CMD", + curses.A_BOLD, + ) + visible_rows = self.detail_list[self.scroll:self.scroll + available_rows] + for offset, row in enumerate(visible_rows): + idx = self.scroll + offset + y = 4 + offset + stats_w = 66 + remaining = maxx - stats_w - 1 + cmd = (row.cmdline or row.name)[: max(remaining, 10)] + line = ( + f"{row.pid:>7} {row.ppid:>7} {row.rss_mb:>9.1f} {row.cpu:>6.1f} {row.mem_pct:>6.1f}" + f" {row.swap_mb:>9.1f} {row.io_read_mb:>7.1f} {row.io_write_mb:>7.1f} {cmd}" + ) + color = self.sel_attr if idx == self.sel else self.color_proc(row) + self.addstr(y, 0, line[: maxx - 1], color) + + def color_group(self, group: GroupRow): + if group.cpu >= CPU_HOT * 2 or group.rss_mb >= RSS_HOT_MB * 2 or group.swap_mb >= SWAP_HOT_MB * 2: return curses.color_pair(4) | curses.A_BOLD - if g.cpu >= CPU_HOT or g.rss_mb >= RSS_HOT_MB or g.swap_mb >= SWAP_HOT_MB: + if group.cpu >= CPU_HOT or group.rss_mb >= RSS_HOT_MB or group.swap_mb >= SWAP_HOT_MB: return curses.color_pair(3) | curses.A_BOLD return curses.color_pair(1) - def color_proc(self, r: ProcRow): - if r.cpu >= CPU_HOT * 2 or r.rss_mb >= RSS_HOT_MB * 2 or r.swap_mb >= SWAP_HOT_MB * 2: + def color_proc(self, row: ProcRow): + if row.cpu >= CPU_HOT * 2 or row.rss_mb >= RSS_HOT_MB * 2 or row.swap_mb >= SWAP_HOT_MB * 2: return curses.color_pair(4) | curses.A_BOLD - if r.cpu >= CPU_HOT or r.rss_mb >= RSS_HOT_MB or r.swap_mb >= SWAP_HOT_MB: + if row.cpu >= CPU_HOT or row.rss_mb >= RSS_HOT_MB or row.swap_mb >= SWAP_HOT_MB: return curses.color_pair(3) | curses.A_BOLD return curses.color_pair(1) @@ -822,66 +933,62 @@ def collect_alerts(self): alerts.append(f"{now} SYSTEM {'':<10} {'':<30}: DISK CRITICAL ({disk_usage.percent:.1f}%)") elif disk_usage.percent > DISK_USAGE_HIGH: alerts.append(f"{now} SYSTEM {'':<10} {'':<30}: High disk usage ({disk_usage.percent:.1f}%)") - except Exception: + except OSError: pass - all_rows = self.last_proc_rows or [] - zombie_count = sum(1 for r in all_rows if r.status == "zombie") + zombie_count = sum(1 for row in self.last_proc_rows if row.status == "zombie") if zombie_count > 0: alerts.append(f"{now} SYSTEM {'':<10} {'':<30}: {zombie_count} zombie processes detected") - high_usage_rows = [r for r in all_rows if r.cpu >= 5.0 or r.mem_pct >= 5.0] - high_usage_rows = sorted(high_usage_rows, key=lambda x: x.cpu + x.mem_pct, reverse=True)[:20] - - for r in high_usage_rows: - cmd_short = (r.cmdline or r.name)[:30] - if r.cpu >= CPU_HOT * 2: - alerts.append(f"{now} {r.pid:>6} {r.app[:10]:<10} {cmd_short:<30}: CPU critical ({r.cpu:.1f}%)") - elif r.cpu >= CPU_HOT: - alerts.append(f"{now} {r.pid:>6} {r.app[:10]:<10} {cmd_short:<30}: High CPU ({r.cpu:.1f}%)") - if r.mem_pct >= 15.0: - alerts.append(f"{now} {r.pid:>6} {r.app[:10]:<10} {cmd_short:<30}: MEMORY CRITICAL ({r.mem_pct:.1f}%)") - elif r.mem_pct >= 10.0: - alerts.append(f"{now} {r.pid:>6} {r.app[:10]:<10} {cmd_short:<30}: High memory ({r.mem_pct:.1f}%)") - if r.swap_mb >= SWAP_HOT_MB * 3: - alerts.append(f"{now} {r.pid:>6} {r.app[:10]:<10} {cmd_short:<30}: SWAP CRITICAL ({r.swap_mb:.1f}MB)") - elif r.swap_mb >= SWAP_HOT_MB: - alerts.append(f"{now} {r.pid:>6} {r.app[:10]:<10} {cmd_short:<30}: High swap ({r.swap_mb:.1f}MB)") - if r.mem_pct >= 5.0 and r.swap_mb >= SWAP_HOT_MB: + hot_rows = [row for row in self.last_proc_rows if row.cpu >= 5.0 or row.mem_pct >= 5.0] + hot_rows = sorted(hot_rows, key=lambda row: row.cpu + row.mem_pct, reverse=True)[:20] + for row in hot_rows: + cmd_short = (row.cmdline or row.name)[:30] + if row.cpu >= CPU_HOT * 2: + alerts.append(f"{now} {row.pid:>6} {row.app[:10]:<10} {cmd_short:<30}: CPU critical ({row.cpu:.1f}%)") + elif row.cpu >= CPU_HOT: + alerts.append(f"{now} {row.pid:>6} {row.app[:10]:<10} {cmd_short:<30}: High CPU ({row.cpu:.1f}%)") + if row.mem_pct >= 15.0: + alerts.append(f"{now} {row.pid:>6} {row.app[:10]:<10} {cmd_short:<30}: MEMORY CRITICAL ({row.mem_pct:.1f}%)") + elif row.mem_pct >= 10.0: + alerts.append(f"{now} {row.pid:>6} {row.app[:10]:<10} {cmd_short:<30}: High memory ({row.mem_pct:.1f}%)") + if row.swap_mb >= SWAP_HOT_MB * 3: + alerts.append(f"{now} {row.pid:>6} {row.app[:10]:<10} {cmd_short:<30}: SWAP CRITICAL ({row.swap_mb:.1f}MB)") + elif row.swap_mb >= SWAP_HOT_MB: + alerts.append(f"{now} {row.pid:>6} {row.app[:10]:<10} {cmd_short:<30}: High swap ({row.swap_mb:.1f}MB)") + if row.mem_pct >= 5.0 and row.swap_mb >= SWAP_HOT_MB: alerts.append( - f"{now} {r.pid:>6} {r.app[:10]:<10} {cmd_short:<30}: Memory pressure" - f" ({r.mem_pct:.1f}% + {r.swap_mb:.1f}MB)" + f"{now} {row.pid:>6} {row.app[:10]:<10} {cmd_short:<30}: Memory pressure" + f" ({row.mem_pct:.1f}% + {row.swap_mb:.1f}MB)" ) - if r.io_read_mb > 100.0 or r.io_write_mb > 100.0: + if row.io_read_mb > 100.0 or row.io_write_mb > 100.0: alerts.append( - f"{now} {r.pid:>6} {r.app[:10]:<10} {cmd_short:<30}: High I/O" - f" ({r.io_read_mb:.1f}MB read, {r.io_write_mb:.1f}MB write)" + f"{now} {row.pid:>6} {row.app[:10]:<10} {cmd_short:<30}: High I/O" + f" ({row.io_read_mb:.1f}MB read, {row.io_write_mb:.1f}MB write)" ) - if r.status == "zombie": - alerts.append(f"{now} {r.pid:>6} {r.app[:10]:<10} {cmd_short:<30}: ZOMBIE PROCESS") + if row.status == "zombie": + alerts.append(f"{now} {row.pid:>6} {row.app[:10]:<10} {cmd_short:<30}: ZOMBIE PROCESS") - top_groups = sorted(self.groups, key=lambda x: x.cpu, reverse=True)[:10] - for g in top_groups: - if g.cpu >= CPU_HOT * 2: + for group in sorted(self.groups, key=lambda row: row.cpu, reverse=True)[:10]: + if group.cpu >= CPU_HOT * 2: alerts.append( - f"{now} group {g.app[:10]:<10} {g.procs:>3}procs{'':<26}: Group CPU critical ({g.cpu:.1f}%)" + f"{now} group {group.app[:10]:<10} {group.procs:>3}procs{'':<26}: Group CPU critical ({group.cpu:.1f}%)" ) - elif g.cpu >= CPU_HOT: + elif group.cpu >= CPU_HOT: alerts.append( - f"{now} group {g.app[:10]:<10} {g.procs:>3}procs{'':<26}: Group high CPU ({g.cpu:.1f}%)" + f"{now} group {group.app[:10]:<10} {group.procs:>3}procs{'':<26}: Group high CPU ({group.cpu:.1f}%)" ) - if g.swap_mb >= SWAP_HOT_MB * 4: + if group.swap_mb >= SWAP_HOT_MB * 4: alerts.append( - f"{now} group {g.app[:10]:<10} {g.procs:>3}procs{'':<26}: Group swap critical ({g.swap_mb:.1f}MB)" + f"{now} group {group.app[:10]:<10} {group.procs:>3}procs{'':<26}: Group swap critical ({group.swap_mb:.1f}MB)" ) - result = alerts[-10:] - self.alerts_cache = result + self.alerts_cache = alerts[-10:] self.alerts_cache_time = current_time - return result - except Exception as e: + return self.alerts_cache + except Exception as exc: now = time.strftime("%H:%M:%S") - error_alert = [f"{now} ERROR: Alert collection failed: {str(e)[:40]}"] + error_alert = [f"{now} ERROR: Alert collection failed: {str(exc)[:40]}"] self.alerts_cache = error_alert self.alerts_cache_time = current_time return error_alert @@ -911,38 +1018,37 @@ def prompt_filter(self): self.status("Filter cleared") def kill_selected(self, sig): - if not self.detail_list: + row = self._selected_proc() + if row is None: return - pid = self.detail_list[self.sel].pid try: - psutil.Process(pid).send_signal(sig) - name = self.detail_list[self.sel].name - self.status(f"Sent {signal.Signals(sig).name} to {pid} ({name})") - except (psutil.NoSuchProcess, psutil.AccessDenied) as e: - self.status(str(e)) + psutil.Process(row.pid).send_signal(sig) + self.status(f"Sent {signal.Signals(sig).name} to {row.pid} ({row.name})") + except (psutil.NoSuchProcess, psutil.AccessDenied) as exc: + self.status(str(exc)) def kill_group(self, sig=signal.SIGTERM): - if not self.groups: + group = self._selected_group() + if group is None: return - app = self.groups[self.sel].app sig_name = signal.Signals(sig).name - n = 0 + count = 0 denied = 0 - for p in psutil.process_iter(): + for proc in psutil.process_iter(): try: - name = p.name() or "" + name = proc.name() or "" try: - cmd = " ".join(p.cmdline()[:4]) + cmd = " ".join(proc.cmdline()[:4]) except (psutil.AccessDenied, psutil.ZombieProcess): cmd = "" - if normalize_app_name(name, cmd) == app: - p.send_signal(sig) - n += 1 + if normalize_app_name(name, cmd) == group.app: + proc.send_signal(sig) + count += 1 except psutil.AccessDenied: denied += 1 except (psutil.NoSuchProcess, psutil.ZombieProcess): - pass - msg = f"Sent {sig_name} to '{app}' ({n} procs)" + continue + msg = f"Sent {sig_name} to '{group.app}' ({count} procs)" if denied: msg += f" [{denied} denied]" self.status(msg) @@ -959,17 +1065,18 @@ def run_swap_clean(self): self.draw() try: proc = subprocess.run(cmd, capture_output=True, text=True) - if proc.returncode == 0: - self.status("swap-clean finished") - else: - msg = (proc.stderr or proc.stdout or "").strip().splitlines()[-1:] - tail = f": {msg[0]}" if msg else "" - if use_sudo and "password" in (proc.stderr or "").lower(): - self.status("swap-clean needs NOPASSWD sudo — run: sudo ptop3 --init-subscripts") - else: - self.status(f"swap-clean failed ({proc.returncode}){tail}") - except Exception as e: - self.status(f"swap-clean error: {str(e)[:60]}") + except Exception as exc: + self.status(f"swap-clean error: {str(exc)[:60]}") + return + if proc.returncode == 0: + self.status("swap-clean finished") + return + msg = (proc.stderr or proc.stdout or "").strip().splitlines()[-1:] + tail = f": {msg[0]}" if msg else "" + if use_sudo and "password" in (proc.stderr or "").lower(): + self.status("swap-clean needs NOPASSWD sudo — run: sudo ptop3 --init-subscripts") + else: + self.status(f"swap-clean failed ({proc.returncode}){tail}") def run_drop_caches(self): cmd = _subscript_cmd("ptop3-drop-caches") @@ -983,19 +1090,20 @@ def run_drop_caches(self): self.draw() try: proc = subprocess.run(cmd, capture_output=True, text=True) - if proc.returncode == 0: - msg = (proc.stdout or "").strip().splitlines()[-1:] - tail = msg[0] if msg else "done" - self.status(f"drop-caches: {tail}") - else: - msg = (proc.stderr or proc.stdout or "").strip().splitlines()[-1:] - tail = f": {msg[0]}" if msg else "" - if use_sudo and "password" in (proc.stderr or "").lower(): - self.status("drop-caches needs NOPASSWD sudo — run: sudo ptop3 --init-subscripts") - else: - self.status(f"drop-caches failed ({proc.returncode}){tail}") - except Exception as e: - self.status(f"drop-caches error: {str(e)[:60]}") + except Exception as exc: + self.status(f"drop-caches error: {str(exc)[:60]}") + return + if proc.returncode == 0: + msg = (proc.stdout or "").strip().splitlines()[-1:] + tail = msg[0] if msg else "done" + self.status(f"drop-caches: {tail}") + return + msg = (proc.stderr or proc.stdout or "").strip().splitlines()[-1:] + tail = f": {msg[0]}" if msg else "" + if use_sudo and "password" in (proc.stderr or "").lower(): + self.status("drop-caches needs NOPASSWD sudo — run: sudo ptop3 --init-subscripts") + else: + self.status(f"drop-caches failed ({proc.returncode}){tail}") def addstr(self, y, x, text, attrs=0): maxy, maxx = self.stdscr.getmaxyx() @@ -1004,7 +1112,7 @@ def addstr(self, y, x, text, attrs=0): def clrtoeol(self): y, x = self.stdscr.getyx() - maxy, maxx = self.stdscr.getmaxyx() + _maxy, maxx = self.stdscr.getmaxyx() self.stdscr.addnstr(y, x, " " * (maxx - x - 1), maxx - x - 1) def status(self, msg): @@ -1023,39 +1131,39 @@ def parse_args(): ap.add_argument("--check-sudo", action="store_true", help="check sudo config for subscripts") ap.add_argument("--init-subscripts", action="store_true", help="write /etc/sudoers.d/ptop3 for subscripts") args = ap.parse_args() - fil = None + filter_re = None if args.filter: try: - fil = re.compile(args.filter, re.IGNORECASE) + filter_re = re.compile(args.filter, re.IGNORECASE) except re.error: print("Invalid regex for --filter; ignoring.", file=sys.stderr) - return args, fil + return args, filter_re def main(): - args, fil = parse_args() + args, filter_re = parse_args() global LITE_MODE - if args.lite: - LITE_MODE = True + LITE_MODE = bool(args.lite) if args.check_sudo: from ptop3.sudo_config import check_sudo + result = check_sudo() for script, status in result.items(): print(f" {script}: {status}") - all_ok = all(v == "ok" for v in result.values()) - sys.exit(0 if all_ok else 1) + sys.exit(0 if all(value == "ok" for value in result.values()) else 1) if args.init_subscripts: from ptop3.sudo_config import init_subscripts + init_subscripts() return if args.once: - print_once(fil, args.sort, args.top) + print_once(filter_re, args.sort, args.top) return - curses.wrapper(lambda stdscr: TUI(stdscr, fil, args.sort, args.refresh).run()) + curses.wrapper(lambda stdscr: TUI(stdscr, filter_re, args.sort, args.refresh).run()) if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index 8cfb4a1..71be716 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,12 @@ dependencies = ["psutil>=5.9"] license = {text = "MIT"} readme = "README.md" +[project.urls] +Homepage = "https://github.com/spazyCZ/ptop3" +Repository = "https://github.com/spazyCZ/ptop3" +Changelog = "https://github.com/spazyCZ/ptop3/blob/main/CHANGELOG.md" +"Bug Tracker" = "https://github.com/spazyCZ/ptop3/issues" + [project.scripts] ptop3 = "ptop3.monitor:main" ptop3-drop-caches = "ptop3.scripts.drop_caches:main" @@ -27,6 +33,13 @@ include = ["ptop3*"] testpaths = ["tests"] addopts = "-v --tb=short" +[tool.coverage.run] +source = ["ptop3"] + +[tool.coverage.report] +include = ["ptop3/*"] +show_missing = true + [tool.ruff] line-length = 100 target-version = "py310" diff --git a/tests/conftest.py b/tests/conftest.py index 0c74ba2..6555959 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,16 @@ """Shared fixtures for ptop3 tests.""" -import io +import sys +from pathlib import Path + import pytest +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +for module_name in list(sys.modules): + if module_name == "ptop3" or module_name.startswith("ptop3."): + sys.modules.pop(module_name, None) MEMINFO_TEMPLATE = """\ MemTotal: 32768000 kB diff --git a/tests/test_drop_caches.py b/tests/test_drop_caches.py index 5adc4a1..193c48c 100644 --- a/tests/test_drop_caches.py +++ b/tests/test_drop_caches.py @@ -1,8 +1,9 @@ """Tests for ptop3.scripts.drop_caches.""" +from unittest.mock import MagicMock, patch + import pytest -from unittest.mock import patch, call, mock_open, MagicMock -from ptop3.scripts.drop_caches import read_mem_available, drop_caches +from ptop3.scripts.drop_caches import drop_caches, read_mem_available # --------------------------------------------------------------------------- # read_mem_available @@ -45,7 +46,7 @@ def test_drop_caches_writes_correct_level(tmp_path, tmp_meminfo): with patch("ptop3.scripts.drop_caches.subprocess.run") as mock_run: mock_run.return_value = MagicMock(returncode=0) - freed = drop_caches( + drop_caches( level=2, dry_run=False, meminfo_path=tmp_meminfo, @@ -96,3 +97,17 @@ def test_main_passes_when_root(tmp_path, tmp_meminfo): ) as mock_dc: main() mock_dc.assert_called_once() + + +def test_main_exits_on_oserror(capsys): + """main() should surface OSError from drop_caches().""" + from ptop3.scripts.drop_caches import main + + with patch("os.geteuid", return_value=0): + with patch("sys.argv", ["ptop3-drop-caches"]): + with patch("ptop3.scripts.drop_caches.drop_caches", side_effect=OSError("boom")): + with pytest.raises(SystemExit) as exc_info: + main() + + assert exc_info.value.code == 1 + assert "error: boom" in capsys.readouterr().err diff --git a/tests/test_monitor.py b/tests/test_monitor.py index e35119a..c239d47 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -1,15 +1,12 @@ -"""Tests for ptop3.monitor data layer functions.""" -import re +"""Tests for ptop3.monitor.""" +import importlib +from types import SimpleNamespace +from unittest.mock import MagicMock + import pytest -from ptop3.monitor import ( - ProcRow, - GroupRow, - normalize_app_name, - aggregate, - build_process_tree, - SORT_KEYS, -) +import ptop3.monitor as monitor +from ptop3.monitor import SORT_KEYS, ProcRow, aggregate, build_process_tree, normalize_app_name # --------------------------------------------------------------------------- # normalize_app_name @@ -56,7 +53,7 @@ def _make_row(**kwargs) -> ProcRow: defaults = dict( pid=1, ppid=0, name="app", rss_mb=100.0, cpu=10.0, mem_pct=5.0, swap_mb=0.0, cmdline="", app="myapp", - io_read_mb=0.0, io_write_mb=0.0, net_sent_mb=0.0, net_recv_mb=0.0, + io_read_mb=0.0, io_write_mb=0.0, status="running", ) defaults.update(kwargs) @@ -223,3 +220,985 @@ def test_parse_args_init_subscripts(monkeypatch): from ptop3.monitor import parse_args args, _ = parse_args() assert args.init_subscripts is True + + +def test_parse_args_invalid_filter_prints_warning(monkeypatch, capsys): + monkeypatch.setattr("sys.argv", ["ptop3", "--filter", "["]) + args, fil = monitor.parse_args() + assert args.filter == "[" + assert fil is None + assert "Invalid regex for --filter; ignoring." in capsys.readouterr().err + + +def test_find_subscript_uses_path_lookup(monkeypatch): + monkeypatch.setattr(monitor.shutil, "which", lambda name: f"/usr/bin/{name}") + assert monitor._find_subscript("ptop3-drop-caches") == "/usr/bin/ptop3-drop-caches" + + +def test_find_subscript_uses_dev_fallback(monkeypatch, tmp_path): + monkeypatch.setattr(monitor.shutil, "which", lambda name: None) + scripts_dir = tmp_path / "scripts" + scripts_dir.mkdir() + fake_file = scripts_dir / "drop_caches.py" + fake_file.write_text("print('ok')\n") + + monkeypatch.setattr(monitor, "__file__", str(tmp_path / "monitor.py")) + + assert monitor._find_subscript("ptop3-drop-caches") == str(fake_file) + + +def test_subscript_cmd_uses_python_for_dev_script(monkeypatch, tmp_path): + monkeypatch.setattr(monitor.shutil, "which", lambda name: None) + scripts_dir = tmp_path / "scripts" + scripts_dir.mkdir() + fake_file = scripts_dir / "swap_clean.py" + fake_file.write_text("print('ok')\n") + monkeypatch.setattr(monitor, "__file__", str(tmp_path / "monitor.py")) + + assert monitor._subscript_cmd("ptop3-swap-clean") == [monitor.sys.executable, str(fake_file)] + + +def test_read_vmswap_mb_reads_and_caches(monkeypatch): + monitor.SWAP_CACHE.clear() + now = {"value": 100.0} + + def fake_time(): + return now["value"] + + file_obj = MagicMock() + file_obj.__enter__.return_value = ["Name:\tproc\n", "VmSwap:\t2048 kB\n"] + file_obj.__exit__.return_value = False + opener = MagicMock(return_value=file_obj) + + monkeypatch.setattr(monitor.time, "time", fake_time) + monkeypatch.setattr("builtins.open", opener) + + assert monitor.read_vmswap_mb(123) == pytest.approx(2.0) + assert monitor.read_vmswap_mb(123) == pytest.approx(2.0) + assert opener.call_count == 1 + + +def test_read_vmswap_mb_missing_file_returns_zero(monkeypatch): + monitor.SWAP_CACHE.clear() + monkeypatch.setattr(monitor.time, "time", lambda: 50.0) + + def raising_open(*args, **kwargs): + raise OSError("missing") + + monkeypatch.setattr("builtins.open", raising_open) + assert monitor.read_vmswap_mb(999) == 0.0 + + +class _FakeProc: + def __init__( + self, + pid, + ppid, + name, + cmdline, + rss_mb, + cpu, + status="running", + io_read_mb=0.0, + io_write_mb=0.0, + memory_denied=False, + ): + self.info = {"pid": pid, "ppid": ppid, "name": name} + self.pid = pid + self._cmdline = cmdline + self._rss_bytes = int(rss_mb * 1024 * 1024) + self._cpu = cpu + self._status = status + self._io_read_bytes = int(io_read_mb * 1024 * 1024) + self._io_write_bytes = int(io_write_mb * 1024 * 1024) + self._memory_denied = memory_denied + + def oneshot(self): + return self + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def cmdline(self): + return self._cmdline + + def memory_info(self): + if self._memory_denied: + raise monitor.psutil.AccessDenied(pid=self.info["pid"]) + return SimpleNamespace(rss=self._rss_bytes) + + def cpu_percent(self): + return self._cpu + + def io_counters(self): + return SimpleNamespace(read_bytes=self._io_read_bytes, write_bytes=self._io_write_bytes) + + def status(self): + return self._status + + +def test_get_proc_rows_filters_and_collects_metrics(monkeypatch): + monitor.PID_CACHE.clear() + monitor.SWAP_CACHE.clear() + monitor.LITE_MODE = False + + procs = [ + _FakeProc(10, 1, "python3", ["python3", "server.py"], rss_mb=64, cpu=12.5, status="sleeping", io_read_mb=5.0, io_write_mb=3.0), + _FakeProc(11, 1, "bash", ["bash"], rss_mb=1, cpu=1.0, memory_denied=True), + ] + + monkeypatch.setattr(monitor.psutil, "process_iter", lambda attrs=None: procs) + monkeypatch.setattr(monitor.psutil, "virtual_memory", lambda: SimpleNamespace(total=1024 * 1024 * 1024)) + monkeypatch.setattr(monitor, "read_vmswap_mb", lambda pid: 4.0 if pid == 10 else 0.0) + + rows = monitor.get_proc_rows(filter_re=monitor.re.compile("python")) + + assert len(rows) == 1 + row = rows[0] + assert row.pid == 10 + assert row.app == "python" + assert row.cpu == pytest.approx(12.5) + assert row.swap_mb == pytest.approx(4.0) + assert row.io_read_mb == pytest.approx(5.0) + assert row.io_write_mb == pytest.approx(3.0) + assert row.status == "sleeping" + + +def test_print_once_sorts_by_io(monkeypatch, capsys): + rows = [ + _make_row(pid=1, app="foo", io_read_mb=1.0, io_write_mb=2.0), + _make_row(pid=2, app="bar", io_read_mb=10.0, io_write_mb=0.0), + ] + monkeypatch.setattr(monitor, "sample_processes", lambda filter_re: monitor.SampleResult(rows)) + + monitor.print_once(None, "io", 2) + + out = capsys.readouterr().out.strip().splitlines() + assert out[0].startswith("APP") + assert out[1].startswith("bar") + assert out[2].startswith("foo") + + +@pytest.mark.parametrize( + ("value", "lo", "hi", "expected"), + [ + (-1, 0, 5, 0), + (10, 0, 5, 5), + (3, 0, 5, 3), + ], +) +def test_clamp(value, lo, hi, expected): + assert monitor.clamp(value, lo, hi) == expected + + +@pytest.mark.parametrize( + ("sort_key", "attr"), + [ + ("mem", "mem_pct"), + ("cpu", "cpu"), + ("rss", "rss_mb"), + ("swap", "swap_mb"), + ], +) +def test_tree_sort_key(sort_key, attr): + row = _make_row(**{attr: 9.0}) + assert monitor._tree_sort_key(sort_key)(row) == 9.0 + + +def test_tree_sort_key_io_and_default(): + row = _make_row(io_read_mb=2.0, io_write_mb=3.0, rss_mb=7.0) + assert monitor._tree_sort_key("io")(row) == 5.0 + assert monitor._tree_sort_key("count")(row) == 7.0 + + +class _FakeScreen: + def __init__(self, keys=None, text=b""): + self.calls = [] + self.keys = list(keys or []) + self.text = text + self.timeout_value = None + self.keypad_value = None + self.nodelay_calls = [] + self.erased = 0 + self.refreshed = 0 + + def getmaxyx(self): + return (24, 120) + + def addnstr(self, y, x, text, width, attrs=0): + self.calls.append((y, x, text, width, attrs)) + + def getyx(self): + return (2, 3) + + def keypad(self, value): + self.keypad_value = value + + def timeout(self, value): + self.timeout_value = value + + def getch(self): + if self.keys: + return self.keys.pop(0) + return -1 + + def nodelay(self, value): + self.nodelay_calls.append(value) + + def erase(self): + self.erased += 1 + + def refresh(self): + self.refreshed += 1 + + def getstr(self, y, x): + return self.text + + +def _patch_fake_curses(monkeypatch, has_colors=True, curs_set_error=False, pair_error=False): + monkeypatch.setattr(monitor.curses, "noecho", lambda: None) + monkeypatch.setattr(monitor.curses, "cbreak", lambda: None) + monkeypatch.setattr(monitor.curses, "start_color", lambda: None) + monkeypatch.setattr(monitor.curses, "use_default_colors", lambda: None) + monkeypatch.setattr(monitor.curses, "has_colors", lambda: has_colors) + + pair_failed = {"value": False} + + def fake_curs_set(value): + if curs_set_error: + raise monitor.curses.error("no cursor") + + def fake_init_pair(pair, fg, bg): + if pair_error and pair in (20, 21) and not pair_failed["value"]: + pair_failed["value"] = True + raise ValueError("fallback") + + monkeypatch.setattr(monitor.curses, "curs_set", fake_curs_set) + monkeypatch.setattr(monitor.curses, "init_pair", fake_init_pair) + monkeypatch.setattr(monitor.curses, "color_pair", lambda value: value * 100) + + +def _make_initialized_tui(monkeypatch, keys=None, text=b"", has_colors=True, curs_set_error=False, pair_error=False): + _patch_fake_curses(monkeypatch, has_colors=has_colors, curs_set_error=curs_set_error, pair_error=pair_error) + screen = _FakeScreen(keys=keys, text=text) + tui = monitor.TUI(screen, None, "mem", 2.0) + return tui, screen + + +def test_tui_length_and_toggle_view(): + tui = object.__new__(monitor.TUI) + tui.view = "groups" + tui.groups = [SimpleNamespace(app="python")] + tui.detail_tree = [("tree",)] + tui.detail_list = [_make_row()] + tui.tree_mode = False + tui.sel = 0 + + assert tui.length() == 1 + tui.toggle_view() + assert tui.view == "detail" + assert tui.detail_app == "python" + tui.tree_mode = True + assert tui.length() == 1 + tui.toggle_view() + assert tui.view == "groups" + assert tui.detail_app is None + assert tui.tree_mode is False + + +def test_tui_init_sets_up_screen(monkeypatch): + tui, screen = _make_initialized_tui(monkeypatch, has_colors=True, curs_set_error=True, pair_error=True) + + assert screen.keypad_value is True + assert screen.timeout_value == 2000 + assert tui.sel_attr == monitor.curses.color_pair(7) | monitor.curses.A_BOLD + + +def test_tui_init_without_colors_uses_reverse(monkeypatch): + tui, _screen = _make_initialized_tui(monkeypatch, has_colors=False) + + assert tui.sel_attr == monitor.curses.A_REVERSE + + +def test_tui_addstr_and_clrtoeol(): + tui = object.__new__(monitor.TUI) + tui.stdscr = _FakeScreen() + + tui.addstr(1, 1, "hello", 7) + tui.addstr(99, 1, "skip", 0) + tui.clrtoeol() + + assert tui.stdscr.calls[0][:3] == (1, 1, "hello") + assert tui.stdscr.calls[1][0:2] == (2, 3) + + +def test_tui_status_sets_timestamp(monkeypatch): + tui = object.__new__(monitor.TUI) + monkeypatch.setattr(monitor.time, "time", lambda: 123.0) + tui.status("ready") + assert tui.status_msg == "ready" + assert tui.status_time == 123.0 + + +def test_tui_content_and_selected_helpers(): + tui = object.__new__(monitor.TUI) + tui.scroll = 3 + tui.sel = 0 + tui.view = "groups" + tui.groups = [_make_row(app="one"), _make_row(app="two")] + tui.tree_mode = False + tui.detail_list = [] + tui.detail_tree = [] + + assert tui._content_rows(20) == 5 + tui._reset_scroll() + assert tui.scroll == 0 + assert tui._selected_group().app == "one" + assert tui._selected_proc() is None + + +def test_tui_sync_scroll(monkeypatch): + tui = object.__new__(monitor.TUI) + tui.sel = 6 + tui.scroll = 0 + tui.view = "groups" + tui.groups = [_make_row(app=str(i)) for i in range(10)] + tui.tree_mode = False + tui.detail_list = [] + tui.detail_tree = [] + + tui._sync_scroll(20) + + assert tui.scroll == 2 + + +def test_tui_kill_selected(monkeypatch): + signals = [] + proc = SimpleNamespace(send_signal=lambda sig: signals.append(sig)) + monkeypatch.setattr(monitor.psutil, "Process", lambda pid: proc) + + tui = object.__new__(monitor.TUI) + tui.view = "detail" + tui.tree_mode = False + tui.detail_list = [_make_row(pid=42, name="worker")] + tui.sel = 0 + tui.status = lambda msg: setattr(tui, "last_status", msg) + + tui.kill_selected(monitor.signal.SIGTERM) + + assert signals == [monitor.signal.SIGTERM] + assert "42" in tui.last_status + + +def test_tui_kill_selected_uses_tree_selection(monkeypatch): + signals = [] + proc = SimpleNamespace(send_signal=lambda sig: signals.append(sig)) + monkeypatch.setattr(monitor.psutil, "Process", lambda pid: proc) + + tui = object.__new__(monitor.TUI) + tui.view = "detail" + tui.tree_mode = True + tui.detail_list = [_make_row(pid=1, name="wrong")] + tui.detail_tree = [(_make_row(pid=99, name="right"), 0, "")] + tui.sel = 0 + tui.status = lambda msg: setattr(tui, "last_status", msg) + + tui.kill_selected(monitor.signal.SIGTERM) + + assert signals == [monitor.signal.SIGTERM] + assert "99" in tui.last_status + + +def test_tui_kill_group(monkeypatch): + killed = [] + + class FakeProcess: + def __init__(self, name, cmd): + self._name = name + self._cmd = cmd + + def name(self): + return self._name + + def cmdline(self): + return self._cmd + + def send_signal(self, sig): + killed.append(sig) + + monkeypatch.setattr( + monitor.psutil, + "process_iter", + lambda: [FakeProcess("python3", ["python3"]), FakeProcess("bash", ["bash"])], + ) + + tui = object.__new__(monitor.TUI) + tui.groups = [SimpleNamespace(app="python")] + tui.sel = 0 + tui.status = lambda msg: setattr(tui, "last_status", msg) + + tui.kill_group() + + assert killed == [monitor.signal.SIGTERM] + assert "python" in tui.last_status + + +def test_tui_kill_group_denied(monkeypatch): + class FakeProcess: + def name(self): + raise monitor.psutil.AccessDenied(pid=1) + + monkeypatch.setattr(monitor.psutil, "process_iter", lambda: [FakeProcess()]) + + tui = object.__new__(monitor.TUI) + tui.groups = [SimpleNamespace(app="python")] + tui.sel = 0 + tui.status = lambda msg: setattr(tui, "last_status", msg) + + tui.kill_group() + + assert "denied" in tui.last_status + + +def test_tui_run_swap_clean_handles_password_error(monkeypatch): + monkeypatch.setattr(monitor, "_subscript_cmd", lambda name: ["ptop3-swap-clean"]) + monkeypatch.setattr(monitor.os, "geteuid", lambda: 1000) + monkeypatch.setattr( + monitor.subprocess, + "run", + lambda *args, **kwargs: SimpleNamespace(returncode=1, stderr="sudo: a password is required", stdout=""), + ) + + tui = object.__new__(monitor.TUI) + msgs = [] + tui.status = lambda msg: msgs.append(msg) + tui.draw = lambda: None + + tui.run_swap_clean() + + assert msgs[-1].startswith("swap-clean needs NOPASSWD sudo") + + +def test_tui_run_swap_clean_not_found(monkeypatch): + monkeypatch.setattr(monitor, "_subscript_cmd", lambda name: None) + tui = object.__new__(monitor.TUI) + msgs = [] + tui.status = lambda msg: msgs.append(msg) + + tui.run_swap_clean() + + assert msgs[-1].startswith("ptop3-swap-clean not found") + + +def test_tui_run_swap_clean_exception(monkeypatch): + monkeypatch.setattr(monitor, "_subscript_cmd", lambda name: ["ptop3-swap-clean"]) + monkeypatch.setattr(monitor.os, "geteuid", lambda: 0) + + def raise_run(*args, **kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr(monitor.subprocess, "run", raise_run) + tui = object.__new__(monitor.TUI) + msgs = [] + tui.status = lambda msg: msgs.append(msg) + tui.draw = lambda: None + + tui.run_swap_clean() + + assert msgs[-1].startswith("swap-clean error:") + + +def test_tui_run_drop_caches_success(monkeypatch): + monkeypatch.setattr(monitor, "_subscript_cmd", lambda name: ["ptop3-drop-caches"]) + monkeypatch.setattr(monitor.os, "geteuid", lambda: 0) + monkeypatch.setattr( + monitor.subprocess, + "run", + lambda *args, **kwargs: SimpleNamespace(returncode=0, stderr="", stdout="Dropped caches (level 3), freed 0 MB.\n"), + ) + + tui = object.__new__(monitor.TUI) + msgs = [] + tui.status = lambda msg: msgs.append(msg) + tui.draw = lambda: None + + tui.run_drop_caches() + + assert msgs[-1] == "drop-caches: Dropped caches (level 3), freed 0 MB." + + +def test_tui_run_drop_caches_not_found(monkeypatch): + monkeypatch.setattr(monitor, "_subscript_cmd", lambda name: None) + tui = object.__new__(monitor.TUI) + msgs = [] + tui.status = lambda msg: msgs.append(msg) + + tui.run_drop_caches() + + assert msgs[-1].startswith("ptop3-drop-caches not found") + + +def test_tui_run_drop_caches_generic_failure(monkeypatch): + monkeypatch.setattr(monitor, "_subscript_cmd", lambda name: ["ptop3-drop-caches"]) + monkeypatch.setattr(monitor.os, "geteuid", lambda: 0) + monkeypatch.setattr( + monitor.subprocess, + "run", + lambda *args, **kwargs: SimpleNamespace(returncode=2, stderr="failed badly", stdout=""), + ) + + tui = object.__new__(monitor.TUI) + msgs = [] + tui.status = lambda msg: msgs.append(msg) + tui.draw = lambda: None + + tui.run_drop_caches() + + assert msgs[-1].startswith("drop-caches failed (2)") + + +def test_visible_window_start(): + assert monitor.visible_window_start(selected=0, current_start=0, window_size=5, total=10) == 0 + assert monitor.visible_window_start(selected=6, current_start=0, window_size=5, total=10) == 2 + assert monitor.visible_window_start(selected=9, current_start=2, window_size=5, total=10) == 5 + + +def test_visible_window_start_small_window(): + assert monitor.visible_window_start(selected=3, current_start=2, window_size=0, total=10) == 0 + assert monitor.visible_window_start(selected=2, current_start=2, window_size=5, total=3) == 0 + + +def test_sort_helpers(): + groups = monitor.sort_groups([ + monitor.GroupRow(app="a", procs=1, rss_mb=1, mem_pct=2, cpu=3, swap_mb=4, io_read_mb=0, io_write_mb=1), + monitor.GroupRow(app="b", procs=5, rss_mb=10, mem_pct=1, cpu=1, swap_mb=0, io_read_mb=0, io_write_mb=0), + ], "count") + procs = monitor.sort_processes([ + _make_row(pid=1, rss_mb=1.0), + _make_row(pid=2, rss_mb=5.0), + ], "rss") + + assert groups[0].app == "b" + assert procs[0].pid == 2 + + +def test_helper_functions(monkeypatch): + proc = _FakeProc(1, 0, "python3", ["python3"], rss_mb=64, cpu=1.0, io_read_mb=2.0, io_write_mb=3.0) + + assert monitor._matches_filter(monitor.re.compile("py").search, "python", "python3", "python3") is True + assert monitor._safe_cmdline(proc) == "python3" + assert monitor._io_values(proc, 64, False) == (2.0, 3.0) + assert monitor._io_values(proc, 1, False) == (0.0, 0.0) + assert monitor._swap_value(1, 1, False) == 0.0 + + monkeypatch.setattr(monitor, "read_vmswap_mb", lambda pid: 9.0) + assert monitor._swap_value(1, 64, False) == 9.0 + + +def test_safe_cmdline_handles_access_denied(): + class DeniedProc: + def cmdline(self): + raise monitor.psutil.AccessDenied(pid=1) + + assert monitor._safe_cmdline(DeniedProc()) == "" + + +def test_color_helpers(monkeypatch): + _patch_fake_curses(monkeypatch) + tui = object.__new__(monitor.TUI) + + assert tui.color_group(monitor.GroupRow(app="x", procs=1, rss_mb=2000, mem_pct=1, cpu=1, swap_mb=1)) == monitor.curses.color_pair(4) | monitor.curses.A_BOLD + assert tui.color_group(monitor.GroupRow(app="x", procs=1, rss_mb=900, mem_pct=1, cpu=1, swap_mb=1)) == monitor.curses.color_pair(3) | monitor.curses.A_BOLD + assert tui.color_proc(_make_row(cpu=250)) == monitor.curses.color_pair(4) | monitor.curses.A_BOLD + assert tui.color_proc(_make_row(cpu=120)) == monitor.curses.color_pair(3) | monitor.curses.A_BOLD + + +def test_sampler_cleanup_and_failure_paths(monkeypatch): + sampler = monitor.ProcessSampler(pid_cache_ttl=1.0, swap_cache_ttl=1.0) + sampler.pid_cache = {1: (0.0, "a", "", "a")} + sampler.swap_cache = {1: (0.0, 1.0)} + sampler._cleanup_cache(20.0) + + assert sampler.pid_cache == {} + assert sampler.swap_cache == {} + + monkeypatch.setattr(monitor.psutil, "virtual_memory", lambda: SimpleNamespace(total=1024)) + monkeypatch.setattr(monitor.psutil, "process_iter", lambda attrs=None: (_ for _ in ()).throw(RuntimeError("no iter"))) + + sample = sampler.sample(None) + + assert sample.rows == [] + assert sample.error.startswith("sampling failed:") + + +def test_sampler_sample_process_cached_and_status_unknown(monkeypatch): + sampler = monitor.ProcessSampler() + proc = _FakeProc(1, 0, "python3", ["python3"], rss_mb=4, cpu=3.0) + sampler.pid_cache[1] = (10.0, "python3", "", "python") + + def raising_status(): + raise monitor.psutil.AccessDenied(pid=1) + + proc.status = raising_status + row = sampler._sample_process(proc, 10.1, 1.0, False, monitor.re.compile("python").search) + + assert row is not None + assert row.status == "unknown" + + +def test_sampler_sample_process_filtered_out_and_memory_denied(monkeypatch): + sampler = monitor.ProcessSampler() + proc = _FakeProc(1, 0, "bash", ["bash"], rss_mb=4, cpu=1.0) + assert sampler._sample_process(proc, 1.0, 1.0, False, monitor.re.compile("python").search) is None + + denied = _FakeProc(2, 0, "python3", ["python3"], rss_mb=4, cpu=1.0, memory_denied=True) + assert sampler._sample_process(denied, 1.0, 1.0, False, None) is None + + +def test_sampler_read_vmswap_without_vmswap_line(monkeypatch): + sampler = monitor.ProcessSampler() + monkeypatch.setattr(monitor.time, "time", lambda: 1.0) + file_obj = MagicMock() + file_obj.__enter__.return_value = ["Name:\tproc\n"] + file_obj.__exit__.return_value = False + monkeypatch.setattr("builtins.open", lambda *args, **kwargs: file_obj) + + assert sampler.read_vmswap_mb(1) == 0.0 + + +def test_tui_sample_updates_detail_views(monkeypatch): + rows = [ + _make_row(pid=1, app="python", cpu=10.0, rss_mb=10.0), + _make_row(pid=2, app="python", cpu=5.0, rss_mb=20.0, ppid=1), + _make_row(pid=3, app="bash", cpu=1.0, rss_mb=1.0), + ] + monkeypatch.setattr(monitor, "sample_processes", lambda filter_re: monitor.SampleResult(rows, "warn")) + + tui = object.__new__(monitor.TUI) + tui.filter_re = None + tui.sort_key = "cpu" + tui.view = "detail" + tui.detail_app = "python" + tui.tree_mode = True + tui.sel = 10 + + tui.sample() + + assert tui.sample_error == "warn" + assert [group.app for group in tui.groups] == ["python", "bash"] + assert [row.pid for row in tui.detail_list] == [1, 2] + assert tui.detail_tree[0][0].pid == 1 + assert tui.sel == 1 + + +def test_tui_sample_clears_detail_when_in_group_view(monkeypatch): + monkeypatch.setattr(monitor, "sample_processes", lambda filter_re: monitor.SampleResult([])) + tui = object.__new__(monitor.TUI) + tui.filter_re = None + tui.sort_key = "mem" + tui.view = "groups" + tui.detail_app = None + tui.tree_mode = False + tui.sel = 0 + + tui.sample() + + assert tui.detail_list == [] + assert tui.detail_tree == [] + + +def test_tui_status_line_prefers_recent_status(monkeypatch): + tui = object.__new__(monitor.TUI) + tui.status_msg = "ready" + tui.status_time = 10.0 + tui.sample_error = "warn" + monkeypatch.setattr(monitor.time, "time", lambda: 12.0) + assert tui._status_line() == "ready" + monkeypatch.setattr(monitor.time, "time", lambda: 30.0) + assert tui._status_line() == "warn" + + +def test_tui_draw_group_and_detail_views(monkeypatch): + tui, screen = _make_initialized_tui(monkeypatch) + tui.groups = [monitor.GroupRow(app="python", procs=2, rss_mb=10.0, mem_pct=5.0, cpu=8.0, swap_mb=1.0, io_read_mb=2.0, io_write_mb=1.0)] + tui.sel = 0 + tui.scroll = 0 + tui.status_msg = "hello" + tui.status_time = 0.0 + tui.sample_error = "" + tui.alerts_cache = [] + tui.alerts_cache_time = 0.0 + tui.last_proc_rows = [] + monkeypatch.setattr(monitor.time, "time", lambda: 1.0) + monkeypatch.setattr(monitor.os, "getloadavg", lambda: (1.0, 0.5, 0.2)) + monkeypatch.setattr(monitor.os, "cpu_count", lambda: 4) + monkeypatch.setattr(monitor.psutil, "virtual_memory", lambda: SimpleNamespace(used=2 * 1024**3, total=8 * 1024**3, available=4 * 1024**3, free=3 * 1024**3, buffers=1 * 1024**3, cached=1 * 1024**3, percent=30.0)) + monkeypatch.setattr(monitor.psutil, "swap_memory", lambda: SimpleNamespace(used=1 * 1024**3, total=2 * 1024**3, percent=25.0)) + monkeypatch.setattr(tui, "collect_alerts", lambda: ["critical alert"]) + + tui.draw() + + rendered = "\n".join(call[2] for call in screen.calls) + assert "python" in rendered + assert "critical alert" in rendered + + screen.calls.clear() + tui.view = "detail" + tui.detail_app = "python" + tui.tree_mode = False + tui.detail_list = [_make_row(pid=1, app="python", cmdline="python app.py")] + tui.draw() + rendered = "\n".join(call[2] for call in screen.calls) + assert "App: python" in rendered + assert "python app.py" in rendered + + screen.calls.clear() + tui.tree_mode = True + tui.detail_tree = [(_make_row(pid=1, app="python", cmdline="python app.py"), 1, "└── ")] + tui.draw() + rendered = "\n".join(call[2] for call in screen.calls) + assert "TREE" in rendered + + +def test_collect_alerts_paths(monkeypatch): + tui = object.__new__(monitor.TUI) + tui.alerts_cache = [] + tui.alerts_cache_time = 0.0 + tui.last_proc_rows = [ + _make_row(pid=1, app="python", cpu=250.0, mem_pct=20.0, swap_mb=2000.0, io_read_mb=200.0, io_write_mb=0.0, status="zombie", cmdline="python worker"), + ] + tui.groups = [monitor.GroupRow(app="python", procs=2, rss_mb=10.0, mem_pct=5.0, cpu=250.0, swap_mb=2500.0, io_read_mb=0.0, io_write_mb=0.0)] + now = {"value": 100.0} + monkeypatch.setattr(monitor.time, "time", lambda: now["value"]) + monkeypatch.setattr(monitor.time, "strftime", lambda fmt: "12:00:00") + monkeypatch.setattr(monitor.psutil, "virtual_memory", lambda: SimpleNamespace(percent=96.0)) + monkeypatch.setattr(monitor.psutil, "swap_memory", lambda: SimpleNamespace(percent=85.0)) + monkeypatch.setattr(monitor.psutil, "disk_usage", lambda path: SimpleNamespace(percent=96.0)) + + alerts = tui.collect_alerts() + cached = tui.collect_alerts() + + assert alerts == cached + assert any("MEMORY CRITICAL" in alert for alert in alerts) + assert any("Group CPU critical" in alert or "CPU critical" in alert for alert in alerts) + assert any("ZOMBIE PROCESS" in alert for alert in alerts) + + +def test_collect_alerts_error_path(monkeypatch): + tui = object.__new__(monitor.TUI) + tui.alerts_cache = [] + tui.alerts_cache_time = 0.0 + tui.last_proc_rows = [] + tui.groups = [] + monkeypatch.setattr(monitor.time, "time", lambda: 1.0) + monkeypatch.setattr(monitor.time, "strftime", lambda fmt: "12:00:00") + + def raise_vm(): + raise RuntimeError("boom") + + monkeypatch.setattr(monitor.psutil, "virtual_memory", raise_vm) + alerts = tui.collect_alerts() + assert "Alert collection failed" in alerts[0] + + +def test_prompt_filter_paths(monkeypatch): + tui, _screen = _make_initialized_tui(monkeypatch, text=b"python") + monkeypatch.setattr(monitor.curses, "echo", lambda: None) + monkeypatch.setattr(monitor.curses, "noecho", lambda: None) + msgs = [] + tui.status = lambda msg: msgs.append(msg) + tui.prompt_filter() + assert tui.filter_text == "python" + assert msgs[-1] == "Filter: python" + + tui.stdscr.text = b"[" + tui.prompt_filter() + assert tui.filter_text == "invalid" + assert msgs[-1] == "Filter invalid" + + tui.stdscr.text = b"" + tui.prompt_filter() + assert tui.filter_text == "" + assert msgs[-1] == "Filter cleared" + + +def test_tui_run_loop_branches(monkeypatch): + keys = [ + monitor.curses.KEY_DOWN, + monitor.curses.KEY_UP, + monitor.curses.KEY_NPAGE, + monitor.curses.KEY_PPAGE, + monitor.curses.KEY_HOME, + monitor.curses.KEY_END, + 10, + ord("k"), + ord("K"), + ord("t"), + ord("h"), + ord("g"), + ord("s"), + ord("+"), + ord("-"), + ord("r"), + ord("f"), + ord("w"), + ord("d"), + ord("q"), + ] + tui, _screen = _make_initialized_tui(monkeypatch, keys=keys) + calls = [] + tui.sample = lambda: None + tui.draw = lambda: None + tui.length = lambda: 12 if tui.view == "groups" else 4 + + def toggle(): + tui.view = "detail" if tui.view == "groups" else "groups" + if tui.view == "groups": + tui.tree_mode = False + + tui.toggle_view = toggle + tui.prompt_filter = lambda: calls.append("filter") + tui.kill_selected = lambda sig: calls.append(("kill_selected", sig)) + tui.kill_group = lambda sig=monitor.signal.SIGTERM: calls.append(("kill_group", sig)) + tui.run_swap_clean = lambda: calls.append("swap") + tui.run_drop_caches = lambda: calls.append("drop") + + tui.run() + + assert ("kill_selected", monitor.signal.SIGTERM) in calls + assert ("kill_selected", monitor.signal.SIGKILL) in calls + assert ("kill_group", monitor.signal.SIGTERM) in calls + assert "filter" in calls + assert "swap" in calls + assert "drop" in calls + + +def test_tui_run_esc_arrow_translation(monkeypatch): + keys = [27, 91, 65, ord("q")] + tui, _screen = _make_initialized_tui(monkeypatch, keys=keys) + tui.sample = lambda: None + tui.draw = lambda: None + tui.length = lambda: 3 + tui.sel = 1 + + tui.run() + + assert tui.sel == 0 + + +def test_main_check_sudo_exits_zero(monkeypatch, capsys): + monkeypatch.setattr(monitor, "parse_args", lambda: (SimpleNamespace( + once=False, + lite=False, + check_sudo=True, + init_subscripts=False, + sort="mem", + top=15, + refresh=2.0, + ), None)) + fake_module = SimpleNamespace(check_sudo=lambda: {"a": "ok"}) + monkeypatch.setitem(monitor.sys.modules, "ptop3.sudo_config", fake_module) + + with pytest.raises(SystemExit) as exc_info: + monitor.main() + + assert exc_info.value.code == 0 + assert "a: ok" in capsys.readouterr().out + + +def test_main_init_subscripts(monkeypatch): + called = [] + monkeypatch.setattr(monitor, "parse_args", lambda: (SimpleNamespace( + once=False, + lite=False, + check_sudo=False, + init_subscripts=True, + sort="mem", + top=15, + refresh=2.0, + ), None)) + fake_module = SimpleNamespace(init_subscripts=lambda: called.append(True)) + monkeypatch.setitem(monitor.sys.modules, "ptop3.sudo_config", fake_module) + + monitor.main() + + assert called == [True] + + +def test_main_once_mode(monkeypatch): + calls = [] + monkeypatch.setattr(monitor, "parse_args", lambda: (SimpleNamespace( + once=True, + lite=True, + check_sudo=False, + init_subscripts=False, + sort="cpu", + top=5, + refresh=2.0, + ), "FILTER")) + monkeypatch.setattr(monitor, "print_once", lambda fil, sort, top: calls.append((fil, sort, top))) + + monitor.main() + + assert monitor.LITE_MODE is True + assert calls == [("FILTER", "cpu", 5)] + + +def test_main_tui_mode_uses_curses_wrapper(monkeypatch): + wrapped = [] + monkeypatch.setattr(monitor, "parse_args", lambda: (SimpleNamespace( + once=False, + lite=False, + check_sudo=False, + init_subscripts=False, + sort="rss", + top=15, + refresh=3.0, + ), "FILTER")) + monkeypatch.setattr(monitor, "TUI", lambda stdscr, fil, sort, refresh: SimpleNamespace(run=lambda: wrapped.append((stdscr, fil, sort, refresh)))) + monkeypatch.setattr(monitor.curses, "wrapper", lambda fn: fn("STDOUT")) + + monitor.main() + + assert wrapped == [("STDOUT", "FILTER", "rss", 3.0)] + + +def test_main_module_invokes_monitor_main(monkeypatch): + called = [] + monkeypatch.setattr(monitor, "main", lambda: called.append(True)) + monkeypatch.delitem(monitor.sys.modules, "ptop3.__main__", raising=False) + + importlib.import_module("ptop3.__main__") + + assert called == [True] + + +def test_sample_processes_surfaces_unexpected_error(monkeypatch): + class BrokenProc: + info = {"pid": 1, "ppid": 0, "name": "broken"} + + def oneshot(self): + return self + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def cmdline(self): + return ["broken"] + + def memory_info(self): + raise RuntimeError("boom") + + monkeypatch.setattr(monitor.psutil, "virtual_memory", lambda: SimpleNamespace(total=1024 * 1024 * 1024)) + monkeypatch.setattr(monitor.psutil, "process_iter", lambda attrs=None: [BrokenProc()]) + + sample = monitor.sample_processes(None) + + assert sample.rows == [] + assert sample.error.startswith("sampling warning:") diff --git a/tests/test_sudo_config.py b/tests/test_sudo_config.py index e2fd7be..a4bedc2 100644 --- a/tests/test_sudo_config.py +++ b/tests/test_sudo_config.py @@ -1,9 +1,7 @@ """Tests for ptop3.sudo_config.""" -import os -import pytest -from unittest.mock import patch, MagicMock -from ptop3.sudo_config import check_sudo, init_subscripts, _build_sudoers_content +from unittest.mock import MagicMock, patch +from ptop3.sudo_config import _build_sudoers_content, check_sudo, init_subscripts # --------------------------------------------------------------------------- # _build_sudoers_content diff --git a/tests/test_swap_clean.py b/tests/test_swap_clean.py index bec24f0..daf3971 100644 --- a/tests/test_swap_clean.py +++ b/tests/test_swap_clean.py @@ -1,8 +1,9 @@ """Tests for ptop3.scripts.swap_clean.""" +from unittest.mock import MagicMock, patch + import pytest -from unittest.mock import patch, MagicMock, call -from ptop3.scripts.swap_clean import read_meminfo, read_swaps, swap_clean +from ptop3.scripts.swap_clean import read_meminfo, read_swaps, swap_clean # --------------------------------------------------------------------------- # read_meminfo @@ -56,6 +57,16 @@ def test_read_swaps_multiple_entries(tmp_path): assert entries[1]["filename"] == "/swapfile" +def test_read_swaps_skips_short_lines(tmp_path): + p = tmp_path / "swaps" + p.write_text( + "Filename\t\t\t\tType\t\tSize\t\tUsed\t\tPriority\n" + "broken line\n" + ) + + assert read_swaps(str(p)) == [] + + # --------------------------------------------------------------------------- # swap_clean — no swap configured # --------------------------------------------------------------------------- @@ -175,6 +186,18 @@ def test_swap_clean_calls_swapoff_swapon(tmp_meminfo, tmp_swaps): assert ["swapon", "-a"] in called_cmds +def test_swap_clean_returns_error_when_swapoff_fails(tmp_meminfo, tmp_swaps, capsys): + with patch("ptop3.scripts.swap_clean.subprocess.run", return_value=MagicMock(returncode=1, stderr="nope")): + rc = swap_clean( + target="/dev/sda2", + meminfo_path=tmp_meminfo, + swaps_path=tmp_swaps, + ) + + assert rc == 1 + assert "error running swapoff: nope" in capsys.readouterr().err + + # --------------------------------------------------------------------------- # root check in main() # --------------------------------------------------------------------------- @@ -186,3 +209,67 @@ def test_main_root_check_exits_when_not_root(): with pytest.raises(SystemExit) as exc_info: main() assert exc_info.value.code == 1 + + +def test_swap_clean_target_success_dry_run(tmp_meminfo, tmp_swaps, capsys): + rc = swap_clean( + target="/dev/sda2", + dry_run=True, + meminfo_path=tmp_meminfo, + swaps_path=tmp_swaps, + ) + + assert rc == 0 + out = capsys.readouterr().out + assert "[dry-run] swapoff /dev/sda2" in out + assert "[dry-run] swapon /dev/sda2" in out + + +def test_swap_clean_file_by_file_fallback(tmp_path, capsys): + meminfo = tmp_path / "meminfo" + swaps = tmp_path / "swaps" + meminfo.write_text( + "MemAvailable: 1000 kB\n" + "SwapTotal: 2000 kB\n" + "SwapFree: 0 kB\n" + ) + swaps.write_text( + "Filename\t\t\t\tType\t\tSize\t\tUsed\t\tPriority\n" + "/swap-a\t\t\t\tfile\t\t1024\t\t400\t\t-2\n" + "/swap-b\t\t\t\tfile\t\t1024\t\t300\t\t-3\n" + ) + + mem_values = iter( + [ + {"MemAvailable": 1000, "SwapTotal": 2000, "SwapFree": 0}, + {"MemAvailable": 1000}, + {"MemAvailable": 1000}, + ] + ) + + with patch("ptop3.scripts.swap_clean.read_meminfo", side_effect=lambda path: next(mem_values)): + with patch("ptop3.scripts.swap_clean.subprocess.run", return_value=MagicMock(returncode=0)) as mock_run: + rc = swap_clean( + safety_mb=1, + meminfo_path=str(meminfo), + swaps_path=str(swaps), + ) + + assert rc == 0 + assert "file-by-file" in capsys.readouterr().out + called_cmds = [call.args[0] for call in mock_run.call_args_list] + assert ["swapoff", "/swap-b"] in called_cmds + assert ["swapon", "/swap-b"] in called_cmds + + +def test_main_passes_when_root(): + from ptop3.scripts.swap_clean import main + + with patch("os.geteuid", return_value=0): + with patch("sys.argv", ["ptop3-swap-clean", "--dry-run"]): + with patch("ptop3.scripts.swap_clean.swap_clean", return_value=0) as mock_clean: + with pytest.raises(SystemExit) as exc_info: + main() + + assert exc_info.value.code == 0 + mock_clean.assert_called_once() From d7319f2ccffb0514e2d736f336afdd69d275ee50 Mon Sep 17 00:00:00 2001 From: Roman Marek~ Date: Mon, 9 Mar 2026 14:08:17 +0100 Subject: [PATCH 02/10] fix: quality gate agents comment-only, add test branch trigger MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove auto-commit instructions from all three agents — contents: read prevents pushing commits on pull_request events - Agents now post PR comments with proposed fixes/snippets instead - Add test branch to workflow trigger (feature PRs target test, not main) Fixes Copilot review comments on PR #5. Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/claude-quality-gate.yml | 113 ++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 .github/workflows/claude-quality-gate.yml diff --git a/.github/workflows/claude-quality-gate.yml b/.github/workflows/claude-quality-gate.yml new file mode 100644 index 0000000..bbb3652 --- /dev/null +++ b/.github/workflows/claude-quality-gate.yml @@ -0,0 +1,113 @@ +name: Claude Quality Gate + +on: + pull_request: + branches: [main, test] + types: [opened, synchronize, reopened] + +permissions: + contents: read + pull-requests: write + issues: write + id-token: write + +jobs: + # Agent 1 — verify tests exist for all changed code + test-coverage-agent: + name: Test Coverage Agent + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - uses: anthropics/claude-code-action@v1 + with: + claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} + prompt: | + You are a test quality agent for the ptop3 project. + + Review the diff of this PR (compare HEAD to the base branch) and: + 1. List every new or modified function/method in ptop3/ source files. + 2. Check tests/ to confirm each one has a corresponding test. + 3. For any function missing a test, post a PR comment that: + - Identifies the untested function (module + name). + - Provides a ready-to-use test snippet following the project rules: + * Mock /proc/* files with tmp_path fixtures + * Mock curses — never test TUI rendering directly + * Patch os.geteuid for root-required paths + * Tests must pass on Python 3.10–3.13 + 4. If all functions are covered, post a short confirmation comment. + + Do NOT modify files or create commits. All feedback must be via PR comments. + Project test style is in tests/conftest.py and existing test files. + Quality rules are in .github/instructions/code-review.instructions.md. + + # Agent 2 — verify CHANGELOG and docs are updated + docs-changelog-agent: + name: Docs & Changelog Agent + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - uses: anthropics/claude-code-action@v1 + with: + claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} + prompt: | + You are a documentation quality agent for the ptop3 project. + + Review the diff of this PR and: + + 1. CHANGELOG check: + - If any user-facing behavior changed (new feature, bug fix, CLI flag, keybinding), + verify CHANGELOG.md has an entry under ## [Unreleased]. + - If missing, post a PR comment that states what is missing and proposes + the exact text to add under the correct section + (Added / Changed / Fixed / Security / Deprecated / Removed). + - Do NOT bump version numbers. + + 2. README check: + - If new keybindings, CLI flags, or entry points were added, verify README.md documents them. + - If missing, post a PR comment explaining what is missing and proposing + the exact wording or patch snippet to add. + + 3. Docstring check: + - For any new public function in ptop3/ that lacks a docstring, post a PR comment + identifying the function (module, name, signature) and proposing a one-line docstring. + + Do NOT modify files or create commits. All feedback must be via PR comments. + If everything is already documented, post a short confirmation comment summarising what you checked. + + # Agent 3 — code quality and security review + code-review-agent: + name: Code Review Agent + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - uses: anthropics/claude-code-action@v1 + with: + claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} + prompt: | + You are a code quality and security agent for the ptop3 project. + + Review the diff of this PR against the standards in + .github/instructions/code-review.instructions.md and check for: + + 1. Style violations: bare except, print() in library code, typing.Dict/List/Optional, + shell=True in subprocess, string path concatenation instead of pathlib. + 2. Security issues: unsanitized input to subprocess, hardcoded secrets, + missing root check before writing /proc/sys/vm/drop_caches or calling swapoff/swapon, + missing visudo validation before sudoers writes. + 3. GitHub Actions: missing permissions blocks, missing skip-existing on TestPyPI steps. + 4. Module-specific rules from the instructions file. + + For each issue found, post an inline PR review comment at the exact file+line + with a clear explanation and a concrete suggested fix. + + Do NOT modify files or create commits. All feedback must be via PR comments. + If no issues are found, post a brief approval comment summarising what was checked. From b2114fe1e94edd24c0d3c448f5aa05a6dc5025da Mon Sep 17 00:00:00 2001 From: Roman Marek~ Date: Mon, 9 Mar 2026 14:30:55 +0100 Subject: [PATCH 03/10] fix: address PR review feedback --- CHANGELOG.md | 1 + ptop3/monitor.py | 10 ++++++---- pyproject.toml | 1 + tests/conftest.py | 10 ---------- tests/test_monitor.py | 15 ++++++++++++++- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bfbca1..4a07e3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - TestPyPI publish now triggers on merge to `test` branch (was `main`) - PyPI production publish is now manual (`workflow_dispatch`) instead of automatic on tag - GitHub Release creation on tag push remains automatic +- `ptop3` no longer advertises or supports the non-functional `net` sort mode in the TUI/CLI ### Added - `CLAUDE.md` with project context for Claude Code agents diff --git a/ptop3/monitor.py b/ptop3/monitor.py index 36a16e1..9b25bec 100644 --- a/ptop3/monitor.py +++ b/ptop3/monitor.py @@ -226,7 +226,7 @@ def _sample_process( rss_mb = rss_bytes / (1024 * 1024) cpu = 0.0 if lite and rss_mb < 2.0 else proc.cpu_percent() - swap_mb = _swap_value(proc.pid, rss_mb, lite) + swap_mb = _swap_value(proc.pid, rss_mb, lite, read_swap=self.read_vmswap_mb) io_read_mb, io_write_mb = _io_values(proc, rss_mb, lite) try: @@ -291,11 +291,13 @@ def _matches_filter(filter_search, app: str, name: str, cmdline: str) -> bool: return bool(filter_search(app) or filter_search(name) or (cmdline and filter_search(cmdline))) -def _swap_value(pid: int, rss_mb: float, lite: bool) -> float: +def _swap_value(pid: int, rss_mb: float, lite: bool, read_swap=None) -> float: + if read_swap is None: + read_swap = read_vmswap_mb if not lite and rss_mb > 50: - return read_vmswap_mb(pid) + return read_swap(pid) if lite and rss_mb > 200: - return read_vmswap_mb(pid) + return read_swap(pid) return 0.0 diff --git a/pyproject.toml b/pyproject.toml index 71be716..12ba198 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ include = ["ptop3*"] [tool.pytest.ini_options] testpaths = ["tests"] +pythonpath = ["."] addopts = "-v --tb=short" [tool.coverage.run] diff --git a/tests/conftest.py b/tests/conftest.py index 6555959..6d35e83 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,17 +1,7 @@ """Shared fixtures for ptop3 tests.""" -import sys -from pathlib import Path import pytest -ROOT = Path(__file__).resolve().parents[1] -if str(ROOT) not in sys.path: - sys.path.insert(0, str(ROOT)) - -for module_name in list(sys.modules): - if module_name == "ptop3" or module_name.startswith("ptop3."): - sys.modules.pop(module_name, None) - MEMINFO_TEMPLATE = """\ MemTotal: 32768000 kB MemFree: 4096000 kB diff --git a/tests/test_monitor.py b/tests/test_monitor.py index c239d47..9bdc1aa 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -352,7 +352,7 @@ def test_get_proc_rows_filters_and_collects_metrics(monkeypatch): monkeypatch.setattr(monitor.psutil, "process_iter", lambda attrs=None: procs) monkeypatch.setattr(monitor.psutil, "virtual_memory", lambda: SimpleNamespace(total=1024 * 1024 * 1024)) - monkeypatch.setattr(monitor, "read_vmswap_mb", lambda pid: 4.0 if pid == 10 else 0.0) + monkeypatch.setattr(monitor.ProcessSampler, "read_vmswap_mb", lambda self, pid: 4.0 if pid == 10 else 0.0) rows = monitor.get_proc_rows(filter_re=monitor.re.compile("python")) @@ -849,6 +849,19 @@ def raising_status(): assert row.status == "unknown" +def test_sampler_sample_process_uses_sampler_swap_reader(monkeypatch): + sampler = monitor.ProcessSampler() + proc = _FakeProc(1, 0, "python3", ["python3"], rss_mb=64, cpu=3.0) + + monkeypatch.setattr(monitor, "read_vmswap_mb", lambda pid: 0.0) + monkeypatch.setattr(monitor.ProcessSampler, "read_vmswap_mb", lambda self, pid: 7.0) + + row = sampler._sample_process(proc, 1.0, 1.0, False, None) + + assert row is not None + assert row.swap_mb == 7.0 + + def test_sampler_sample_process_filtered_out_and_memory_denied(monkeypatch): sampler = monitor.ProcessSampler() proc = _FakeProc(1, 0, "bash", ["bash"], rss_mb=4, cpu=1.0) From e625b028cc27cf53fde25bbe5d4aeaf85481d4ff Mon Sep 17 00:00:00 2001 From: Roman Marek~ Date: Mon, 9 Mar 2026 14:47:32 +0100 Subject: [PATCH 04/10] fix: simplify claude code review workflow --- .github/workflows/claude-code-review.yml | 33 ++++++++++++++---------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/.github/workflows/claude-code-review.yml b/.github/workflows/claude-code-review.yml index b5e8cfd..e931048 100644 --- a/.github/workflows/claude-code-review.yml +++ b/.github/workflows/claude-code-review.yml @@ -12,33 +12,40 @@ on: jobs: claude-review: - # Optional: Filter by PR author - # if: | - # github.event.pull_request.user.login == 'external-contributor' || - # github.event.pull_request.user.login == 'new-developer' || - # github.event.pull_request.author_association == 'FIRST_TIME_CONTRIBUTOR' - runs-on: ubuntu-latest permissions: contents: read - pull-requests: read - issues: read + pull-requests: write + issues: write id-token: write steps: - name: Checkout repository uses: actions/checkout@v4 with: - fetch-depth: 1 + fetch-depth: 0 - name: Run Claude Code Review id: claude-review uses: anthropics/claude-code-action@v1 with: claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} - plugin_marketplaces: 'https://github.com/anthropics/claude-code.git' - plugins: 'code-review@claude-code-plugins' - prompt: '/code-review:code-review ${{ github.repository }}/pull/${{ github.event.pull_request.number }}' + prompt: | + You are a code review agent for the ptop3 project. + + Review this pull request against the standards in + .github/instructions/code-review.instructions.md. + + Focus on: + - correctness bugs or behavioral regressions + - security issues and missing guards around privileged actions + - maintainability issues in changed code + - missing or weak test coverage for the changed behavior + + For each issue found, post an inline PR review comment at the exact file and line + with a concise explanation and a concrete suggested fix. + + Do NOT modify files or create commits. All feedback must be via PR comments. + If no issues are found, post a brief approval-style summary of what you checked. # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md # or https://code.claude.com/docs/en/cli-reference for available options - From a991fa982a2280d2c979b556428654767ab0eb19 Mon Sep 17 00:00:00 2001 From: Roman Marek~ Date: Mon, 9 Mar 2026 14:54:09 +0100 Subject: [PATCH 05/10] ci: auto-version testpypi publishes --- .github/workflows/publish-testpypi.yml | 50 ++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/.github/workflows/publish-testpypi.yml b/.github/workflows/publish-testpypi.yml index 71569f9..8e172ae 100644 --- a/.github/workflows/publish-testpypi.yml +++ b/.github/workflows/publish-testpypi.yml @@ -17,6 +17,7 @@ jobs: runs-on: ubuntu-latest permissions: contents: read # checkout only + id-token: write # required for Trusted Publishing steps: - uses: actions/checkout@v4 @@ -27,7 +28,53 @@ jobs: python-version: "3.12" - name: Install build tools - run: pip install build + run: pip install build packaging + + - name: Set unique TestPyPI version + env: + RUN_NUMBER: ${{ github.run_number }} + run: | + python - <<'PY' + import os + import re + from pathlib import Path + + from packaging.version import Version + + pyproject = Path("pyproject.toml") + init_py = Path("ptop3/__init__.py") + + pyproject_text = pyproject.read_text() + match = re.search(r'^version = "([^"]+)"', pyproject_text, re.M) + if not match: + raise SystemExit("Could not find project version in pyproject.toml") + + base = Version(match.group(1)) + test_version = f"{base.major}.{base.minor}.{base.micro + 1}.dev{os.environ['RUN_NUMBER']}" + + pyproject.write_text( + re.sub( + r'^version = "[^"]+"', + f'version = "{test_version}"', + pyproject_text, + count=1, + flags=re.M, + ) + ) + + init_text = init_py.read_text() + init_py.write_text( + re.sub( + r'^__version__ = "[^"]+"', + f'__version__ = "{test_version}"', + init_text, + count=1, + flags=re.M, + ) + ) + + print(f"Publishing TestPyPI version: {test_version}") + PY - name: Build package run: python -m build @@ -36,5 +83,4 @@ jobs: uses: pypa/gh-action-pypi-publish@release/v1 with: repository-url: https://test.pypi.org/legacy/ - password: ${{ secrets.TEST_PYPI_API_TOKEN }} skip-existing: true From 2a24eeda245e98645badca826ddbe178a8231b50 Mon Sep 17 00:00:00 2001 From: Roman Marek~ Date: Mon, 9 Mar 2026 17:23:15 +0100 Subject: [PATCH 06/10] ci: set testpypi environment --- .github/workflows/publish-testpypi.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/publish-testpypi.yml b/.github/workflows/publish-testpypi.yml index 8e172ae..a78b7f9 100644 --- a/.github/workflows/publish-testpypi.yml +++ b/.github/workflows/publish-testpypi.yml @@ -15,6 +15,7 @@ jobs: publish: needs: [test] runs-on: ubuntu-latest + environment: testpypi permissions: contents: read # checkout only id-token: write # required for Trusted Publishing From db459f41088a4e17f2d193f2e27efa5ba33377e0 Mon Sep 17 00:00:00 2001 From: Roman Marek~ Date: Mon, 9 Mar 2026 17:24:59 +0100 Subject: [PATCH 07/10] chore: ignore coverage artifacts --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index d9fff16..1092ae6 100644 --- a/.gitignore +++ b/.gitignore @@ -9,5 +9,8 @@ venv/ .mypy_cache/ .ruff_cache/ .pytest_cache/ +.coverage +.coverage.* +htmlcov/ *.egg MANIFEST From 6cdbe07a3a63b27564db82fa8f14c0a1c48794f0 Mon Sep 17 00:00:00 2001 From: Roman Marek~ Date: Mon, 9 Mar 2026 17:29:01 +0100 Subject: [PATCH 08/10] chore: use standard python gitignore --- .gitignore | 53 ++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 1092ae6..3120dbf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,16 +1,51 @@ __pycache__/ *.py[cod] -*.egg-info/ -dist/ -build/ -.eggs/ +*$py.class + +# Virtual environments .venv/ venv/ -.mypy_cache/ -.ruff_cache/ -.pytest_cache/ +env/ +ENV/ +.python-version + +# Packaging +.Python +build/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +pip-wheel-metadata/ +*.egg +*.egg-info/ +MANIFEST + +# Test and coverage outputs .coverage .coverage.* htmlcov/ -*.egg -MANIFEST +.pytest_cache/ +.hypothesis/ +.tox/ +.nox/ +coverage.xml +*.cover +*.py,cover + +# Type checkers and linters +.mypy_cache/ +.dmypy.json +dmypy.json +.pyre/ +.ruff_cache/ + +# Notebooks and local tooling +.ipynb_checkpoints/ From 31d781db54dd10f2ce3ef08ef337c3e73823c950 Mon Sep 17 00:00:00 2001 From: Roman Marek~ Date: Tue, 10 Mar 2026 05:34:23 +0100 Subject: [PATCH 09/10] test: tighten swap fallback assertion --- tests/test_swap_clean.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_swap_clean.py b/tests/test_swap_clean.py index daf3971..4533558 100644 --- a/tests/test_swap_clean.py +++ b/tests/test_swap_clean.py @@ -256,7 +256,9 @@ def test_swap_clean_file_by_file_fallback(tmp_path, capsys): ) assert rc == 0 - assert "file-by-file" in capsys.readouterr().out + captured = capsys.readouterr() + assert "Not enough RAM to clean all swap at once. Trying file-by-file..." in captured.err + assert "Swap clean completed (file-by-file)." in captured.out called_cmds = [call.args[0] for call in mock_run.call_args_list] assert ["swapoff", "/swap-b"] in called_cmds assert ["swapon", "/swap-b"] in called_cmds From 6a9c73d935828a7df9bf2ff6e7f6642fc06c9330 Mon Sep 17 00:00:00 2001 From: Roman Marek~ Date: Tue, 10 Mar 2026 05:37:35 +0100 Subject: [PATCH 10/10] fix: pass github_token to claude-code-action to resolve permission denials --- .github/workflows/claude-code-review.yml | 1 + .github/workflows/claude-quality-gate.yml | 3 +++ 2 files changed, 4 insertions(+) diff --git a/.github/workflows/claude-code-review.yml b/.github/workflows/claude-code-review.yml index e931048..6177539 100644 --- a/.github/workflows/claude-code-review.yml +++ b/.github/workflows/claude-code-review.yml @@ -30,6 +30,7 @@ jobs: uses: anthropics/claude-code-action@v1 with: claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} + github_token: ${{ secrets.GITHUB_TOKEN }} prompt: | You are a code review agent for the ptop3 project. diff --git a/.github/workflows/claude-quality-gate.yml b/.github/workflows/claude-quality-gate.yml index bbb3652..f7c61ca 100644 --- a/.github/workflows/claude-quality-gate.yml +++ b/.github/workflows/claude-quality-gate.yml @@ -24,6 +24,7 @@ jobs: - uses: anthropics/claude-code-action@v1 with: claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} + github_token: ${{ secrets.GITHUB_TOKEN }} prompt: | You are a test quality agent for the ptop3 project. @@ -55,6 +56,7 @@ jobs: - uses: anthropics/claude-code-action@v1 with: claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} + github_token: ${{ secrets.GITHUB_TOKEN }} prompt: | You are a documentation quality agent for the ptop3 project. @@ -92,6 +94,7 @@ jobs: - uses: anthropics/claude-code-action@v1 with: claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} + github_token: ${{ secrets.GITHUB_TOKEN }} prompt: | You are a code quality and security agent for the ptop3 project.