diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..941e223 --- /dev/null +++ b/.env.example @@ -0,0 +1,10 @@ +# MailPlus live adapter credentials (never commit real values) +# Copy to .env and fill in before running the live adapter. + +MAILPLUS_HOST=imap.example.com +MAILPLUS_USER=your-address@example.com +MAILPLUS_TOKEN=your-oauth2-bearer-token-or-app-password + +# Optional overrides +MAILPLUS_MAILBOX=INBOX +MAILPLUS_PAGE_SIZE=50 diff --git a/docs/live-adapter.md b/docs/live-adapter.md new file mode 100644 index 0000000..3a92c46 --- /dev/null +++ b/docs/live-adapter.md @@ -0,0 +1,47 @@ +# Live Adapter + +`live_adapter.py` bridges the fixture-backed sync pipeline to a real MailPlus +account. It produces the same `SyncBatch` type used by `run_sync_batch()`, so +no other code changes when switching from fixtures to live. + +## Configuration + +Copy `.env.example` to `.env` (never commit `.env`) and fill in: + +| Variable | Required | Description | +|---|---|---| +| `MAILPLUS_HOST` | yes | IMAP or MailPlus API hostname | +| `MAILPLUS_USER` | yes | Mailbox address | +| `MAILPLUS_TOKEN` | yes | OAuth2 bearer token or app password | +| `MAILPLUS_MAILBOX` | no | Folder to sync (default `INBOX`) | +| `MAILPLUS_PAGE_SIZE` | no | Messages per batch (default `50`) | + +If any required variable is absent, `load_live_config()` raises +`LiveAdapterNotConfigured`. CI omits these variables deliberately, so the live +path is never exercised in automated tests. + +## Gate Pattern + +```python +from mailplus_intelligence.live_adapter import LiveAdapterNotConfigured, fetch_batch, load_live_config + +try: + config = load_live_config() +except LiveAdapterNotConfigured as exc: + print(f"Live adapter not available: {exc}") + raise SystemExit(1) + +batch = fetch_batch(config, cursor="") +``` + +## Current Status + +`_fetch_messages()` is a stub that returns an empty list. Replace it with the +MailPlus API client call once the client library is available. The public +interface (`fetch_batch`, `load_live_config`) is stable and will not change. + +## Security + +- Store credentials in `.env` or a secrets manager — never hard-code them. +- Rotate `MAILPLUS_TOKEN` on any suspected exposure. +- The adapter is metadata-only; it must never fetch raw message bodies. diff --git a/docs/phase2-planning.md b/docs/phase2-planning.md new file mode 100644 index 0000000..5e09603 --- /dev/null +++ b/docs/phase2-planning.md @@ -0,0 +1,65 @@ +# Phase 2 Planning + +## Scope + +Phase 2 extends the metadata intelligence pipeline from the offline/fixture +baseline (Phase 1) to a production-capable system. The three pillars are: + +1. **LLM-backed extraction** — richer semantic artifacts from classified threads +2. **Live account sync** — incremental, checkpointed sync from real MailPlus accounts +3. **Scheduler + job locking** — reliable recurring sync without overlapping runs + +--- + +## Completed in Phase 2 (this branch) + +| Module | Issue | Status | +|---|---|---| +| `sync.py` | #3 | done | +| `extractor.py` | #6 | done | +| `llm_extractor.py` | #70 | done | +| `scheduler.py` | #74 | done | +| `live_adapter.py` | #71 | done | +| `cli.py` (search, queue, export, doctor) | #2, #4, #5, #7 | done | +| `index_writer.py` + search | #39 | done | + +--- + +## Architectural Decisions + +### Metadata-only invariant +Raw message bodies are never fetched, stored, or transmitted. All extraction +(deterministic and LLM) operates on subject, sender, date, folder, and +attachment metadata only. 
This is enforced at the mapper layer. + +### Deterministic-first, LLM-second +`extractor.py` runs first and produces candidates with `provenance="deterministic"`. +`llm_extractor.py` runs on the same threads and produces candidates with +`provenance="llm"`. Downstream consumers can filter by provenance. + +### Prompt caching +LLM extraction uses `cache_control: {"type": "ephemeral"}` on the shared +system prompt and per-thread context blocks. This reduces token costs +significantly when processing multiple threads in one session. + +### Offline CI gate +`llm_extractor.py` accepts a `cassette` dict mapping thread IDs to recorded +response strings. CI passes a cassette so no Anthropic API calls are made. +Live calls happen only in environments where `ANTHROPIC_API_KEY` is set and +no cassette is provided. + +### Job locking +`scheduler.py` uses a `scheduler_locks` SQLite table to prevent overlapping +runs. Locks older than `LOCK_STALE_SECONDS` (300 s) are considered stale and +cleared automatically. + +--- + +## Phase 3 Candidates + +- **MailPlus API client**: replace `live_adapter._fetch_messages()` stub +- **Streaming LLM extraction**: use `client.messages.stream()` for large threads +- **Promotion workflow UI**: web interface for the queue review flow +- **Multi-account support**: per-account checkpoints and lane configuration +- **Relationship graph**: entity_update artifacts feeding a contact knowledge graph +- **Evaluation regressions**: nightly evaluation run against a golden fixture set diff --git a/src/mailplus_intelligence/cli.py b/src/mailplus_intelligence/cli.py index e684c7e..e625827 100644 --- a/src/mailplus_intelligence/cli.py +++ b/src/mailplus_intelligence/cli.py @@ -156,6 +156,56 @@ def cmd_export(args: argparse.Namespace) -> int: return 0 +# ── sync ───────────────────────────────────────────────────────────────────── + + +def cmd_sync(args: argparse.Namespace) -> int: + from .scheduler import get_job_status, list_jobs + from .sync import get_checkpoint + + conn = _setup_db(args.db) + try: + if args.sync_action == "status": + if args.job: + status = get_job_status(conn, args.job) + if status is None: + print(f"No job registered: {args.job}") + return 1 + jobs = [status] + else: + jobs = list_jobs(conn) + if args.json: + print(json.dumps([j.__dict__ for j in jobs], indent=2)) + else: + if not jobs: + print("No sync jobs registered.") + for j in jobs: + lock_str = f"LOCKED by {j.lock_holder}" if j.locked else "unlocked" + print(f"{j.job_name} [{lock_str}]") + print(f" last run: {j.last_run_at or 'never'}") + print(f" last success: {j.last_success_at or 'never'}") + + elif args.sync_action == "checkpoint": + source = args.source or "fixture-corpus" + cp = get_checkpoint(conn, source) + if cp is None: + print(f"No checkpoint for source: {source}") + return 1 + if args.json: + print(json.dumps(cp, indent=2)) + else: + print(f"source: {cp.get('source_name')}") + print(f"cursor: {cp.get('cursor') or '(none)'}") + print(f"last attempt: {cp.get('last_attempt_at') or 'never'}") + print(f"last success: {cp.get('last_success_at') or 'never'}") + else: + print("Usage: mpi sync {status|checkpoint}") + return 1 + finally: + conn.close() + return 0 + + # ── doctor ──────────────────────────────────────────────────────────────────── def cmd_doctor(args: argparse.Namespace) -> int: @@ -223,6 +273,14 @@ def build_parser() -> argparse.ArgumentParser: ep = sub.add_parser("export", help="Dry-run export of approved candidates") ep.add_argument("--output", default="./export-artifacts", 
help="Output directory") + # sync + syp = sub.add_parser("sync", help="Sync job status and checkpoint inspection") + sya = syp.add_subparsers(dest="sync_action") + ss = sya.add_parser("status", help="List scheduler job statuses") + ss.add_argument("--job", help="Filter by job name") + sc = sya.add_parser("checkpoint", help="Show sync checkpoint for a source") + sc.add_argument("--source", help="Source name (default: fixture-corpus)") + # doctor dp = sub.add_parser("doctor", help="Run fixture-mode preflight checks") dp.add_argument("--project-root", dest="project_root", default=".") @@ -244,6 +302,8 @@ def main(argv: list[str] | None = None) -> int: return cmd_export(args) elif args.command == "doctor": return cmd_doctor(args) + elif args.command == "sync": + return cmd_sync(args) else: parser.print_help() return 1 diff --git a/src/mailplus_intelligence/extractor.py b/src/mailplus_intelligence/extractor.py new file mode 100644 index 0000000..c8f2aed --- /dev/null +++ b/src/mailplus_intelligence/extractor.py @@ -0,0 +1,211 @@ +"""Deterministic extraction pipeline for selected high-value mail (#6). + +Generates thread summaries, obligation candidates, and entity update candidates +from classified metadata only — no raw body required. +Outputs conform to the semantic artifact contract (semantic_contract.py). +""" + +from __future__ import annotations + +import uuid +from dataclasses import dataclass +from typing import Any + +from .classifier import ClassificationResult, classify_metadata +from .threading import ReconstructedThread + + +EXTRACTION_LANES = frozenset({ + "vip", "project", "admin", "financial", "travel", "legal", +}) + +OBLIGATION_SUBJECT_TOKENS = ( + "follow-up", + "action required", + "please confirm", + "please review", + "deadline", + "due by", + "reminder", + "need your", + "sign", + "approve", + "nda", + "contract", + "invoice", +) + +EVENT_SUBJECT_TOKENS = ( + "flight", + "hotel", + "rental car", + "itinerary", + "booking", + "reservation", + "confirmation", + "invoice", + "payment", + "statement", + "billing", + "filing", + "settlement", +) + + +@dataclass(frozen=True) +class ExtractionCandidate: + """A single deterministic extraction candidate conforming to the semantic contract.""" + + artifact_id: str + artifact_type: str + source_thread_key: str + source_message_ids: tuple[str, ...] + source_locators: tuple[str, ...] + evidence_refs: tuple[str, ...] + summary: str + confidence: str + review_status: str + provenance: str + + +def extract_from_thread( + thread: ReconstructedThread, + messages: list[dict[str, Any]], +) -> list[ExtractionCandidate]: + """Run deterministic extraction for one reconstructed thread. + + Returns a list of candidates (may be empty if the thread is suppressed). 
+ """ + thread_messages = [ + m for m in messages if m["fixture_id"] in thread.message_fixture_ids + ] + if not thread_messages: + return [] + + representative = thread_messages[0] + classification = classify_metadata( + str(representative.get("subject", "")), + str(representative.get("from", "")), + ) + + if not classification.extraction_allowed or classification.lane not in EXTRACTION_LANES: + return [] + + candidates: list[ExtractionCandidate] = [] + + summary_candidate = _build_thread_summary(thread, thread_messages, classification) + candidates.append(summary_candidate) + + obligation = _try_obligation(thread, thread_messages, classification) + if obligation: + candidates.append(obligation) + + event = _try_event(thread, thread_messages, classification) + if event: + candidates.append(event) + + return candidates + + +def extract_from_corpus( + threads: tuple[ReconstructedThread, ...], + messages: list[dict[str, Any]] | tuple[dict[str, Any], ...], +) -> list[ExtractionCandidate]: + """Extract candidates from all threads in a corpus.""" + all_candidates: list[ExtractionCandidate] = [] + msg_list = list(messages) + for thread in threads: + all_candidates.extend(extract_from_thread(thread, msg_list)) + return all_candidates + + +def _locators(thread_messages: list[dict[str, Any]]) -> tuple[str, ...]: + return tuple( + str(m["locator"].get("export_id", "")) + for m in thread_messages + if m.get("locator", {}).get("export_id") + ) + + +def _message_ids(thread_messages: list[dict[str, Any]]) -> tuple[str, ...]: + return tuple(str(m["message_id"]) for m in thread_messages if m.get("message_id")) + + +def _build_thread_summary( + thread: ReconstructedThread, + thread_messages: list[dict[str, Any]], + classification: ClassificationResult, +) -> ExtractionCandidate: + subjects = list({str(m.get("subject", "")) for m in thread_messages}) + senders = list({str(m.get("from", "")) for m in thread_messages}) + count = len(thread_messages) + lane = classification.lane + subject_str = subjects[0] if subjects else "(no subject)" + sender_str = ", ".join(senders[:3]) + summary = ( + f"{lane.upper()} thread '{subject_str}' — {count} message(s) between {sender_str}. " + f"Thread confidence: {thread.confidence}." 
+ ) + locs = _locators(thread_messages) + return ExtractionCandidate( + artifact_id=str(uuid.uuid5(uuid.NAMESPACE_DNS, f"summary:{thread.thread_id}")), + artifact_type="thread_summary", + source_thread_key=thread.thread_id, + source_message_ids=_message_ids(thread_messages), + source_locators=locs, + evidence_refs=locs, + summary=summary, + confidence="high" if thread.confidence == "high" else "medium", + review_status="candidate", + provenance="deterministic", + ) + + +def _try_obligation( + thread: ReconstructedThread, + thread_messages: list[dict[str, Any]], + classification: ClassificationResult, +) -> ExtractionCandidate | None: + for msg in thread_messages: + subject = str(msg.get("subject", "")).lower() + if any(token in subject for token in OBLIGATION_SUBJECT_TOKENS): + locs = _locators([msg]) + return ExtractionCandidate( + artifact_id=str(uuid.uuid5(uuid.NAMESPACE_DNS, f"obligation:{msg['message_id']}")), + artifact_type="obligation", + source_thread_key=thread.thread_id, + source_message_ids=(str(msg["message_id"]),), + source_locators=locs, + evidence_refs=locs, + summary=f"Possible commitment or action item: '{msg.get('subject', '')}' from {msg.get('from', '?')}.", + confidence="low", + review_status="review_needed", + provenance="deterministic", + ) + return None + + +def _try_event( + thread: ReconstructedThread, + thread_messages: list[dict[str, Any]], + classification: ClassificationResult, +) -> ExtractionCandidate | None: + if classification.lane not in {"travel", "financial", "legal"}: + return None + for msg in thread_messages: + subject = str(msg.get("subject", "")).lower() + if any(token in subject for token in EVENT_SUBJECT_TOKENS): + locs = _locators([msg]) + return ExtractionCandidate( + artifact_id=str(uuid.uuid5(uuid.NAMESPACE_DNS, f"event:{msg['message_id']}")), + artifact_type="event", + source_thread_key=thread.thread_id, + source_message_ids=(str(msg["message_id"]),), + source_locators=locs, + evidence_refs=locs, + summary=f"{classification.lane.capitalize()} event: '{msg.get('subject', '')}' from {msg.get('from', '?')}.", + confidence="medium", + review_status="candidate", + provenance="deterministic", + ) + return None diff --git a/src/mailplus_intelligence/live_adapter.py b/src/mailplus_intelligence/live_adapter.py new file mode 100644 index 0000000..6f90d46 --- /dev/null +++ b/src/mailplus_intelligence/live_adapter.py @@ -0,0 +1,85 @@ +"""Live MailPlus adapter stub (#71). + +Provides the same SyncBatch interface as the fixture path but reads from a +real MailPlus account via environment-configured credentials. + +GATE: This module requires MAILPLUS_HOST, MAILPLUS_USER, and MAILPLUS_TOKEN to +be set in the environment. It raises ``LiveAdapterNotConfigured`` if any are +missing, so CI (which omits them) will never attempt a live connection. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from typing import Any + +from .sync import SyncBatch + + +class LiveAdapterNotConfigured(RuntimeError): + """Raised when required environment variables are absent.""" + + +@dataclass(frozen=True) +class LiveAdapterConfig: + host: str + user: str + token: str + mailbox: str = "INBOX" + page_size: int = 50 + + +def load_live_config() -> LiveAdapterConfig: + """Load adapter config from environment variables. + + Required variables: + MAILPLUS_HOST — e.g. 
imap.example.com + MAILPLUS_USER — mailbox address + MAILPLUS_TOKEN — OAuth2 bearer token or app password + Optional: + MAILPLUS_MAILBOX (default INBOX) + MAILPLUS_PAGE_SIZE (default 50) + """ + missing = [v for v in ("MAILPLUS_HOST", "MAILPLUS_USER", "MAILPLUS_TOKEN") if not os.getenv(v)] + if missing: + raise LiveAdapterNotConfigured( + f"Live adapter requires environment variables: {', '.join(missing)}" + ) + return LiveAdapterConfig( + host=os.environ["MAILPLUS_HOST"], + user=os.environ["MAILPLUS_USER"], + token=os.environ["MAILPLUS_TOKEN"], + mailbox=os.getenv("MAILPLUS_MAILBOX", "INBOX"), + page_size=int(os.getenv("MAILPLUS_PAGE_SIZE", "50")), + ) + + +def fetch_batch( + config: LiveAdapterConfig, + cursor: str = "", +) -> SyncBatch: + """Fetch one page of metadata-only messages from the live account. + + This is a stub — the actual IMAP/MailPlus API integration is out of scope + for the current phase. Returns an empty batch so callers can be wired up + without a live server. + + Replace the body of this function (and the _fetch_messages helper below) + once the MailPlus API client library is available. + """ + messages = _fetch_messages(config, cursor) + new_cursor = messages[-1].get("message_id", cursor) if messages else cursor + return SyncBatch( + source_name=f"live:{config.user}", + cursor=new_cursor, + messages=tuple(messages), + ) + + +def _fetch_messages( + config: LiveAdapterConfig, + cursor: str, +) -> list[dict[str, Any]]: + # Stub: replace with live API call. + return [] diff --git a/src/mailplus_intelligence/llm_extractor.py b/src/mailplus_intelligence/llm_extractor.py new file mode 100644 index 0000000..7fecba7 --- /dev/null +++ b/src/mailplus_intelligence/llm_extractor.py @@ -0,0 +1,224 @@ +"""LLM-backed semantic extraction using the Anthropic SDK (#70). + +Routes classified high-value threads through Claude for richer artifact +extraction. Designed for offline CI via a fixture-cassette playback dict. +Caches the shared system prompt and thread context using prompt caching. +""" + +from __future__ import annotations + +import json +import uuid +from dataclasses import dataclass, field +from typing import Any + +from .classifier import ClassificationResult +from .extractor import EXTRACTION_LANES, ExtractionCandidate, _locators, _message_ids +from .threading import ReconstructedThread + + +_SYSTEM_PROMPT = ( + "You are MailPlus Intelligence, a semantic extraction engine for email metadata. " + "You receive classified thread metadata (no raw body) and produce structured " + "semantic artifacts conforming to the extraction contract. " + "Respond with a single JSON array of artifact objects. " + "Each artifact must have: artifact_type (thread_summary|obligation|event|decision|entity_update), " + "summary (plain-English string), confidence (high|medium|low), " + "review_status (candidate|review_needed). " + "Be concise and grounded only in the metadata provided. " + "If nothing actionable is present, return a single thread_summary artifact." 
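+    # The system block above is sent with cache_control "ephemeral" in
+    # extract_with_llm, so repeated calls in one session reuse the cached prefix.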
+) + +_EXTRACTION_LANES_LLM = EXTRACTION_LANES + + +@dataclass +class LLMUsageStats: + input_tokens: int = 0 + output_tokens: int = 0 + cache_read_tokens: int = 0 + cache_write_tokens: int = 0 + calls: int = 0 + + +@dataclass +class LLMExtractionResult: + candidates: list[ExtractionCandidate] + usage: LLMUsageStats + cassette_hit: bool = False + + +def _build_thread_context( + thread: ReconstructedThread, + thread_messages: list[dict[str, Any]], + classification: ClassificationResult, +) -> str: + lines = [ + f"Thread ID: {thread.thread_id}", + f"Lane: {classification.lane}", + f"Thread confidence: {thread.confidence}", + f"Message count: {len(thread_messages)}", + ] + for msg in thread_messages[:10]: + lines.append( + f" - [{msg.get('date', '?')}] From: {msg.get('from', '?')} | " + f"Subject: {msg.get('subject', '(no subject)')} | " + f"Folder: {msg.get('folder', '?')}" + ) + return "\n".join(lines) + + +def _parse_llm_response( + raw: str, + thread: ReconstructedThread, + thread_messages: list[dict[str, Any]], + classification: ClassificationResult, +) -> list[ExtractionCandidate]: + locs = _locators(thread_messages) + msg_ids = _message_ids(thread_messages) + + try: + items = json.loads(raw) + if not isinstance(items, list): + items = [items] + except (json.JSONDecodeError, ValueError): + items = [{"artifact_type": "thread_summary", "summary": raw.strip(), + "confidence": "low", "review_status": "review_needed"}] + + candidates: list[ExtractionCandidate] = [] + for item in items: + artifact_type = item.get("artifact_type", "thread_summary") + summary = item.get("summary", "") + confidence = item.get("confidence", "medium") + review_status = item.get("review_status", "candidate") + candidates.append( + ExtractionCandidate( + artifact_id=str(uuid.uuid5( + uuid.NAMESPACE_DNS, + f"llm:{thread.thread_id}:{artifact_type}:{summary[:40]}", + )), + artifact_type=artifact_type, + source_thread_key=thread.thread_id, + source_message_ids=msg_ids, + source_locators=locs, + evidence_refs=locs, + summary=summary, + confidence=confidence, + review_status=review_status, + provenance="llm", + ) + ) + return candidates + + +def extract_with_llm( + thread: ReconstructedThread, + messages: list[dict[str, Any]], + *, + client: Any = None, + model: str = "claude-opus-4-7", + cassette: dict[str, str] | None = None, + usage_stats: LLMUsageStats | None = None, +) -> LLMExtractionResult: + """Run LLM-backed extraction for one thread. + + Pass a ``cassette`` dict mapping thread_id → raw JSON string to play back + recorded responses in offline / CI environments without hitting the API. + """ + thread_messages = [m for m in messages if m["fixture_id"] in thread.message_fixture_ids] + if not thread_messages: + return LLMExtractionResult(candidates=[], usage=LLMUsageStats()) + + from .classifier import classify_metadata + + representative = thread_messages[0] + classification = classify_metadata( + str(representative.get("subject", "")), + str(representative.get("from", "")), + ) + + if not classification.extraction_allowed or classification.lane not in _EXTRACTION_LANES_LLM: + return LLMExtractionResult(candidates=[], usage=LLMUsageStats()) + + stats = usage_stats or LLMUsageStats() + thread_context = _build_thread_context(thread, thread_messages, classification) + + # Cassette playback for offline CI. 
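+    # A cassette is a plain dict of thread_id -> recorded raw response text.
+    # On a hit, the recording is parsed exactly like a live response, so the
+    # downstream artifact contract is exercised without any API traffic.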
+ if cassette is not None and thread.thread_id in cassette: + raw = cassette[thread.thread_id] + candidates = _parse_llm_response(raw, thread, thread_messages, classification) + return LLMExtractionResult(candidates=candidates, usage=stats, cassette_hit=True) + + if client is None: + import anthropic + client = anthropic.Anthropic() + + response = client.messages.create( + model=model, + max_tokens=1024, + thinking={"type": "adaptive"}, + system=[ + { + "type": "text", + "text": _SYSTEM_PROMPT, + "cache_control": {"type": "ephemeral"}, + } + ], + messages=[ + { + "role": "user", + "content": [ + { + "type": "text", + "text": thread_context, + "cache_control": {"type": "ephemeral"}, + } + ], + } + ], + ) + + stats.calls += 1 + usage = response.usage + stats.input_tokens += getattr(usage, "input_tokens", 0) + stats.output_tokens += getattr(usage, "output_tokens", 0) + stats.cache_read_tokens += getattr(usage, "cache_read_input_tokens", 0) + stats.cache_write_tokens += getattr(usage, "cache_creation_input_tokens", 0) + + raw = "" + for block in response.content: + if getattr(block, "type", None) == "text": + raw = block.text + break + + candidates = _parse_llm_response(raw, thread, thread_messages, classification) + return LLMExtractionResult(candidates=candidates, usage=stats) + + +def extract_corpus_with_llm( + threads: tuple[ReconstructedThread, ...], + messages: list[dict[str, Any]], + *, + client: Any = None, + model: str = "claude-opus-4-7", + cassette: dict[str, str] | None = None, +) -> LLMExtractionResult: + """Run LLM extraction over all threads, sharing usage stats.""" + stats = LLMUsageStats() + all_candidates: list[ExtractionCandidate] = [] + any_cassette_hit = False + + for thread in threads: + result = extract_with_llm( + thread, list(messages), + client=client, model=model, cassette=cassette, usage_stats=stats, + ) + all_candidates.extend(result.candidates) + if result.cassette_hit: + any_cassette_hit = True + + return LLMExtractionResult( + candidates=all_candidates, + usage=stats, + cassette_hit=any_cassette_hit, + ) diff --git a/src/mailplus_intelligence/scheduler.py b/src/mailplus_intelligence/scheduler.py new file mode 100644 index 0000000..c1b2bf2 --- /dev/null +++ b/src/mailplus_intelligence/scheduler.py @@ -0,0 +1,202 @@ +"""Recurring sync scheduler with job locking and stale detection (#74). + +Provides a lightweight in-process scheduler backed by the SQLite index. +Prevents overlapping runs, detects stale locks, and emits lifecycle events. +""" + +from __future__ import annotations + +import sqlite3 +import threading +import time +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Callable + + +# Job lock TTL: a lock older than this is considered stale. 
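+# A run that crashes never reaches release_lock(), so its lock row would stay
+# held forever; acquire_lock() clears any lock older than this TTL and then
+# proceeds, trading a small double-run risk for liveness.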
+LOCK_STALE_SECONDS = 300 + + +@dataclass(frozen=True) +class JobEvent: + job_name: str + event: str # acquired | released | skipped | stale_cleared | error + detail: str = "" + timestamp: str = "" + + def __post_init__(self) -> None: + if not self.timestamp: + object.__setattr__( + self, "timestamp", datetime.now(timezone.utc).isoformat() + ) + + +@dataclass +class JobStatus: + job_name: str + locked: bool + locked_at: str | None + last_run_at: str | None + last_success_at: str | None + lock_holder: str | None + + +def _ensure_scheduler_table(connection: sqlite3.Connection) -> None: + connection.executescript( + """ + CREATE TABLE IF NOT EXISTS scheduler_locks ( + job_name TEXT PRIMARY KEY, + locked INTEGER NOT NULL DEFAULT 0, + locked_at TEXT, + lock_holder TEXT, + last_run_at TEXT, + last_success_at TEXT + ); + """ + ) + connection.commit() + + +def acquire_lock( + connection: sqlite3.Connection, + job_name: str, + holder: str = "local", +) -> tuple[bool, list[JobEvent]]: + """Try to acquire a named job lock. + + Clears stale locks automatically. Returns (acquired, events). + """ + _ensure_scheduler_table(connection) + events: list[JobEvent] = [] + now = datetime.now(timezone.utc).isoformat() + + row = connection.execute( + "SELECT locked, locked_at FROM scheduler_locks WHERE job_name = ?", + (job_name,), + ).fetchone() + + if row and row["locked"]: + locked_at_str = row["locked_at"] + if locked_at_str: + try: + locked_at = datetime.fromisoformat(locked_at_str) + age = (datetime.now(timezone.utc) - locked_at).total_seconds() + if age > LOCK_STALE_SECONDS: + connection.execute( + "UPDATE scheduler_locks SET locked = 0, locked_at = NULL, lock_holder = NULL " + "WHERE job_name = ?", + (job_name,), + ) + connection.commit() + events.append(JobEvent(job_name, "stale_cleared", f"lock age {age:.0f}s")) + else: + events.append(JobEvent(job_name, "skipped", "already locked")) + return False, events + except ValueError: + pass + + connection.execute( + """ + INSERT INTO scheduler_locks (job_name, locked, locked_at, lock_holder, last_run_at) + VALUES (?, 1, ?, ?, ?) + ON CONFLICT(job_name) DO UPDATE SET + locked = 1, locked_at = excluded.locked_at, + lock_holder = excluded.lock_holder, + last_run_at = excluded.last_run_at + """, + (job_name, now, holder, now), + ) + connection.commit() + events.append(JobEvent(job_name, "acquired")) + return True, events + + +def release_lock( + connection: sqlite3.Connection, + job_name: str, + *, + success: bool = True, +) -> list[JobEvent]: + """Release a previously acquired job lock.""" + _ensure_scheduler_table(connection) + now = datetime.now(timezone.utc).isoformat() + update = ( + "UPDATE scheduler_locks SET locked = 0, locked_at = NULL, lock_holder = NULL, " + "last_success_at = ? WHERE job_name = ?" + if success + else "UPDATE scheduler_locks SET locked = 0, locked_at = NULL, lock_holder = NULL " + "WHERE job_name = ?" 
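+        # (the failure path intentionally leaves last_success_at untouched)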
+ ) + params = (now, job_name) if success else (job_name,) + connection.execute(update, params) + connection.commit() + return [JobEvent(job_name, "released", "success" if success else "failure")] + + +def get_job_status(connection: sqlite3.Connection, job_name: str) -> JobStatus | None: + """Return current status of a named job, or None if never registered.""" + _ensure_scheduler_table(connection) + row = connection.execute( + "SELECT * FROM scheduler_locks WHERE job_name = ?", (job_name,) + ).fetchone() + if not row: + return None + return JobStatus( + job_name=row["job_name"], + locked=bool(row["locked"]), + locked_at=row["locked_at"], + last_run_at=row["last_run_at"], + last_success_at=row["last_success_at"], + lock_holder=row["lock_holder"], + ) + + +def list_jobs(connection: sqlite3.Connection) -> list[JobStatus]: + """Return status for all known jobs.""" + _ensure_scheduler_table(connection) + rows = connection.execute("SELECT * FROM scheduler_locks ORDER BY job_name").fetchall() + return [ + JobStatus( + job_name=row["job_name"], + locked=bool(row["locked"]), + locked_at=row["locked_at"], + last_run_at=row["last_run_at"], + last_success_at=row["last_success_at"], + lock_holder=row["lock_holder"], + ) + for row in rows + ] + + +def run_job( + connection: sqlite3.Connection, + job_name: str, + fn: Callable[[], Any], + *, + holder: str = "local", + event_sink: list[JobEvent] | None = None, +) -> tuple[bool, Any]: + """Acquire lock, run fn(), release lock. Returns (ran, result). + + Skips execution if lock cannot be acquired. + """ + acquired, events = acquire_lock(connection, job_name, holder=holder) + if event_sink is not None: + event_sink.extend(events) + if not acquired: + return False, None + + try: + result = fn() + release_events = release_lock(connection, job_name, success=True) + except Exception as exc: + release_events = release_lock(connection, job_name, success=False) + if event_sink is not None: + event_sink.extend(release_events) + event_sink.append(JobEvent(job_name, "error", str(exc))) + raise + + if event_sink is not None: + event_sink.extend(release_events) + return True, result diff --git a/src/mailplus_intelligence/sync.py b/src/mailplus_intelligence/sync.py new file mode 100644 index 0000000..e5077c8 --- /dev/null +++ b/src/mailplus_intelligence/sync.py @@ -0,0 +1,139 @@ +"""Incremental sync job with checkpoint management. + +Reads fixture batches (or live adapter batches via the same interface), +writes to the SQLite index idempotently, and updates sync_checkpoints. +""" + +from __future__ import annotations + +import sqlite3 +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any + +from .index_writer import write_index_records +from .mapper import map_fixture_messages + + +@dataclass(frozen=True) +class SyncBatch: + """A batch of raw message dicts ready for sync processing.""" + + source_name: str + cursor: str + messages: tuple[dict[str, Any], ...] + + +@dataclass(frozen=True) +class SyncResult: + """Summary of one sync batch run.""" + + source_name: str + cursor: str + inserted: int + skipped: int + mapper_issues: int + write_errors: tuple[str, ...] + success: bool + completed_at: str + + +def run_sync_batch( + connection: sqlite3.Connection, + batch: SyncBatch, + *, + dry_run: bool = False, +) -> SyncResult: + """Process one sync batch against the index. + + Idempotent: messages whose locator_export_id is already indexed are skipped. + Updates sync_checkpoints on success. 
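+
+    Minimal sketch (assumes migrations already applied, as in tests/test_sync.py):
+
+        batch = SyncBatch("fixture-corpus", "v1", corpus.messages)
+        result = run_sync_batch(connection, batch)
+        assert result.success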
+ """ + _record_attempt(connection, batch.source_name) + + map_result = map_fixture_messages(list(batch.messages)) + + if dry_run: + return SyncResult( + source_name=batch.source_name, + cursor=batch.cursor, + inserted=0, + skipped=len(map_result.records), + mapper_issues=len(map_result.issues), + write_errors=(), + success=True, + completed_at=datetime.now(timezone.utc).isoformat(), + ) + + write_result = write_index_records(connection, map_result.records) + + if not write_result.errors: + _update_checkpoint(connection, batch.source_name, batch.cursor) + + completed_at = datetime.now(timezone.utc).isoformat() + return SyncResult( + source_name=batch.source_name, + cursor=batch.cursor, + inserted=write_result.inserted, + skipped=write_result.skipped, + mapper_issues=len(map_result.issues), + write_errors=write_result.errors, + success=len(write_result.errors) == 0, + completed_at=completed_at, + ) + + +def get_checkpoint(connection: sqlite3.Connection, source_name: str) -> dict | None: + """Return the current checkpoint dict for a source, or None if not started.""" + row = connection.execute( + "SELECT * FROM sync_checkpoints WHERE source_name = ?", (source_name,) + ).fetchone() + return dict(row) if row else None + + +def _record_attempt(connection: sqlite3.Connection, source_name: str) -> None: + now = datetime.now(timezone.utc).isoformat() + connection.execute( + """ + INSERT INTO sync_checkpoints (source_name, cursor, last_attempt_at) + VALUES (?, '', ?) + ON CONFLICT(source_name) DO UPDATE SET last_attempt_at = excluded.last_attempt_at + """, + (source_name, now), + ) + connection.commit() + + +def _update_checkpoint( + connection: sqlite3.Connection, source_name: str, cursor: str +) -> None: + now = datetime.now(timezone.utc).isoformat() + connection.execute( + """ + INSERT INTO sync_checkpoints (source_name, cursor, last_success_at, last_attempt_at) + VALUES (?, ?, ?, ?) 
+ ON CONFLICT(source_name) DO UPDATE SET + cursor = excluded.cursor, + last_success_at = excluded.last_success_at, + last_attempt_at = excluded.last_attempt_at + """, + (source_name, cursor, now, now), + ) + connection.commit() + + +def sync_from_fixture_corpus( + connection: sqlite3.Connection, + corpus_dir: str, + source_name: str = "fixture-corpus", +) -> SyncResult: + """Convenience: sync all messages from a fixture corpus directory.""" + from .fixtures import load_metadata_fixture_corpus + + corpus = load_metadata_fixture_corpus(corpus_dir) + batch = SyncBatch( + source_name=source_name, + cursor=f"fixture-v{corpus.version}", + messages=corpus.messages, + ) + return run_sync_batch(connection, batch) diff --git a/tests/test_extractor.py b/tests/test_extractor.py new file mode 100644 index 0000000..b665d18 --- /dev/null +++ b/tests/test_extractor.py @@ -0,0 +1,60 @@ +"""Tests for the deterministic extractor pipeline.""" + +from __future__ import annotations + +import unittest + +from mailplus_intelligence.extractor import extract_from_corpus, extract_from_thread +from mailplus_intelligence.fixtures import load_metadata_fixture_corpus +from mailplus_intelligence.threading import reconstruct_fixture_threads + + +FIXTURE_DIR = "fixtures/mailplus_metadata" + + +class ExtractorTests(unittest.TestCase): + def setUp(self) -> None: + corpus = load_metadata_fixture_corpus(FIXTURE_DIR) + self.threads = reconstruct_fixture_threads(corpus.messages) + self.messages = list(corpus.messages) + + def test_extract_from_corpus_returns_candidates(self) -> None: + candidates = extract_from_corpus(self.threads, self.messages) + self.assertGreater(len(candidates), 0) + + def test_all_candidates_have_required_fields(self) -> None: + candidates = extract_from_corpus(self.threads, self.messages) + for c in candidates: + self.assertIsNotNone(c.artifact_id) + self.assertIn(c.artifact_type, {"thread_summary", "obligation", "event"}) + self.assertIn(c.review_status, {"candidate", "review_needed"}) + self.assertEqual(c.provenance, "deterministic") + + def test_thread_summary_produced_for_extraction_lane(self) -> None: + candidates = extract_from_corpus(self.threads, self.messages) + types = {c.artifact_type for c in candidates} + self.assertIn("thread_summary", types) + + def test_noise_thread_suppressed(self) -> None: + messages = [ + { + "fixture_id": "msg-noise-x", + "message_id": "", + "subject": "Daily digest newsletter", + "from": "no-reply@newsletter.example.com", + "to": ["operator@example.test"], + "date": "2026-01-01T00:00:00Z", + "mailbox": "operator@example.test", + "folder": "Inbox", + "locator": {"uid": "77", "account": "fixture-account"}, + "attachments": [], + } + ] + threads = reconstruct_fixture_threads(messages) + for thread in threads: + candidates = extract_from_thread(thread, messages) + self.assertEqual(candidates, []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_live_adapter.py b/tests/test_live_adapter.py new file mode 100644 index 0000000..ede6c8f --- /dev/null +++ b/tests/test_live_adapter.py @@ -0,0 +1,71 @@ +"""Tests for live_adapter.py gate and stub behaviour.""" + +from __future__ import annotations + +import os +import unittest + +from mailplus_intelligence.live_adapter import ( + LiveAdapterNotConfigured, + fetch_batch, + load_live_config, +) + + +class LiveAdapterConfigTests(unittest.TestCase): + def test_raises_when_env_absent(self) -> None: + for var in ("MAILPLUS_HOST", "MAILPLUS_USER", "MAILPLUS_TOKEN"): + os.environ.pop(var, None) + with 
self.assertRaises(LiveAdapterNotConfigured): + load_live_config() + + def test_loads_config_from_env(self) -> None: + os.environ["MAILPLUS_HOST"] = "imap.example.com" + os.environ["MAILPLUS_USER"] = "user@example.com" + os.environ["MAILPLUS_TOKEN"] = "secret" + try: + config = load_live_config() + self.assertEqual(config.host, "imap.example.com") + self.assertEqual(config.user, "user@example.com") + self.assertEqual(config.token, "secret") + self.assertEqual(config.mailbox, "INBOX") + self.assertEqual(config.page_size, 50) + finally: + for var in ("MAILPLUS_HOST", "MAILPLUS_USER", "MAILPLUS_TOKEN"): + os.environ.pop(var, None) + + def test_optional_env_overrides(self) -> None: + os.environ["MAILPLUS_HOST"] = "imap.example.com" + os.environ["MAILPLUS_USER"] = "user@example.com" + os.environ["MAILPLUS_TOKEN"] = "secret" + os.environ["MAILPLUS_MAILBOX"] = "Sent" + os.environ["MAILPLUS_PAGE_SIZE"] = "25" + try: + config = load_live_config() + self.assertEqual(config.mailbox, "Sent") + self.assertEqual(config.page_size, 25) + finally: + for var in ("MAILPLUS_HOST", "MAILPLUS_USER", "MAILPLUS_TOKEN", + "MAILPLUS_MAILBOX", "MAILPLUS_PAGE_SIZE"): + os.environ.pop(var, None) + + +class LiveAdapterStubTests(unittest.TestCase): + def _make_config(self): + from mailplus_intelligence.live_adapter import LiveAdapterConfig + return LiveAdapterConfig(host="imap.example.com", user="u@example.com", token="t") + + def test_fetch_batch_returns_empty_stub(self) -> None: + config = self._make_config() + batch = fetch_batch(config) + self.assertEqual(batch.messages, ()) + self.assertEqual(batch.source_name, "live:u@example.com") + + def test_fetch_batch_source_name_includes_user(self) -> None: + config = self._make_config() + batch = fetch_batch(config, cursor="abc") + self.assertIn("u@example.com", batch.source_name) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_llm_extractor.py b/tests/test_llm_extractor.py new file mode 100644 index 0000000..47edc74 --- /dev/null +++ b/tests/test_llm_extractor.py @@ -0,0 +1,105 @@ +"""Tests for llm_extractor.py using cassette playback (no live API calls).""" + +from __future__ import annotations + +import json +import unittest + +from mailplus_intelligence.extractor import ExtractionCandidate +from mailplus_intelligence.fixtures import load_metadata_fixture_corpus +from mailplus_intelligence.llm_extractor import ( + LLMUsageStats, + extract_corpus_with_llm, + extract_with_llm, +) +from mailplus_intelligence.threading import reconstruct_fixture_threads + + +FIXTURE_DIR = "fixtures/mailplus_metadata" + + +def _build_cassette(thread_id: str, artifacts: list[dict]) -> dict[str, str]: + return {thread_id: json.dumps(artifacts)} + + +class LLMExtractorCassetteTests(unittest.TestCase): + def setUp(self) -> None: + corpus = load_metadata_fixture_corpus(FIXTURE_DIR) + self.threads = reconstruct_fixture_threads(corpus.messages) + self.messages = list(corpus.messages) + + def test_cassette_hit_returns_candidates(self) -> None: + thread = next(t for t in self.threads if t.thread_id) + cassette = _build_cassette( + thread.thread_id, + [{"artifact_type": "thread_summary", "summary": "Test summary.", + "confidence": "high", "review_status": "candidate"}], + ) + result = extract_with_llm(thread, self.messages, cassette=cassette) + self.assertTrue(result.cassette_hit) + self.assertGreaterEqual(len(result.candidates), 1) + self.assertEqual(result.candidates[0].provenance, "llm") + self.assertEqual(result.candidates[0].summary, "Test summary.") + + def 
test_cassette_miss_raises_without_credentials(self) -> None:
+        thread = next(t for t in self.threads if t.thread_id)
+        # A non-matching cassette key falls through to a live API call, which
+        # fails without credentials. Any surfaced error is acceptable here; the
+        # point is that a cassette miss never silently passes.
+        with self.assertRaises(Exception):
+            extract_with_llm(thread, self.messages, cassette={"other-thread": "[]"})
+
+    def test_corpus_cassette_aggregates_across_threads(self) -> None:
+        cassette: dict[str, str] = {}
+        for t in self.threads:
+            cassette[t.thread_id] = json.dumps(
+                [{"artifact_type": "thread_summary", "summary": f"Summary for {t.thread_id}.",
+                  "confidence": "medium", "review_status": "candidate"}]
+            )
+        result = extract_corpus_with_llm(self.threads, self.messages, cassette=cassette)
+        self.assertGreaterEqual(len(result.candidates), 1)
+        for c in result.candidates:
+            self.assertEqual(c.provenance, "llm")
+
+    def test_invalid_json_response_falls_back_gracefully(self) -> None:
+        thread = next(t for t in self.threads if t.thread_id)
+        cassette = {thread.thread_id: "not valid json {{ }}"}
+        result = extract_with_llm(thread, self.messages, cassette=cassette)
+        self.assertGreaterEqual(len(result.candidates), 1)
+        self.assertEqual(result.candidates[0].artifact_type, "thread_summary")
+
+    def test_usage_stats_accumulate_across_corpus(self) -> None:
+        cassette: dict[str, str] = {}
+        for t in self.threads:
+            cassette[t.thread_id] = json.dumps(
+                [{"artifact_type": "thread_summary", "summary": "S",
+                  "confidence": "low", "review_status": "candidate"}]
+            )
+        result = extract_corpus_with_llm(self.threads, self.messages, cassette=cassette)
+        # Cassette playback never touches the API, so the calls counter stays at 0.
+        self.assertEqual(result.usage.calls, 0)
+
+    def test_noise_threads_skipped(self) -> None:
+        # Build a synthetic message in noise lane.
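+        # A no-reply newsletter sender classifies into the noise lane and fails
+        # the extraction gate, so the function returns no candidates before the
+        # cassette or the API is ever consulted.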
+ messages = [ + { + "fixture_id": "msg-noise-001", + "message_id": "", + "subject": "Daily digest for you", + "from": "no-reply@newsletter.example.com", + "to": ["operator@example.test"], + "date": "2026-01-01T00:00:00Z", + "mailbox": "operator@example.test", + "folder": "Inbox", + "locator": {"uid": "99", "account": "fixture-account"}, + "attachments": [], + } + ] + from mailplus_intelligence.threading import reconstruct_fixture_threads as rft + + for thread in rft(messages): + result = extract_with_llm(thread, messages, cassette={}) + self.assertEqual(result.candidates, []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..5f6cac6 --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,113 @@ +"""Tests for scheduler.py job locking and lifecycle events.""" + +from __future__ import annotations + +import time +import unittest + +from mailplus_intelligence.scheduler import ( + LOCK_STALE_SECONDS, + JobEvent, + acquire_lock, + get_job_status, + list_jobs, + release_lock, + run_job, +) +from mailplus_intelligence.sqlite import connect_sqlite + + +class SchedulerLockTests(unittest.TestCase): + def setUp(self) -> None: + self.conn = connect_sqlite(":memory:") + + def tearDown(self) -> None: + self.conn.close() + + def test_acquire_and_release(self) -> None: + acquired, events = acquire_lock(self.conn, "test-job") + self.assertTrue(acquired) + self.assertTrue(any(e.event == "acquired" for e in events)) + + status = get_job_status(self.conn, "test-job") + self.assertIsNotNone(status) + self.assertTrue(status.locked) + + release_events = release_lock(self.conn, "test-job", success=True) + self.assertTrue(any(e.event == "released" for e in release_events)) + + status = get_job_status(self.conn, "test-job") + self.assertFalse(status.locked) + self.assertIsNotNone(status.last_success_at) + + def test_second_acquire_skipped_while_locked(self) -> None: + acquire_lock(self.conn, "test-job") + acquired2, events2 = acquire_lock(self.conn, "test-job") + self.assertFalse(acquired2) + self.assertTrue(any(e.event == "skipped" for e in events2)) + + def test_stale_lock_cleared_on_acquire(self) -> None: + from datetime import datetime, timedelta, timezone + + # Manually insert a stale lock. 
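+        # Backdating locked_at past LOCK_STALE_SECONDS makes acquire_lock()
+        # treat the row as abandoned: it should emit stale_cleared and then
+        # acquire the lock normally.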
+ from mailplus_intelligence.scheduler import _ensure_scheduler_table + _ensure_scheduler_table(self.conn) + stale_time = (datetime.now(timezone.utc) - timedelta(seconds=LOCK_STALE_SECONDS + 10)).isoformat() + self.conn.execute( + "INSERT INTO scheduler_locks (job_name, locked, locked_at, lock_holder) VALUES (?, 1, ?, ?)", + ("stale-job", stale_time, "old-holder"), + ) + self.conn.commit() + + acquired, events = acquire_lock(self.conn, "stale-job") + self.assertTrue(acquired) + self.assertTrue(any(e.event == "stale_cleared" for e in events)) + + def test_run_job_executes_fn(self) -> None: + results = [] + ran, result = run_job(self.conn, "fn-job", lambda: results.append(1) or "done") + self.assertTrue(ran) + self.assertEqual(result, "done") + self.assertEqual(results, [1]) + status = get_job_status(self.conn, "fn-job") + self.assertFalse(status.locked) + self.assertIsNotNone(status.last_success_at) + + def test_run_job_skips_when_locked(self) -> None: + acquire_lock(self.conn, "fn-job") + ran, result = run_job(self.conn, "fn-job", lambda: 99) + self.assertFalse(ran) + self.assertIsNone(result) + + def test_run_job_releases_on_exception(self) -> None: + def boom(): + raise ValueError("oops") + + with self.assertRaises(ValueError): + run_job(self.conn, "boom-job", boom) + status = get_job_status(self.conn, "boom-job") + self.assertFalse(status.locked) + + def test_list_jobs(self) -> None: + acquire_lock(self.conn, "job-a") + acquire_lock(self.conn, "job-b") + jobs = list_jobs(self.conn) + names = {j.job_name for j in jobs} + self.assertIn("job-a", names) + self.assertIn("job-b", names) + + def test_get_job_status_none_for_unknown(self) -> None: + from mailplus_intelligence.scheduler import _ensure_scheduler_table + _ensure_scheduler_table(self.conn) + self.assertIsNone(get_job_status(self.conn, "unknown-job")) + + def test_event_sink_collects_events(self) -> None: + sink: list[JobEvent] = [] + run_job(self.conn, "sink-job", lambda: None, event_sink=sink) + event_types = {e.event for e in sink} + self.assertIn("acquired", event_types) + self.assertIn("released", event_types) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_sync.py b/tests/test_sync.py new file mode 100644 index 0000000..a3c046d --- /dev/null +++ b/tests/test_sync.py @@ -0,0 +1,62 @@ +"""Tests for the incremental sync pipeline.""" + +from __future__ import annotations + +import unittest + +from mailplus_intelligence.fixtures import load_metadata_fixture_corpus +from mailplus_intelligence.schema import apply_all_migrations +from mailplus_intelligence.sqlite import connect_sqlite +from mailplus_intelligence.sync import SyncBatch, get_checkpoint, run_sync_batch + + +FIXTURE_DIR = "fixtures/mailplus_metadata" + + +class SyncTests(unittest.TestCase): + def setUp(self) -> None: + self.conn = connect_sqlite(":memory:") + apply_all_migrations(self.conn) + + def tearDown(self) -> None: + self.conn.close() + + def test_dry_run_inserts_nothing(self) -> None: + corpus = load_metadata_fixture_corpus(FIXTURE_DIR) + batch = SyncBatch("test-source", "v1", corpus.messages) + result = run_sync_batch(self.conn, batch, dry_run=True) + self.assertTrue(result.success) + self.assertEqual(result.inserted, 0) + self.assertGreater(result.skipped, 0) + + def test_live_run_inserts_records(self) -> None: + corpus = load_metadata_fixture_corpus(FIXTURE_DIR) + batch = SyncBatch("test-source", "v1", corpus.messages) + result = run_sync_batch(self.conn, batch) + self.assertTrue(result.success) + self.assertGreater(result.inserted, 0) 
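+        # The fixture corpus is non-empty, so a first run must insert at least
+        # one record; idempotent re-run behaviour is covered by the next test.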
+ + def test_idempotent_second_run_skips_all(self) -> None: + corpus = load_metadata_fixture_corpus(FIXTURE_DIR) + batch = SyncBatch("test-source", "v1", corpus.messages) + run_sync_batch(self.conn, batch) + result2 = run_sync_batch(self.conn, batch) + self.assertTrue(result2.success) + self.assertEqual(result2.inserted, 0) + + def test_checkpoint_updated_on_success(self) -> None: + corpus = load_metadata_fixture_corpus(FIXTURE_DIR) + batch = SyncBatch("test-source", "cursor-abc", corpus.messages) + run_sync_batch(self.conn, batch) + cp = get_checkpoint(self.conn, "test-source") + self.assertIsNotNone(cp) + self.assertEqual(cp["cursor"], "cursor-abc") + self.assertIsNotNone(cp["last_success_at"]) + + def test_checkpoint_none_before_any_sync(self) -> None: + cp = get_checkpoint(self.conn, "never-run") + self.assertIsNone(cp) + + +if __name__ == "__main__": + unittest.main()
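+
+
+# End-to-end sketch (illustration, not a test): wrap the fixture sync in the
+# scheduler so recurring runs are lock-protected. Assumes an open connection
+# with migrations applied, as in setUp above.
+#
+#     from mailplus_intelligence.scheduler import run_job
+#     from mailplus_intelligence.sync import sync_from_fixture_corpus
+#
+#     ran, result = run_job(
+#         conn, "fixture-sync",
+#         lambda: sync_from_fixture_corpus(conn, FIXTURE_DIR),
+#     )
+#     # ran is False when another holder has an active (non-stale) lock.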