diff --git a/README.md b/README.md index 847e350..0f8cb23 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,32 @@ Then using your favorite web browser, go to running locally. If the INDI Web Manager is installed on a remote system, simply replace localhost with the hostname or IP address of the remote system. +# PAA Live Monitor + +The PAA (Polar Alignment Assistant) live monitor is an optional feature that +shows **live** polar alignment error from Ekos/KStars. It displays total, +altitude, and azimuth error in degrees-minutes-seconds (DMS) and arcseconds, +with direction arrows; values update as Ekos writes PAA Refresh lines to its +log file. + +PAA Live Monitor on mobile + +To enable the PAA monitor, start INDI Web Manager with `--with-paa`. You may +optionally pass one or more log directories with `--kstars-logs DIR [DIR ...]`. +If `--kstars-logs` is omitted, the application searches (in order): +`~/.local/share/kstars/logs` (native KStars install) and +`~/.var/app/org.kde.kstars/data/kstars/logs` (Flatpak). Example: +`indi-web --with-paa` or `indi-web --with-paa --kstars-logs /path/to/kstars/logs`. + +**Ekos configuration (required):** In Ekos, enable **Log to file** so that PAA +Refresh lines are written to the KStars log. Run the **Polar Alignment +Assistant** in Ekos so the log contains PAA data. Without Log to file enabled, +the monitor will report that no PAA data was found and prompt you to enable it. + +Open the PAA page in the app at [http://localhost:8624/paa](http://localhost:8624/paa) +(when using the default port). Status is also available via REST at +`GET /api/paa/status` and live updates via WebSocket at `/ws/paa` for integration. + # Auto Start If you selected any profile as **Auto Start** then the INDI server shall be diff --git a/img/paa-monitor-mobile.jpeg b/img/paa-monitor-mobile.jpeg new file mode 100644 index 0000000..097f339 Binary files /dev/null and b/img/paa-monitor-mobile.jpeg differ diff --git a/indiweb/main.py b/indiweb/main.py index 1a6c8c3..2917100 100644 --- a/indiweb/main.py +++ b/indiweb/main.py @@ -4,6 +4,7 @@ import logging import os import socket +from contextlib import asynccontextmanager import uvicorn from fastapi.middleware.cors import CORSMiddleware @@ -15,6 +16,7 @@ from .device import Device from .driver import INDI_DATA_DIR, DriverCollection from .indi_server import INDI_CONFIG_DIR, INDI_FIFO, INDI_PORT, IndiServer +from .paa_monitor import PaaMonitor, _default_kstars_log_dirs, paa_router from .routes import router, start_profile from .state import AppState, IndiWebApp @@ -55,6 +57,10 @@ def _build_parser(): help='HTTP server [standalone|apache] (default: standalone') parser.add_argument('--sudo', '-S', action='store_true', help='Run poweroff/reboot commands with sudo') + parser.add_argument('--kstars-logs', default=None, nargs='+', + help='KStars/Ekos log directory/ies. Default: search native and Flatpak locations') + parser.add_argument('--with-paa', action='store_true', + help='Enable the PAA (Polar Alignment Assistant) live monitor') return parser @@ -73,6 +79,14 @@ def parse_args(argv=None): return args +@asynccontextmanager +async def _lifespan(app: IndiWebApp): + """Application lifespan: clean up PAA monitor on shutdown.""" + yield + if app.state.paa_monitor is not None: + await app.state.paa_monitor.shutdown() + + def create_app(argv=None): """ Create and configure the FastAPI application. @@ -92,8 +106,11 @@ def create_app(argv=None): format='%(asctime)s - %(levelname)s: %(message)s', level=logging_level) else: - logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s', - level=logging_level) + from uvicorn.logging import DefaultFormatter + handler = logging.StreamHandler() + handler.setFormatter(DefaultFormatter("%(levelprefix)s %(message)s")) + logging.root.addHandler(handler) + logging.root.setLevel(logging_level) logging.debug("command line arguments: " + str(vars(args))) collection = DriverCollection(args.xmldir) @@ -104,7 +121,11 @@ def create_app(argv=None): collection.parse_custom_drivers(db.get_custom_drivers()) templates = Jinja2Templates(directory=views_path) - app = IndiWebApp(title="INDI Web Manager", version=__version__) + paa_monitor = None + if getattr(args, 'with_paa', False): + kstars_log_dirs = args.kstars_logs or [str(d) for d in _default_kstars_log_dirs()] + paa_monitor = PaaMonitor(kstars_log_dirs) + app = IndiWebApp(title="INDI Web Manager", version=__version__, lifespan=_lifespan) app.state = AppState( db=db, collection=collection, @@ -115,6 +136,7 @@ def create_app(argv=None): hostname=socket.gethostname(), saved_profile=None, active_profile="", + paa_monitor=paa_monitor, ) app.add_middleware( @@ -128,6 +150,8 @@ def create_app(argv=None): app.mount("/favicon.ico", StaticFiles(directory=views_path), name="favicon.ico") app.include_router(router) + if paa_monitor is not None: + app.include_router(paa_router) return app diff --git a/indiweb/paa_monitor.py b/indiweb/paa_monitor.py new file mode 100644 index 0000000..a24a82e --- /dev/null +++ b/indiweb/paa_monitor.py @@ -0,0 +1,385 @@ +"""Polar Alignment Assistant (PAA) live monitor for Ekos/KStars logs.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +import re +import time +from pathlib import Path + +from fastapi import APIRouter, Depends, Request, WebSocket, WebSocketDisconnect +from fastapi.responses import HTMLResponse, JSONResponse + +from .routes import get_state, get_state_ws +from .state import AppState + +# KStars Qt log format. Arcseconds: " or \" +# Groups: 1=ts, 2-4=az dms, 5-7=alt dms, 8-10=total dms +PAA_PATTERN = re.compile(r""" + \[ \d{4}-\d{2}-\d{2} T # [YYYY-MM-DDT + (\d{2}:\d{2}:\d{2}\.\d{3}) # (1) HH:MM:SS.mmm timestamp + \s+ [^\]]+ \] # rest of bracket header ] + .* PAA\ Refresh # PAA Refresh marker + .* Corrected\ az: \s* # azimuth label + (-?\d{1,2}) [°] \s* (\d{1,2}) ' \s* (\d{1,2}) # (2-4) az DMS: deg° min' sec + (?:" | \\") # arcsec terminator: " or \" + .* alt: \s* # altitude label + (-?\d{1,2}) [°] \s* (\d{1,2}) ' \s* (\d{1,2}) # (5-7) alt DMS: deg° min' sec + (?:" | \\") # arcsec terminator + .* total: \s* # total label + (\d{1,2}) [°] \s* (\d{1,2}) ' \s* (\d{1,2}) # (8-10) total DMS: deg° min' sec + (?:" | \\") # arcsec terminator +""", re.VERBOSE) + +# Date directory pattern YYYY-MM-DD +DATE_DIR_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$") + +# Re-discovery: when cached file hasn't been modified for this long, check for newer files +REDISCOVER_AFTER_SEC = 60 + +# Age after which PAA data is considered stale (no new updates) +STALE_THRESHOLD_SEC = 30 + +logger = logging.getLogger(__name__) + +# Default KStars log locations (native and Flatpak) +def _default_kstars_log_dirs() -> list[Path]: + """Return default KStars log directories to search (native first, then Flatpak).""" + home = Path.home() + return [ + home / ".local" / "share" / "kstars" / "logs", + home / ".var" / "app" / "org.kde.kstars" / "data" / "kstars" / "logs", + ] + + +def _match_to_dict(match: tuple, file_mtime: float) -> dict: + """Build PAA result dict from regex match groups. Passes DMS as display strings.""" + az_deg_str = match[1] + alt_deg_str = match[4] + az_direction = "left" if az_deg_str.lstrip().startswith("-") else "right" + alt_direction = "down" if alt_deg_str.lstrip().startswith("-") else "up" + # Format DMS display strings: DD° MM' SS" + az = f"{abs(int(az_deg_str)):02d}° {int(match[2]):02d}' {int(match[3]):02d}\"" + alt = f"{abs(int(alt_deg_str)):02d}° {int(match[5]):02d}' {int(match[6]):02d}\"" + total = f"{int(match[7]):02d}° {int(match[8]):02d}' {int(match[9]):02d}\"" + total_arcsec = int(match[7]) * 3600 + int(match[8]) * 60 + int(match[9]) + return { + "timestamp": match[0], + "az": az, + "alt": alt, + "total": total, + "total_arcsec": total_arcsec, + "az_direction": az_direction, + "alt_direction": alt_direction, + "file_mtime": file_mtime, + } + + +class PaaMonitor: + """Log watcher and WebSocket manager for PAA live updates.""" + + def __init__(self, log_base_dirs: str | list[str]) -> None: + """Initialize with one or more log base directories to search. + When a list, searches all and uses the most recently modified log. + Supports native (~/.local/share/kstars/logs) and Flatpak + (~/.var/app/org.kde.kstars/data/kstars/logs) locations. + """ + if isinstance(log_base_dirs, str): + self._log_base_dirs = [Path(log_base_dirs).expanduser()] + else: + self._log_base_dirs = [Path(d).expanduser() for d in log_base_dirs] + self._clients: set[WebSocket] = set() + self._monitor_task: asyncio.Task | None = None + self._cached_log_path: str | None = None + self._last_entry_mtime: float = 0 + self._last_no_match_log: tuple[str, float] = ("", 0) # (path, time) for throttling + self._last_diagnostic: str = "" # User-facing message when discovery/parse fails + self._tail_path: str | None = None # path for which _tail_offset applies + self._tail_offset: int = 0 # byte offset: only read lines after this (set on first connect) + + def _find_latest_log_in_dir(self, base_dir: Path) -> tuple[str | None, str]: + """Find the most recent .txt log in a single base directory. + Returns (path, diagnostic). path is None on failure. + """ + if not base_dir.exists(): + return None, f"Log directory does not exist: {base_dir}" + + date_dirs = [] + for entry in base_dir.iterdir(): + if entry.is_dir() and DATE_DIR_PATTERN.match(entry.name): + date_dirs.append(entry) + date_dirs.sort(key=lambda d: d.name, reverse=True) + + if not date_dirs: + return None, f"No date subdirectories (YYYY-MM-DD) in {base_dir}" + + latest_dir = date_dirs[0] + txt_files = list(latest_dir.glob("*.txt")) + if not txt_files: + return None, f"No .txt files in {latest_dir}" + + latest_file = max(txt_files, key=os.path.getmtime) + return str(latest_file), "" + + def _find_latest_log(self) -> str | None: + """Discover the most recent Ekos log file across all configured base directories. + Searches native and Flatpak locations, picks the newest by mtime. + """ + candidates: list[tuple[str, float]] = [] # (path, mtime) + diagnostics: list[str] = [] + + for base_dir in self._log_base_dirs: + path, diag = self._find_latest_log_in_dir(base_dir) + if path: + try: + mtime = os.path.getmtime(path) + candidates.append((path, mtime)) + except OSError: + pass + elif diag: + diagnostics.append(diag) + + if candidates: + path = max(candidates, key=lambda x: x[1])[0] + self._last_diagnostic = "" + return path + + if len(self._log_base_dirs) == 1: + self._last_diagnostic = diagnostics[0] if diagnostics else f"Log directory does not exist: {self._log_base_dirs[0]}" + else: + checked = ", ".join(str(d) for d in self._log_base_dirs) + self._last_diagnostic = f"No KStars log directory found. Checked: {checked}" + logger.info("PAA monitor: %s", self._last_diagnostic) + return None + + def _should_rediscover(self) -> bool: + """Check if we should re-run log discovery (new session, etc).""" + if self._cached_log_path is None: + return True + if not os.path.exists(self._cached_log_path): + return True + mtime = os.path.getmtime(self._cached_log_path) + age = time.time() - mtime + if age > REDISCOVER_AFTER_SEC: + return True + return False + + def _get_current_log_path(self) -> str | None: + """Get the log file to read, using cache when appropriate.""" + if self._should_rediscover(): + path = self._find_latest_log() + if path: + self._cached_log_path = path + else: + self._cached_log_path = None + self._tail_path = None + return self._cached_log_path + + def _parse_latest(self) -> dict | None: + """Read the discovered log file and extract the latest PAA entry from tail only. + Only considers lines after connection (tail). + """ + path = self._get_current_log_path() + if not path: + return None # _last_diagnostic already set by _find_latest_log + + try: + if self._tail_path is None or self._tail_path != path: + self._tail_path = path + if self._clients: + self._tail_offset = os.path.getsize(path) if os.path.exists(path) else 0 + else: + self._tail_offset = 0 + with open(path, "r", encoding="utf-8", errors="replace") as f: + f.seek(self._tail_offset) + content = f.read() + self._tail_offset = f.tell() # Next poll reads from here + + if not content: + return None # No new data written since last poll -- expected during tail + + matches = PAA_PATTERN.findall(content) + + if not matches: + # Content was written but contained no PAA lines. + # Throttle: log at most once per 30s per path to avoid flooding. + now = time.time() + if self._last_no_match_log[0] != path or (now - self._last_no_match_log[1]) > 30: + self._last_diagnostic = ( + f"No PAA data in {path}. Enable Ekos 'Log to file' and run PAA." + ) + logger.debug( + "PAA monitor: no regex match in %s (pattern: PAA Refresh, Corrected az: DD° MM' SS\", alt:, total:)", + path, + ) + self._last_no_match_log = (path, now) + return None + + self._last_diagnostic = "" + last_match = matches[-1] + file_mtime = os.path.getmtime(path) + logger.debug( + "PAA monitor: parsed ts=%s az=%s alt=%s", + last_match[0], last_match[1], last_match[4], + ) + return _match_to_dict(last_match, file_mtime) + except (OSError, ValueError) as e: + self._last_diagnostic = f"Error reading log: {e}" + logger.warning("PAA monitor: parse error reading %s: %s", path, e) + return None + + async def _broadcast(self, message: dict) -> None: + """Send JSON message to all connected clients, remove dead connections.""" + dead = set() + payload = json.dumps(message) + for ws in list(self._clients): + try: + await ws.send_text(payload) + except Exception: + logger.debug("Dropping dead PAA WebSocket client") + dead.add(ws) + for ws in dead: + self._clients.discard(ws) + + def _entry_payload(self, entry: dict) -> dict: + """Copy entry for client (exclude file_mtime).""" + return {k: v for k, v in entry.items() if k != "file_mtime"} + + async def _monitor_loop(self) -> None: + """Poll logs periodically and broadcast updates to WebSocket clients. + Only broadcasts full update when a new PAA match is found; otherwise sends heartbeat. + Poll interval adapts: 1.5s active, 3s stale, 5s waiting. + """ + last_entry: dict | None = None + while self._clients: + entry = self._parse_latest() + now = time.time() + poll_delay = 1.5 # default: active + + if entry: + age = now - entry["file_mtime"] + last_entry = entry + self._last_entry_mtime = entry["file_mtime"] + msg = self._entry_payload(entry) + + if age > STALE_THRESHOLD_SEC: + msg.update(type="status", state="stale", message=f"No new PAA data for {int(age)}s") + await self._broadcast(msg) + poll_delay = 3.0 + else: + msg.update(type="update", state="active") + await self._broadcast(msg) + else: + if last_entry: + age = now - self._last_entry_mtime + if age > STALE_THRESHOLD_SEC: + msg = self._entry_payload(last_entry) + msg.update(type="status", state="stale", message=f"No new PAA data for {int(age)}s") + await self._broadcast(msg) + poll_delay = 3.0 + else: + await self._broadcast({"type": "heartbeat"}) + else: + message_text = ( + self._last_diagnostic if self._last_diagnostic else "Waiting for PAA data..." + ) + await self._broadcast({ + "type": "status", + "state": "waiting", + "message": message_text, + }) + poll_delay = 5.0 + + await asyncio.sleep(poll_delay) + + def connect(self, ws: WebSocket) -> None: + """Register a WebSocket client and start monitor loop if first client.""" + self._clients.add(ws) + if len(self._clients) == 1 and (self._monitor_task is None or self._monitor_task.done()): + self._tail_path = None # Force fresh tail offset for new connection + self._monitor_task = asyncio.create_task(self._monitor_loop()) + + def disconnect(self, ws: WebSocket) -> None: + """Unregister a WebSocket client.""" + self._clients.discard(ws) + + async def shutdown(self) -> None: + """Cancel the monitor task if running. Call from app shutdown/lifespan.""" + if self._monitor_task and not self._monitor_task.done(): + self._monitor_task.cancel() + try: + await self._monitor_task + except asyncio.CancelledError: + pass + self._clients.clear() + + def get_status(self) -> dict: + """Return current PAA status as a serializable dict (read-only, no tail mutation). + + Unlike ``_parse_latest`` this performs a standalone full-file read so it + can be called safely from the REST endpoint without interfering with the + WebSocket tail state. + """ + path = self._find_latest_log() + if not path: + return { + "state": "waiting", + "message": self._last_diagnostic or "Waiting for PAA data...", + } + try: + with open(path, "r", encoding="utf-8", errors="replace") as f: + content = f.read() + matches = PAA_PATTERN.findall(content) + if not matches: + return { + "state": "waiting", + "message": f"No PAA data in {path}. Enable Ekos 'Log to file' and run PAA.", + } + last_match = matches[-1] + file_mtime = os.path.getmtime(path) + entry = _match_to_dict(last_match, file_mtime) + payload = {k: v for k, v in entry.items() if k != "file_mtime"} + return {"state": "active", **payload} + except (OSError, ValueError) as e: + return {"state": "waiting", "message": f"Error reading log: {e}"} + + +paa_router = APIRouter() + + +@paa_router.get("/paa", response_class=HTMLResponse) +async def paa_page(request: Request, state: AppState = Depends(get_state)): + """Render the PAA monitor page.""" + return state.templates.TemplateResponse(request, "paa.tpl", {}) + + +@paa_router.get("/api/paa/status", tags=["PAA"]) +async def paa_status(state: AppState = Depends(get_state)): + """Get current PAA status (REST API).""" + monitor = state.paa_monitor + if monitor is None: + return JSONResponse({"state": "disabled"}) + return JSONResponse(monitor.get_status()) + + +@paa_router.websocket("/ws/paa") +async def paa_websocket(websocket: WebSocket, state: AppState = Depends(get_state_ws)): + """WebSocket endpoint for live PAA updates.""" + monitor = state.paa_monitor + if monitor is None: + await websocket.close() + return + await websocket.accept() + monitor.connect(websocket) + try: + while True: + await websocket.receive_text() + except WebSocketDisconnect: + pass + except Exception: + logger.debug("PAA WebSocket connection error", exc_info=True) + finally: + monitor.disconnect(websocket) diff --git a/indiweb/routes.py b/indiweb/routes.py index c41aaea..518e64c 100644 --- a/indiweb/routes.py +++ b/indiweb/routes.py @@ -9,7 +9,7 @@ from threading import Timer from typing import cast -from fastapi import APIRouter, Depends, HTTPException, Request, Response +from fastapi import APIRouter, Depends, HTTPException, Request, Response, WebSocket from fastapi.responses import HTMLResponse, JSONResponse from importlib_metadata import version @@ -30,6 +30,11 @@ def get_state(request: Request) -> AppState: return cast(AppState, request.app.state) +def get_state_ws(websocket: WebSocket) -> AppState: + """Extract typed app state from WebSocket.""" + return cast(AppState, websocket.app.state) + + def get_db(request: Request) -> Database: state: AppState = get_state(request) return state.db diff --git a/indiweb/state.py b/indiweb/state.py index 5d4dc5a..aaa2508 100644 --- a/indiweb/state.py +++ b/indiweb/state.py @@ -3,11 +3,14 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Any +from typing import TYPE_CHECKING, Any from fastapi import FastAPI from fastapi.templating import Jinja2Templates +if TYPE_CHECKING: + from .paa_monitor import PaaMonitor + from .database import Database from .device import Device from .driver import DriverCollection @@ -27,6 +30,7 @@ class AppState: hostname: str saved_profile: str | None active_profile: str + paa_monitor: PaaMonitor | None = None class IndiWebApp(FastAPI): diff --git a/indiweb/views/form.tpl b/indiweb/views/form.tpl index 8c32f03..30b006d 100644 --- a/indiweb/views/form.tpl +++ b/indiweb/views/form.tpl @@ -27,6 +27,7 @@

{{hostname}} INDI Web Manager

+ PAA API
diff --git a/indiweb/views/paa.tpl b/indiweb/views/paa.tpl new file mode 100644 index 0000000..7940584 --- /dev/null +++ b/indiweb/views/paa.tpl @@ -0,0 +1,308 @@ + + + + + + PAA Live Monitor + + + +
+
+ + + Connecting... + + + +
+ +
+
+
Total
+
+
+
+
+
Altitude
+
+
+
+
+
Azimuth
+
+
+
+
+ +
+ + +
+ + ← Back to INDI Web Manager + + + + diff --git a/tests/test_paa_monitor.py b/tests/test_paa_monitor.py new file mode 100644 index 0000000..8f1ff83 --- /dev/null +++ b/tests/test_paa_monitor.py @@ -0,0 +1,352 @@ +"""Tests for PAA (Polar Alignment Assistant) monitor.""" + +import os + +import pytest +from fastapi.testclient import TestClient + +from indiweb.paa_monitor import ( + DATE_DIR_PATTERN, + PAA_PATTERN, + PaaMonitor, +) + +# --- Regex tests --- + + +def test_paa_pattern_matches_real_ekos_format(): + """PAA regex matches real Ekos Qt log format (DMS values).""" + line = ( + '[2026-02-14T16:31:36.864 EST INFO ][ org.kde.kstars.ekos.align] - ' + '"PAA Refresh(1): Corrected az: 01° 06\' 18" alt: 00° 47\' 11" total: 01° 21\' 23""' + ) + m = PAA_PATTERN.search(line) + assert m is not None + assert m.group(1) == "16:31:36.864" + assert m.group(2) == "01" + assert m.group(3) == "06" + assert m.group(4) == "18" + assert m.group(5) == "00" + assert m.group(6) == "47" + assert m.group(7) == "11" + assert m.group(8) == "01" + assert m.group(9) == "21" + assert m.group(10) == "23" + + +def test_paa_pattern_matches_negative_azimuth(): + """PAA regex captures negative DMS (southern hemisphere).""" + line = ( + '[2026-02-14T12:00:00.000 EST INFO ][ org.kde.kstars.ekos.align] - ' + '"PAA Refresh(1): Corrected az: -01° 30\' 00" alt: 00° 47\' 11" total: 01° 21\' 23""' + ) + m = PAA_PATTERN.search(line) + assert m is not None + assert m.group(2) == "-01" + assert m.group(5) == "00" + + +def test_paa_parses_negative_zero_dms(tmp_path): + """PaaMonitor preserves sign when parsing -00deg 30' 10" (int('-00') loses sign).""" + logs_dir = tmp_path / "kstars_logs" + date_dir = logs_dir / "2026-02-14" + date_dir.mkdir(parents=True) + log_file = date_dir / "ekos.txt" + # -00° 30' 10" (az), 00° 47' 11" for alt + log_file.write_text( + '[2026-02-14T22:15:30.123 EST INFO ][ org.kde.kstars.ekos.align] - ' + '"PAA Refresh(1): Corrected az: -00° 30\' 10" alt: 00° 47\' 11" total: 00° 57\' 45""\n' + ) + monitor = PaaMonitor(str(logs_dir)) + entry = monitor._parse_latest() + assert entry is not None + assert entry["az_direction"] == "left" + assert "00° 30' 10\"" in entry["az"] + assert entry["alt"] == "00° 47' 11\"" + + +def test_paa_pattern_rejects_non_paa_line(): + """PAA regex does not match lines without PAA Refresh.""" + line = '[2026-02-14T22:15:30.123 EST INFO ] - "Some other log message"' + assert PAA_PATTERN.search(line) is None + + +def test_date_dir_pattern(): + """Date directory pattern matches YYYY-MM-DD.""" + assert DATE_DIR_PATTERN.match("2026-02-14") + assert DATE_DIR_PATTERN.match("2024-01-01") + assert not DATE_DIR_PATTERN.match("2026-2-14") + assert not DATE_DIR_PATTERN.match("logs") + + +# --- PaaMonitor tests --- + + +@pytest.fixture +def tmp_kstars_logs(tmp_path): + """Create temp KStars log directory with sample PAA content (real Ekos format).""" + logs_dir = tmp_path / "kstars_logs" + logs_dir.mkdir() + date_dir = logs_dir / "2026-02-14" + date_dir.mkdir() + log_file = date_dir / "ekos_session.txt" + log_file.write_text( + '[2026-02-14T22:15:30.123 EST INFO ][ org.kde.kstars.ekos.align] - ' + '"PAA Refresh(1): Corrected az: 01° 06\' 18" alt: 00° 47\' 11" total: 01° 21\' 23""\n' + '[2026-02-14T22:15:32.456 EST INFO ][ org.kde.kstars.ekos.align] - ' + '"PAA Refresh(2): Corrected az: 01° 06\' 24" alt: 00° 47\' 10" total: 01° 21\' 27""\n' + ) + return str(logs_dir) + + +def test_monitor_find_latest_log(tmp_kstars_logs): + """PaaMonitor discovers latest log file in date directory.""" + monitor = PaaMonitor(tmp_kstars_logs) + path = monitor._find_latest_log() + assert path is not None + assert "2026-02-14" in path + assert path.endswith(".txt") + + +def test_monitor_parse_latest(tmp_kstars_logs): + """PaaMonitor parses latest PAA entry from log file (real DMS format).""" + monitor = PaaMonitor(tmp_kstars_logs) + entry = monitor._parse_latest() + assert entry is not None + assert entry["timestamp"] == "22:15:32.456" + assert entry["az"] == "01° 06' 24\"" + assert entry["alt"] == "00° 47' 10\"" + assert entry["total"] == "01° 21' 27\"" + + +def test_monitor_searches_multiple_locations(tmp_path): + """PaaMonitor searches all configured dirs and picks the newest log.""" + native_logs = tmp_path / "native" / "kstars" / "logs" / "2026-02-14" + flatpak_logs = tmp_path / "flatpak" / "kstars" / "logs" / "2026-02-14" + native_logs.mkdir(parents=True) + flatpak_logs.mkdir(parents=True) + native_log = native_logs / "old.txt" + flatpak_log = flatpak_logs / "new.txt" + paa_line = '[2026-02-14T22:15:30.123 EST INFO ] - "PAA Refresh(1): Corrected az: 01° 06\' 18" alt: 00° 47\' 11" total: 01° 21\' 23"\n' + native_log.write_text(paa_line) + flatpak_log.write_text(paa_line) + # Make flatpak log newer + import time + time.sleep(0.01) + flatpak_log.touch() + # Base dirs are the "logs" dirs that contain YYYY-MM-DD subdirs + base_dirs = [str(native_logs.parent), str(flatpak_logs.parent)] + monitor = PaaMonitor(base_dirs) + path = monitor._find_latest_log() + assert path is not None + assert "flatpak" in path + entry = monitor._parse_latest() + assert entry is not None + + +def test_monitor_no_logs_dir(): + """PaaMonitor returns None when log directory does not exist.""" + monitor = PaaMonitor("/nonexistent/path/to/logs") + assert monitor._find_latest_log() is None + assert monitor._parse_latest() is None + + +def test_monitor_empty_logs_dir(tmp_path): + """PaaMonitor returns None when directory has no date subdirs.""" + empty = tmp_path / "empty_logs" + empty.mkdir() + monitor = PaaMonitor(str(empty)) + assert monitor._find_latest_log() is None + assert monitor._parse_latest() is None + + +def test_monitor_rejects_format_without_degree_symbols(tmp_path): + """Monitor returns None when log format lacks required degree/arcmin symbols.""" + logs_dir = tmp_path / "kstars_logs" + date_dir = logs_dir / "2026-02-14" + date_dir.mkdir(parents=True) + log_file = date_dir / "odd_format.txt" + log_file.write_text( + '[2026-02-14T22:15:30.123 EST INFO ] - "PAA Refresh(1): Corrected az: 01 06 18 alt: 00 47 11 total: 01 21 23"\n' + ) + monitor = PaaMonitor(str(logs_dir)) + entry = monitor._parse_latest() + assert entry is None + + +def test_monitor_tail_ignores_existing_content_when_client_connected(tmp_path): + """With a client connected, only content appended after connect is parsed (tail).""" + logs_dir = tmp_path / "kstars_logs" + date_dir = logs_dir / "2026-02-14" + date_dir.mkdir(parents=True) + log_file = date_dir / "ekos.txt" + paa_line = ( + '[2026-02-14T22:15:30.123 EST INFO ] - ' + '"PAA Refresh(1): Corrected az: 01° 06\' 18" alt: 00° 47\' 11" total: 01° 21\' 23"\n' + ) + log_file.write_text(paa_line) + monitor = PaaMonitor(str(logs_dir)) + monitor._clients.add(object()) # Simulate connected client + entry = monitor._parse_latest() + assert entry is None # Tail is empty (content existed before "connect") + monitor._clients.clear() + + +# --- API and WebSocket tests --- + + +@pytest.fixture +def paa_client(tmp_conf, xmldir, tmp_path): + """TestClient with PAA enabled and temp kstars logs (real Ekos format).""" + from indiweb.main import create_app + + logs_dir = tmp_path / "kstars_logs" + logs_dir.mkdir() + date_dir = logs_dir / "2026-02-14" + date_dir.mkdir() + log_file = date_dir / "ekos.txt" + log_file.write_text( + '[2026-02-14T22:15:30.123 EST INFO ][ org.kde.kstars.ekos.align] - ' + '"PAA Refresh(1): Corrected az: 00° 06\' 00" alt: 00° 12\' 00" total: 00° 13\' 25""\n' + ) + + fifo_path = os.path.join(tmp_conf, "indi_fifo") + argv = [ + "--conf", tmp_conf, + "--fifo", fifo_path, + "--xmldir", xmldir, + "--indi-port", "17624", + "--with-paa", + "--kstars-logs", str(logs_dir), + ] + app = create_app(argv) + return TestClient(app) + + +def test_paa_status_api_returns_active_when_data(paa_client): + """GET /api/paa/status returns active state with PAA data when log has content.""" + r = paa_client.get("/api/paa/status") + assert r.status_code == 200 + data = r.json() + assert data["state"] == "active" + assert "az" in data + assert "alt" in data + assert "total" in data + assert "total_arcsec" in data + assert "az_direction" in data + assert "alt_direction" in data + + +def test_paa_status_api_returns_waiting_when_no_logs(tmp_conf, xmldir, tmp_path): + """GET /api/paa/status returns waiting state when log directory does not exist.""" + from indiweb.main import create_app + + nonexistent = tmp_path / "nonexistent_logs" + assert not nonexistent.exists() + fifo_path = os.path.join(tmp_conf, "indi_fifo") + argv = [ + "--conf", tmp_conf, "--fifo", fifo_path, "--xmldir", xmldir, + "--indi-port", "17624", "--with-paa", "--kstars-logs", str(nonexistent), + ] + app = create_app(argv) + client = TestClient(app) + r = client.get("/api/paa/status") + assert r.status_code == 200 + data = r.json() + assert data["state"] == "waiting" + assert "does not exist" in data["message"] + + +def test_paa_page_renders(paa_client): + """GET /paa returns HTML page.""" + r = paa_client.get("/paa") + assert r.status_code == 200 + assert b"PAA" in r.content + assert b"altitude" in r.content.lower() + assert b"azimuth" in r.content.lower() + + +def test_paa_websocket_shows_diagnostic_when_log_dir_missing(tmp_conf, xmldir, tmp_path): + """WebSocket sends diagnostic message when log directory does not exist.""" + from indiweb.main import create_app + + nonexistent_logs = tmp_path / "nonexistent_kstars_logs" + assert not nonexistent_logs.exists() + fifo_path = os.path.join(tmp_conf, "indi_fifo") + argv = [ + "--conf", tmp_conf, + "--fifo", fifo_path, + "--xmldir", xmldir, + "--indi-port", "17624", + "--with-paa", + "--kstars-logs", str(nonexistent_logs), + ] + app = create_app(argv) + client = TestClient(app) + with client.websocket_connect("/ws/paa") as ws: + msg = ws.receive_json() + assert msg["type"] == "status" + assert msg["state"] == "waiting" + assert "does not exist" in msg["message"] + assert str(nonexistent_logs) in msg["message"] + + +def test_paa_websocket_connects(paa_client): + """WebSocket /ws/paa accepts connection and sends messages.""" + with paa_client.websocket_connect("/ws/paa") as ws: + # Should receive at least one message (update, status, or heartbeat) + msg = ws.receive_json() + assert "type" in msg + assert msg["type"] in ("update", "status", "heartbeat") + if msg["type"] == "update": + assert "az" in msg + assert "alt" in msg + assert "total" in msg + assert "total_arcsec" in msg + assert "az_direction" in msg + assert "alt_direction" in msg + elif msg["type"] == "status": + assert "state" in msg + assert "message" in msg + + +def test_paa_websocket_receives_heartbeat(tmp_conf, xmldir, tmp_path): + """WebSocket can receive heartbeat (sent when no new match, has last_entry).""" + from indiweb.main import create_app + + logs_dir = tmp_path / "kstars_logs" + date_dir = logs_dir / "2026-02-14" + date_dir.mkdir(parents=True) + log_file = date_dir / "ekos.txt" + paa_line = ( + '[2026-02-14T22:15:30.123 EST INFO ] - ' + '"PAA Refresh(1): Corrected az: 01° 06\' 18" alt: 00° 47\' 11" total: 01° 21\' 23"\n' + ) + log_file.write_text(paa_line) + fifo_path = os.path.join(tmp_conf, "indi_fifo") + argv = [ + "--conf", tmp_conf, + "--fifo", fifo_path, + "--xmldir", xmldir, + "--indi-port", "17624", + "--with-paa", + "--kstars-logs", str(logs_dir), + ] + app = create_app(argv) + client = TestClient(app) + with client.websocket_connect("/ws/paa") as ws: + msg1 = ws.receive_json() + assert msg1["type"] in ("status", "update", "heartbeat") + if msg1["type"] == "update": + pass # Got update from tail (content was written before connect - no, we have clients so tail) + msgs = [msg1] + for _ in range(5): + try: + m = ws.receive_json() + msgs.append(m) + if m["type"] == "heartbeat": + break + except Exception: + break + types = [m["type"] for m in msgs] + assert any(t in ("heartbeat", "status") for t in types), f"Expected heartbeat or status, got: {types}"