From ed50ff44671eb03cca04f123bd1496ee4850ed94 Mon Sep 17 00:00:00 2001 From: spcpza Date: Sun, 8 Mar 2026 13:55:12 +0700 Subject: [PATCH] Add relay-status CLI subcommand (closes #4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements `lightning-memory relay-status` as requested in issue #4. Changes: - lightning_memory/cli.py (new): relay connectivity prober and sync stats reader. Probes each configured relay concurrently using a limit:0 REQ/EOSE round-trip — lightest possible liveness check that exercises the full WebSocket + NIP-01 handshake. - lightning_memory/server.py: dispatch relay-status subcommand before starting the MCP server, so the existing entry point gains the new command without a second binary. - tests/test_relay_status.py (new): 15 tests covering _probe_relay (reachable, unreachable, NOTICE errors, latency), _sync_stats (populated DB, missing DB, empty DB), _relay_status_async (exit codes, --json output, sync stats in human output), and the server.py dispatch path. Usage: lightning-memory relay-status # human-readable lightning-memory relay-status --json # machine-readable --- lightning_memory/cli.py | 183 +++++++++++++++++++++ lightning_memory/server.py | 28 +++- tests/test_relay_status.py | 329 +++++++++++++++++++++++++++++++++++++ 3 files changed, 538 insertions(+), 2 deletions(-) create mode 100644 lightning_memory/cli.py create mode 100644 tests/test_relay_status.py diff --git a/lightning_memory/cli.py b/lightning_memory/cli.py new file mode 100644 index 0000000..f127c51 --- /dev/null +++ b/lightning_memory/cli.py @@ -0,0 +1,183 @@ +"""CLI commands for Lightning Memory. + +Provides diagnostic and status subcommands accessible via: + + lightning-memory relay-status # check relay connectivity and sync state + lightning-memory relay-status --json # machine-readable output +""" + +from __future__ import annotations + +import asyncio +import json +import sqlite3 +import sys +import time +from pathlib import Path + +from .config import load_config +from .relay import RelayResponse, fetch_events + + +# --------------------------------------------------------------------------- +# Relay connectivity probe +# --------------------------------------------------------------------------- + + +async def _probe_relay(relay_url: str, timeout: float = 6.0) -> tuple[bool, str, float]: + """Connect to a relay and measure round-trip time. + + Sends a REQ with ``limit: 0`` just to get an EOSE back — the lightest + possible liveness check that exercises the full WebSocket handshake and + NIP-01 protocol without fetching any events. + + Returns: + (reachable, message, latency_ms) + """ + t0 = time.monotonic() + result: RelayResponse = await fetch_events( + relay_url, + {"kinds": [30078], "limit": 0}, + timeout=timeout, + ) + latency_ms = (time.monotonic() - t0) * 1000.0 + + if result.success: + return True, "ok", latency_ms + return False, result.message, latency_ms + + +# --------------------------------------------------------------------------- +# Sync log queries +# --------------------------------------------------------------------------- + + +def _db_path() -> Path: + return Path.home() / ".lightning-memory" / "memory.db" + + +def _sync_stats(db_path: Path) -> dict: + """Return push/pull stats from the local sync log.""" + if not db_path.exists(): + return {} + + try: + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + + pushed = conn.execute("SELECT COUNT(*) FROM sync_log").fetchone()[0] + last_push_row = conn.execute( + "SELECT MAX(pushed_at) as ts, relay_count FROM sync_log" + ).fetchone() + last_push_ts = last_push_row["ts"] if last_push_row else None + + last_pull_row = conn.execute( + "SELECT value FROM sync_cursor WHERE key = 'last_pull_timestamp'" + ).fetchone() + last_pull_ts = float(last_pull_row["value"]) if last_pull_row else None + + total_memories = conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0] + conn.close() + + return { + "total_memories": total_memories, + "pushed_events": pushed, + "last_push_ts": last_push_ts, + "last_pull_ts": last_pull_ts, + } + except sqlite3.OperationalError: + return {} + + +def _fmt_ts(ts: float | None) -> str: + if not ts: + return "never" + import datetime + return datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") + + +# --------------------------------------------------------------------------- +# relay-status command +# --------------------------------------------------------------------------- + + +async def _relay_status_async(as_json: bool = False) -> int: + """Check relay connectivity and show sync state. Returns exit code.""" + config = load_config() + db_path = _db_path() + + # Probe all relays concurrently + probe_tasks = [_probe_relay(url) for url in config.relays] + probe_results = await asyncio.gather(*probe_tasks, return_exceptions=True) + + # Sync stats from DB + stats = _sync_stats(db_path) + + if as_json: + output: dict = { + "relays": [], + "sync": stats, + "db_path": str(db_path), + } + all_ok = True + for url, res in zip(config.relays, probe_results): + if isinstance(res, Exception): + output["relays"].append({"url": url, "reachable": False, "message": str(res), "latency_ms": None}) + all_ok = False + else: + reachable, msg, latency = res + output["relays"].append({"url": url, "reachable": reachable, "message": msg, "latency_ms": round(latency)}) + if not reachable: + all_ok = False + print(json.dumps(output, indent=2)) + return 0 if all_ok else 1 + + # --- Human-readable output --- + print(f"Lightning Memory — relay status") + print(f"DB: {db_path}{' (not found)' if not db_path.exists() else ''}") + print() + + all_ok = True + for url, res in zip(config.relays, probe_results): + if isinstance(res, Exception): + print(f" ✗ {url}") + print(f" error: {res}") + all_ok = False + continue + + reachable, msg, latency = res + if reachable: + print(f" ✓ {url} ({latency:.0f}ms)") + else: + # Trim common noise from error messages + short_msg = msg[:80] if len(msg) > 80 else msg + print(f" ✗ {url}") + print(f" {short_msg}") + all_ok = False + + if stats: + print() + print(f" memories in DB : {stats.get('total_memories', 0)}") + print(f" events pushed : {stats.get('pushed_events', 0)}") + print(f" last push : {_fmt_ts(stats.get('last_push_ts'))}") + print(f" last pull : {_fmt_ts(stats.get('last_pull_ts'))}") + else: + print() + print(" (no sync history found — run 'lightning-memory sync' first)") + + print() + if all_ok: + print(f"All {len(config.relays)} relays reachable.") + else: + reachable_count = sum( + 1 for r in probe_results if not isinstance(r, Exception) and r[0] + ) + print(f"{reachable_count}/{len(config.relays)} relays reachable.") + + return 0 if all_ok else 1 + + +def cmd_relay_status(args: list[str]) -> int: + """Entry point for ``lightning-memory relay-status``.""" + as_json = "--json" in args + return asyncio.run(_relay_status_async(as_json=as_json)) diff --git a/lightning_memory/server.py b/lightning_memory/server.py index 3a6f6fb..17ad127 100644 --- a/lightning_memory/server.py +++ b/lightning_memory/server.py @@ -298,8 +298,32 @@ def ln_budget_status() -> dict: } -def main(): - """Run the MCP server.""" +def main() -> None: + """Run the MCP server, or dispatch a CLI subcommand. + + Subcommands: + relay-status [--json] Check relay connectivity and last sync state. + """ + import sys + + args = sys.argv[1:] + + if args and args[0] == "relay-status": + from .cli import cmd_relay_status + + raise SystemExit(cmd_relay_status(args[1:])) + + if args and args[0] in ("-h", "--help"): + print( + "Usage: lightning-memory [subcommand]\n" + "\n" + "Subcommands:\n" + " relay-status [--json] Check relay connectivity and last sync state.\n" + "\n" + "With no subcommand, starts the MCP server." + ) + raise SystemExit(0) + mcp.run() diff --git a/tests/test_relay_status.py b/tests/test_relay_status.py new file mode 100644 index 0000000..962cb54 --- /dev/null +++ b/tests/test_relay_status.py @@ -0,0 +1,329 @@ +"""Tests for the relay-status CLI command (lightning_memory.cli).""" + +from __future__ import annotations + +import asyncio +import json +import sqlite3 +import tempfile +import time +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from lightning_memory.cli import ( + _fmt_ts, + _probe_relay, + _relay_status_async, + _sync_stats, +) +from lightning_memory.relay import RelayResponse + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _mock_ws_connect(responses: list[str]): + """Return a mock websockets.connect context manager.""" + ws = AsyncMock() + ws.send = AsyncMock() + recv_iter = iter(responses) + ws.recv = AsyncMock(side_effect=lambda: next(recv_iter)) + cm = AsyncMock() + cm.__aenter__ = AsyncMock(return_value=ws) + cm.__aexit__ = AsyncMock(return_value=False) + return cm + + +def _make_db(path: Path, memories: int = 3, pushed: int = 2) -> None: + """Create a minimal memory.db fixture.""" + conn = sqlite3.connect(str(path)) + conn.execute(""" + CREATE TABLE memories ( + id TEXT PRIMARY KEY, + content TEXT, + type TEXT, + metadata TEXT, + created_at REAL, + nostr_event_id TEXT + ) + """) + conn.execute(""" + CREATE TABLE sync_log ( + memory_id TEXT PRIMARY KEY, + event_id TEXT, + pushed_at REAL, + relay_count INTEGER + ) + """) + conn.execute(""" + CREATE TABLE sync_cursor ( + key TEXT PRIMARY KEY, + value TEXT + ) + """) + now = time.time() + for i in range(memories): + conn.execute( + "INSERT INTO memories VALUES (?,?,?,?,?,?)", + (f"m{i}", f"memory {i}", "general", "{}", now - i * 60, None), + ) + for i in range(pushed): + conn.execute( + "INSERT INTO sync_log VALUES (?,?,?,?)", + (f"m{i}", f"evt{i}", now - i * 10, 2), + ) + conn.execute( + "INSERT INTO sync_cursor VALUES (?,?)", + ("last_pull_timestamp", str(now - 300)), + ) + conn.commit() + conn.close() + + +# --------------------------------------------------------------------------- +# _probe_relay +# --------------------------------------------------------------------------- + + +class TestProbeRelay: + def test_reachable_relay(self): + """A relay that responds with EOSE is reported as reachable.""" + eose = json.dumps(["EOSE", "sub1"]) + mock_connect = _mock_ws_connect([eose]) + + with patch("lightning_memory.relay.websockets") as mock_ws: + mock_ws.connect = MagicMock(return_value=mock_connect) + reachable, msg, latency = asyncio.run( + _probe_relay("wss://relay.example.com") + ) + + assert reachable is True + assert msg == "ok" + assert latency >= 0 + + def test_unreachable_relay(self): + """Connection errors are reported as not reachable.""" + with patch("lightning_memory.relay.websockets") as mock_ws: + mock_ws.connect = MagicMock(side_effect=OSError("connection refused")) + reachable, msg, latency = asyncio.run( + _probe_relay("wss://dead.relay.example.com") + ) + + assert reachable is False + assert "connection refused" in msg + + def test_relay_notice_error(self): + """NOTICE responses that signal errors are reported as failures.""" + notice = json.dumps(["NOTICE", "rate limited"]) + mock_connect = _mock_ws_connect([notice]) + + with patch("lightning_memory.relay.websockets") as mock_ws: + mock_ws.connect = MagicMock(return_value=mock_connect) + reachable, msg, _ = asyncio.run(_probe_relay("wss://rate.limited.relay")) + + assert reachable is False + assert "rate limited" in msg + + def test_latency_measured(self): + """Latency is a non-negative float in milliseconds.""" + eose = json.dumps(["EOSE", "sub1"]) + mock_connect = _mock_ws_connect([eose]) + + with patch("lightning_memory.relay.websockets") as mock_ws: + mock_ws.connect = MagicMock(return_value=mock_connect) + _, _, latency = asyncio.run(_probe_relay("wss://relay.example.com")) + + assert isinstance(latency, float) + assert latency >= 0 + + +# --------------------------------------------------------------------------- +# _sync_stats +# --------------------------------------------------------------------------- + + +class TestSyncStats: + def test_stats_from_populated_db(self, tmp_path): + db = tmp_path / "memory.db" + _make_db(db, memories=5, pushed=3) + stats = _sync_stats(db) + + assert stats["total_memories"] == 5 + assert stats["pushed_events"] == 3 + assert stats["last_push_ts"] is not None + assert stats["last_pull_ts"] is not None + + def test_missing_db_returns_empty(self, tmp_path): + stats = _sync_stats(tmp_path / "nonexistent.db") + assert stats == {} + + def test_empty_db_returns_zeros(self, tmp_path): + db = tmp_path / "memory.db" + _make_db(db, memories=0, pushed=0) + stats = _sync_stats(db) + + assert stats["total_memories"] == 0 + assert stats["pushed_events"] == 0 + + +# --------------------------------------------------------------------------- +# _fmt_ts +# --------------------------------------------------------------------------- + + +class TestFmtTs: + def test_none_returns_never(self): + assert _fmt_ts(None) == "never" + + def test_zero_returns_never(self): + assert _fmt_ts(0) == "never" + + def test_valid_ts_returns_formatted(self): + # 2024-01-15 12:00:00 UTC (platform local time may vary, just check shape) + ts = 1705320000.0 + result = _fmt_ts(ts) + assert len(result) == 19 # "YYYY-MM-DD HH:MM:SS" + assert result[4] == "-" + + +# --------------------------------------------------------------------------- +# _relay_status_async (integration) +# --------------------------------------------------------------------------- + + +class TestRelayStatusAsync: + def _patch_config(self, relay_urls: list[str]): + from lightning_memory.config import Config + + cfg = Config(relays=relay_urls) + return patch("lightning_memory.cli.load_config", return_value=cfg) + + def test_all_relays_reachable_returns_0(self, tmp_path, capsys): + """Exit code 0 when all relays respond.""" + db = tmp_path / "memory.db" + _make_db(db) + eose = json.dumps(["EOSE", "sub1"]) + mock_connect = _mock_ws_connect([eose, eose]) + + with ( + self._patch_config(["wss://r1.test", "wss://r2.test"]), + patch("lightning_memory.cli._db_path", return_value=db), + patch("lightning_memory.relay.websockets") as mock_ws, + ): + mock_ws.connect = MagicMock(return_value=mock_connect) + code = asyncio.run(_relay_status_async()) + + assert code == 0 + out = capsys.readouterr().out + assert "✓" in out + assert "r1.test" in out + + def test_unreachable_relay_returns_1(self, tmp_path, capsys): + """Exit code 1 when at least one relay is unreachable.""" + db = tmp_path / "memory.db" + _make_db(db) + + with ( + self._patch_config(["wss://dead.relay"]), + patch("lightning_memory.cli._db_path", return_value=db), + patch("lightning_memory.relay.websockets") as mock_ws, + ): + mock_ws.connect = MagicMock(side_effect=OSError("refused")) + code = asyncio.run(_relay_status_async()) + + assert code == 1 + out = capsys.readouterr().out + assert "✗" in out + + def test_json_output(self, tmp_path, capsys): + """--json flag produces parseable JSON.""" + db = tmp_path / "memory.db" + _make_db(db, memories=4, pushed=2) + eose = json.dumps(["EOSE", "sub1"]) + mock_connect = _mock_ws_connect([eose]) + + with ( + self._patch_config(["wss://r1.test"]), + patch("lightning_memory.cli._db_path", return_value=db), + patch("lightning_memory.relay.websockets") as mock_ws, + ): + mock_ws.connect = MagicMock(return_value=mock_connect) + asyncio.run(_relay_status_async(as_json=True)) + + out = capsys.readouterr().out + data = json.loads(out) + + assert "relays" in data + assert len(data["relays"]) == 1 + assert data["relays"][0]["reachable"] is True + assert "sync" in data + assert data["sync"]["total_memories"] == 4 + + def test_sync_stats_shown_in_output(self, tmp_path, capsys): + """Sync statistics (memories, pushed, last sync) appear in human output.""" + db = tmp_path / "memory.db" + _make_db(db, memories=7, pushed=5) + eose = json.dumps(["EOSE", "sub1"]) + mock_connect = _mock_ws_connect([eose]) + + with ( + self._patch_config(["wss://r1.test"]), + patch("lightning_memory.cli._db_path", return_value=db), + patch("lightning_memory.relay.websockets") as mock_ws, + ): + mock_ws.connect = MagicMock(return_value=mock_connect) + asyncio.run(_relay_status_async()) + + out = capsys.readouterr().out + assert "7" in out # total_memories + assert "5" in out # pushed_events + + +# --------------------------------------------------------------------------- +# server.py subcommand dispatch +# --------------------------------------------------------------------------- + + +class TestServerDispatch: + def test_relay_status_subcommand_dispatched(self): + """lightning-memory relay-status dispatches to cmd_relay_status.""" + with ( + patch("sys.argv", ["lightning-memory", "relay-status"]), + patch("lightning_memory.cli.cmd_relay_status", return_value=0) as mock_cmd, + ): + from lightning_memory import server + + with pytest.raises(SystemExit) as exc: + server.main() + + mock_cmd.assert_called_once_with([]) + assert exc.value.code == 0 + + def test_relay_status_json_flag_passed_through(self): + """--json flag is forwarded to cmd_relay_status.""" + with ( + patch("sys.argv", ["lightning-memory", "relay-status", "--json"]), + patch("lightning_memory.cli.cmd_relay_status", return_value=0) as mock_cmd, + ): + from lightning_memory import server + + with pytest.raises(SystemExit): + server.main() + + mock_cmd.assert_called_once_with(["--json"]) + + def test_no_args_starts_mcp_server(self): + """With no subcommand, mcp.run() is called normally.""" + with ( + patch("sys.argv", ["lightning-memory"]), + patch("lightning_memory.server.mcp") as mock_mcp, + ): + from lightning_memory import server + + server.main() + + mock_mcp.run.assert_called_once()