Merged
10 changes: 10 additions & 0 deletions .env.example
@@ -0,0 +1,10 @@
# MailPlus live adapter credentials (never commit real values)
# Copy to .env and fill in before running the live adapter.

MAILPLUS_HOST=imap.example.com
MAILPLUS_USER=your-address@example.com
MAILPLUS_TOKEN=your-oauth2-bearer-token-or-app-password

# Optional overrides
MAILPLUS_MAILBOX=INBOX
MAILPLUS_PAGE_SIZE=50
47 changes: 47 additions & 0 deletions docs/live-adapter.md
@@ -0,0 +1,47 @@
# Live Adapter

`live_adapter.py` bridges the fixture-backed sync pipeline to a real MailPlus
account. It produces the same `SyncBatch` type used by `run_sync_batch()`, so
no other code changes when switching from fixtures to live.

## Configuration

Copy `.env.example` to `.env` (never commit `.env`) and fill in:

| Variable | Required | Description |
|---|---|---|
| `MAILPLUS_HOST` | yes | IMAP or MailPlus API hostname |
| `MAILPLUS_USER` | yes | Mailbox address |
| `MAILPLUS_TOKEN` | yes | OAuth2 bearer token or app password |
| `MAILPLUS_MAILBOX` | no | Folder to sync (default `INBOX`) |
| `MAILPLUS_PAGE_SIZE` | no | Messages per batch (default `50`) |

If any required variable is absent, `load_live_config()` raises
`LiveAdapterNotConfigured`. CI omits these variables deliberately, so the live
path is never exercised in automated tests.

## Gate Pattern

```python
from mailplus_intelligence.live_adapter import LiveAdapterNotConfigured, fetch_batch, load_live_config

try:
config = load_live_config()
except LiveAdapterNotConfigured as exc:
print(f"Live adapter not available: {exc}")
raise SystemExit(1)

batch = fetch_batch(config, cursor="")
```

## Current Status

`_fetch_messages()` is a stub that returns an empty list. Replace it with the
MailPlus API client call once the client library is available. The public
interface (`fetch_batch`, `load_live_config`) is stable and will not change.

## Security

- Store credentials in `.env` or a secrets manager — never hard-code them.
- Rotate `MAILPLUS_TOKEN` on any suspected exposure.
- The adapter is metadata-only; it must never fetch raw message bodies.
65 changes: 65 additions & 0 deletions docs/phase2-planning.md
@@ -0,0 +1,65 @@
# Phase 2 Planning

## Scope

Phase 2 extends the metadata intelligence pipeline from the offline/fixture
baseline (Phase 1) to a production-capable system. The three pillars are:

1. **LLM-backed extraction** — richer semantic artifacts from classified threads
2. **Live account sync** — incremental, checkpointed sync from real MailPlus accounts
3. **Scheduler + job locking** — reliable recurring sync without overlapping runs

---

## Completed in Phase 2 (this branch)

| Module | Issue | Status |
|---|---|---|
| `sync.py` | #3 | done |
| `extractor.py` | #6 | done |
| `llm_extractor.py` | #70 | done |
| `scheduler.py` | #74 | done |
| `live_adapter.py` | #71 | done |
| `cli.py` (search, queue, export, doctor) | #2, #4, #5, #7 | done |
| `index_writer.py` + search | #39 | done |

---

## Architectural Decisions

### Metadata-only invariant
Raw message bodies are never fetched, stored, or transmitted. All extraction
(deterministic and LLM) operates on subject, sender, date, folder, and
attachment metadata only. This is enforced at the mapper layer.
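A minimal sketch of what that mapper-layer enforcement could look like — the `to_metadata_record` helper and the raw field names are illustrative, not the real implementation:

```python
# Illustrative allow-list enforcement: only known metadata fields survive
# the mapper; body content is never copied into the pipeline's records.
ALLOWED_FIELDS = {"subject", "sender", "date", "folder", "attachments"}

def to_metadata_record(raw: dict) -> dict:
    """Copy only allow-listed metadata; anything else (e.g. 'body') is dropped."""
    return {k: v for k, v in raw.items() if k in ALLOWED_FIELDS}

record = to_metadata_record({
    "subject": "Invoice #1042",
    "sender": "billing@example.com",
    "body": "never reaches the pipeline",
})
assert "body" not in record
```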

### Deterministic-first, LLM-second
`extractor.py` runs first and produces candidates with `provenance="deterministic"`.
`llm_extractor.py` runs on the same threads and produces candidates with
`provenance="llm"`. Downstream consumers can filter by provenance.
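As a sketch of that downstream filtering (the `Candidate` shape here is illustrative, not the real dataclass):

```python
from dataclasses import dataclass

@dataclass
class Candidate:          # illustrative shape, not the production class
    thread_id: str
    provenance: str       # "deterministic" or "llm"

candidates = [
    Candidate("t1", "deterministic"),
    Candidate("t1", "llm"),
    Candidate("t2", "deterministic"),
]

# Consumers that only trust the deterministic pass filter by provenance:
llm_only = [c for c in candidates if c.provenance == "llm"]
```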

### Prompt caching
LLM extraction uses `cache_control: {"type": "ephemeral"}` on the shared
system prompt and per-thread context blocks. This reduces token costs
significantly when processing multiple threads in one session.
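A sketch of the request shape (no API call is made here; the model name and prompt text are placeholders, while the `cache_control` marker reflects the Anthropic API field named above):

```python
# Request-shape sketch: the shared system prompt carries an ephemeral
# cache_control marker, so repeated requests in one session can reuse
# the cached prefix instead of re-sending (and re-billing) it.
SYSTEM_PROMPT = "Extract artifacts from email thread metadata."  # placeholder

def build_request(thread_context: str) -> dict:
    return {
        "model": "claude-sonnet-4-5",   # assumed model id
        "max_tokens": 1024,
        "system": [
            {
                "type": "text",
                "text": SYSTEM_PROMPT,
                "cache_control": {"type": "ephemeral"},  # cache the shared prefix
            }
        ],
        "messages": [{"role": "user", "content": thread_context}],
    }

req = build_request("thread t1: subject=..., sender=...")
```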

### Offline CI gate
`llm_extractor.py` accepts a `cassette` dict mapping thread IDs to recorded
response strings. CI passes a cassette so no Anthropic API calls are made.
Live calls happen only in environments where `ANTHROPIC_API_KEY` is set and
no cassette is provided.

### Job locking
`scheduler.py` uses a `scheduler_locks` SQLite table to prevent overlapping
runs. Locks older than `LOCK_STALE_SECONDS` (300 s) are considered stale and
cleared automatically.
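A minimal sketch of the acquire-with-stale-clearing pattern, assuming a simplified `scheduler_locks` schema (the production table shape may differ):

```python
import sqlite3
import time

LOCK_STALE_SECONDS = 300  # matches the documented staleness threshold

def try_acquire(conn: sqlite3.Connection, job: str, holder: str) -> bool:
    """Clear any stale lock for this job, then try to take it.
    The PRIMARY KEY on job_name makes a second concurrent insert fail."""
    now = time.time()
    conn.execute(
        "DELETE FROM scheduler_locks WHERE job_name = ? AND acquired_at < ?",
        (job, now - LOCK_STALE_SECONDS),
    )
    try:
        conn.execute(
            "INSERT INTO scheduler_locks (job_name, holder, acquired_at) VALUES (?, ?, ?)",
            (job, holder, now),
        )
        conn.commit()
        return True
    except sqlite3.IntegrityError:  # another holder already has the lock
        return False

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE scheduler_locks (job_name TEXT PRIMARY KEY, holder TEXT, acquired_at REAL)"
)
assert try_acquire(conn, "sync", "worker-a") is True
assert try_acquire(conn, "sync", "worker-b") is False  # lock held, not stale
```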

---

## Phase 3 Candidates

- **MailPlus API client**: replace `live_adapter._fetch_messages()` stub
- **Streaming LLM extraction**: use `client.messages.stream()` for large threads
- **Promotion workflow UI**: web interface for the queue review flow
- **Multi-account support**: per-account checkpoints and lane configuration
- **Relationship graph**: entity_update artifacts feeding a contact knowledge graph
- **Evaluation regressions**: nightly evaluation run against a golden fixture set
60 changes: 60 additions & 0 deletions src/mailplus_intelligence/cli.py
@@ -156,6 +156,56 @@ def cmd_export(args: argparse.Namespace) -> int:
    return 0


# ── sync ─────────────────────────────────────────────────────────────────────


def cmd_sync(args: argparse.Namespace) -> int:
    from .scheduler import get_job_status, list_jobs
    from .sync import get_checkpoint

    conn = _setup_db(args.db)
    try:
        if args.sync_action == "status":
            if args.job:
                status = get_job_status(conn, args.job)
                if status is None:
                    print(f"No job registered: {args.job}")
                    return 1
                jobs = [status]
            else:
                jobs = list_jobs(conn)
            if args.json:
                print(json.dumps([j.__dict__ for j in jobs], indent=2))
            else:
                if not jobs:
                    print("No sync jobs registered.")
                for j in jobs:
                    lock_str = f"LOCKED by {j.lock_holder}" if j.locked else "unlocked"
                    print(f"{j.job_name} [{lock_str}]")
                    print(f"  last run: {j.last_run_at or 'never'}")
                    print(f"  last success: {j.last_success_at or 'never'}")

        elif args.sync_action == "checkpoint":
            source = args.source or "fixture-corpus"
            cp = get_checkpoint(conn, source)
            if cp is None:
                print(f"No checkpoint for source: {source}")
                return 1
            if args.json:
                print(json.dumps(cp, indent=2))
            else:
                print(f"source: {cp.get('source_name')}")
                print(f"cursor: {cp.get('cursor') or '(none)'}")
                print(f"last attempt: {cp.get('last_attempt_at') or 'never'}")
                print(f"last success: {cp.get('last_success_at') or 'never'}")
        else:
            print("Usage: mpi sync {status|checkpoint}")
            return 1
    finally:
        conn.close()
    return 0


# ── doctor ────────────────────────────────────────────────────────────────────

def cmd_doctor(args: argparse.Namespace) -> int:
@@ -223,6 +273,14 @@ def build_parser() -> argparse.ArgumentParser:
    ep = sub.add_parser("export", help="Dry-run export of approved candidates")
    ep.add_argument("--output", default="./export-artifacts", help="Output directory")

    # sync
    syp = sub.add_parser("sync", help="Sync job status and checkpoint inspection")
    sya = syp.add_subparsers(dest="sync_action")
    ss = sya.add_parser("status", help="List scheduler job statuses")
    ss.add_argument("--job", help="Filter by job name")
    sc = sya.add_parser("checkpoint", help="Show sync checkpoint for a source")
    sc.add_argument("--source", help="Source name (default: fixture-corpus)")

    # doctor
    dp = sub.add_parser("doctor", help="Run fixture-mode preflight checks")
    dp.add_argument("--project-root", dest="project_root", default=".")
@@ -244,6 +302,8 @@ def main(argv: list[str] | None = None) -> int:
        return cmd_export(args)
    elif args.command == "doctor":
        return cmd_doctor(args)
    elif args.command == "sync":
        return cmd_sync(args)
    else:
        parser.print_help()
        return 1