From c1577ee25f692f676c0b79bf358fe73bf7f7f362 Mon Sep 17 00:00:00 2001 From: Jonathan Kingston Date: Sun, 29 Mar 2026 20:13:57 +0100 Subject: [PATCH] Make CQ propose local-first under team sync. Keep durable local knowledge units even when team sync is enabled by writing locally before team submission and tracking sync state for retries instead of deleting synced local rows. Preserve KU IDs across local and team stores so merged reads and later propagation remain coherent. Made-with: Cursor --- README.md | 12 +- docs/architecture.md | 13 +- plugins/cq/commands/cq-status.md | 6 +- plugins/cq/server/cq_mcp/local_store.py | 107 +++++++++++- plugins/cq/server/cq_mcp/server.py | 174 +++++++++++++++----- plugins/cq/server/cq_mcp/team_client.py | 2 + plugins/cq/server/tests/test_local_store.py | 84 ++++++++++ plugins/cq/server/tests/test_server.py | 112 ++++++++++--- plugins/cq/server/tests/test_team_client.py | 19 +++ team-api/team_api/app.py | 28 +++- team-api/tests/test_app.py | 17 ++ 11 files changed, 482 insertions(+), 92 deletions(-) diff --git a/README.md b/README.md index 43b3efe..b1ac11b 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,17 @@ cq works out of the box in **local-only mode** with no configuration. Set enviro | `CQ_TEAM_ADDR` | No | *(disabled)* | Team API URL. Set to enable team sync (e.g. `http://localhost:8742`) | | `CQ_TEAM_API_KEY` | When team configured | — | API key for team API authentication | -When `CQ_TEAM_ADDR` is unset or empty, cq runs in local-only mode — knowledge stays on your machine. Set it to a team API URL to enable shared knowledge across your team. +When `CQ_TEAM_ADDR` is unset or empty, cq runs in local-only mode and knowledge stays on your machine. Set it to a team API URL to enable shared knowledge across your team while keeping a durable local copy. + +## Storage Semantics + +cq always keeps a machine-local knowledge store. Team sync adds a shared store; it does not replace the local one. + +- `query` reads the local store first and, when `CQ_TEAM_ADDR` is configured and reachable, also queries the team API. Results are merged, deduplicated by knowledge-unit ID, and returned with a `source` value showing whether they came from `local`, `team`, or `both`. +- `propose` stores the new knowledge unit locally first. If team sync is configured, cq then submits the same knowledge unit to the team API using the same ID. +- If the team API is unreachable, the local copy remains stored and is marked for retry. +- If the team API rejects the shared submission, the local copy still remains stored; rejection affects team sharing, not machine-local durability. +- Team query visibility depends on the team review workflow. A proposal can be submitted to the team API immediately but may not appear in team query results until it is approved there. ### Claude Code diff --git a/docs/architecture.md b/docs/architecture.md index 8fae69b..bc9ac1b 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -85,15 +85,14 @@ sequenceDiagram CC->>MCP: propose(summary="...", domain=["api","webhooks"]) MCP->>MCP: Guardrails check (PII, prompt injection, quality) MCP->>Local: Store as ku_abc123 (confidence: 0.5) - MCP-->>CC: Stored locally as ku_abc123 - - Note over CC,Team: Graduation to team requires human approval... - - MCP->>Team: POST /propose (flagged for HITL review) + MCP->>Team: POST /propose with ku_abc123 Team-->>MCP: Queued for review + MCP-->>CC: Stored locally; team submission queued + + Note over CC,Team: Team visibility requires human approval... ``` -The agent queries before writing code, avoiding repeated failures. When it discovers something novel, it proposes a new knowledge unit. The proposal passes through guardrails (PII detection, prompt injection filtering, quality checks) before entering the local store. Graduation to the team store is not automatic — it requires human approval through a review process. In the enterprise path, a team reviewer approves promotion; in the individual path, the contributor nominates local knowledge directly for global graduation. +The agent queries before writing code, avoiding repeated failures. When it discovers something novel, it proposes a new knowledge unit. The proposal passes through guardrails (PII detection, prompt injection filtering, quality checks) before entering the local store. When team sync is configured, cq then submits the same knowledge unit to the team API using the same ID. Human review governs whether that proposal becomes visible through team query results; it does not determine whether the local copy persists. In the enterprise path, a team reviewer approves shared visibility; in the individual path, the contributor nominates local knowledge directly for global graduation. --- @@ -336,7 +335,7 @@ The tiered architecture implies different storage characteristics at each level. | Tier | Backing Store | Characteristics | |------|--------------|-----------------| -| **Tier 1: Local** | SQLite / embedded | Fast, offline-capable, private. Data never leaves the machine unless explicitly graduated. | +| **Tier 1: Local** | SQLite / embedded | Fast, offline-capable, private by default. This is the durable machine-scoped store even when team sync is enabled. | | **Tier 2: Team** | Postgres + pgvector | Multi-user access, RBAC, hybrid keyword + semantic search. Natural home for the enterprise SaaS offering. | | **Tier 3: Global** | Federated / decentralised | Publicly readable, highly available, resistant to single points of failure. Content-addressed storage for immutability and provenance. | diff --git a/plugins/cq/commands/cq-status.md b/plugins/cq/commands/cq-status.md index 9d25e56..63a4d37 100644 --- a/plugins/cq/commands/cq-status.md +++ b/plugins/cq/commands/cq-status.md @@ -38,12 +38,12 @@ Present the results using this structure: If the response includes `promoted_to_team`, add this line after the total count: ``` -Promoted {promoted_to_team} knowledge units to team at startup. +Synced {promoted_to_team} pending local knowledge units to team at startup. ``` ## Empty Store When `total_count` is 0: -- **With `promoted_to_team`:** Show the header, total count line, and promotion line. Omit Domains, Recent Additions, and Confidence sections (there is no data to display). -- **Without `promoted_to_team`:** Display only: "The local cq store is empty. Knowledge units are added via `propose` or the `/cq:reflect` command." +- Display the header and total count line, then: "The local cq store is empty. Knowledge units are added via `propose` or the `/cq:reflect` command." +- If `promoted_to_team` is present, add the startup sync line after the empty-store message. diff --git a/plugins/cq/server/cq_mcp/local_store.py b/plugins/cq/server/cq_mcp/local_store.py index 5d99bb1..929dec5 100644 --- a/plugins/cq/server/cq_mcp/local_store.py +++ b/plugins/cq/server/cq_mcp/local_store.py @@ -13,6 +13,7 @@ import sqlite3 import threading from datetime import UTC, datetime +from enum import StrEnum from pathlib import Path from types import TracebackType from typing import Any @@ -38,6 +39,15 @@ _LEGACY_DB_PATH = Path.home() / ".cq" / "local.db" +class TeamSyncStatus(StrEnum): + """Local sync status for propagating a KU to the team store.""" + + NOT_APPLICABLE = "not_applicable" + PENDING = "pending" + SYNCED = "synced" + REJECTED = "rejected" + + def _default_db_path() -> Path: """Return the default database path per the XDG Base Directory spec.""" xdg = os.environ.get("XDG_DATA_HOME") @@ -96,7 +106,10 @@ class StoreStats(BaseModel): _SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS knowledge_units ( id TEXT PRIMARY KEY, - data TEXT NOT NULL + data TEXT NOT NULL, + team_sync_status TEXT NOT NULL DEFAULT 'not_applicable', + team_sync_attempted_at TEXT, + team_sync_error TEXT ); CREATE TABLE IF NOT EXISTS knowledge_unit_domains ( @@ -115,6 +128,12 @@ class StoreStats(BaseModel): USING fts5(id UNINDEXED, summary, detail, action); """ +_SYNC_COLUMN_STATEMENTS = [ + "ALTER TABLE knowledge_units ADD COLUMN team_sync_status TEXT NOT NULL DEFAULT 'not_applicable'", + "ALTER TABLE knowledge_units ADD COLUMN team_sync_attempted_at TEXT", + "ALTER TABLE knowledge_units ADD COLUMN team_sync_error TEXT", +] + def _normalise_domains(domains: list[str]) -> list[str]: """Lowercase, strip whitespace, drop empties, and deduplicate domain tags.""" @@ -195,6 +214,19 @@ def _ensure_schema(self) -> None: """Create tables, indexes, and FTS virtual table if they do not exist.""" self._conn.executescript(_SCHEMA_SQL) self._conn.executescript(_FTS_SCHEMA_SQL) + self._ensure_sync_columns() + + def _ensure_sync_columns(self) -> None: + """Add team sync metadata columns if they do not exist.""" + existing = { + row[1] + for row in self._conn.execute("PRAGMA table_info(knowledge_units)").fetchall() + } + for statement in _SYNC_COLUMN_STATEMENTS: + column = statement.split("COLUMN ")[1].split()[0] + if column not in existing: + self._conn.execute(statement) + self._conn.commit() def _check_open(self) -> None: """Raise if the store has been closed.""" @@ -227,7 +259,14 @@ def db_path(self) -> Path: """Path to the SQLite database file.""" return self._db_path - def insert(self, unit: KnowledgeUnit) -> None: + def insert( + self, + unit: KnowledgeUnit, + *, + team_sync_status: TeamSyncStatus = TeamSyncStatus.NOT_APPLICABLE, + team_sync_attempted_at: datetime | None = None, + team_sync_error: str | None = None, + ) -> None: """Insert a knowledge unit into the store. Args: @@ -242,12 +281,21 @@ def insert(self, unit: KnowledgeUnit) -> None: raise ValueError("At least one non-empty domain is required") unit = unit.model_copy(update={"domain": domains}) data = unit.model_dump_json() + attempted_at = team_sync_attempted_at.isoformat() if team_sync_attempted_at else None with self._lock: self._check_open() with self._conn: self._conn.execute( - "INSERT INTO knowledge_units (id, data) VALUES (?, ?)", - (unit.id, data), + "INSERT INTO knowledge_units " + "(id, data, team_sync_status, team_sync_attempted_at, team_sync_error) " + "VALUES (?, ?, ?, ?, ?)", + ( + unit.id, + data, + team_sync_status.value, + attempted_at, + team_sync_error, + ), ) self._conn.executemany( "INSERT INTO knowledge_unit_domains (unit_id, domain) VALUES (?, ?)", @@ -284,6 +332,35 @@ def all(self) -> list[KnowledgeUnit]: rows = self._conn.execute("SELECT data FROM knowledge_units").fetchall() return [KnowledgeUnit.model_validate_json(row[0]) for row in rows] + def pending_sync_units(self) -> list[KnowledgeUnit]: + """Return local KUs that still need propagation to the team store.""" + with self._lock: + self._check_open() + rows = self._conn.execute( + "SELECT data FROM knowledge_units " + "WHERE team_sync_status = ? " + "ORDER BY COALESCE(team_sync_attempted_at, '') ASC, id ASC", + (TeamSyncStatus.PENDING.value,), + ).fetchall() + return [KnowledgeUnit.model_validate_json(row[0]) for row in rows] + + def team_sync_status(self, unit_id: str) -> dict[str, str | None] | None: + """Return local team sync metadata for a knowledge unit.""" + with self._lock: + self._check_open() + row = self._conn.execute( + "SELECT team_sync_status, team_sync_attempted_at, team_sync_error " + "FROM knowledge_units WHERE id = ?", + (unit_id,), + ).fetchone() + if row is None: + return None + return { + "status": row[0], + "attempted_at": row[1], + "error": row[2], + } + def delete(self, unit_id: str) -> None: """Remove a knowledge unit by ID. @@ -350,6 +427,28 @@ def update(self, unit: KnowledgeUnit) -> None: (unit.id, unit.insight.summary, unit.insight.detail, unit.insight.action), ) + def update_team_sync_status( + self, + unit_id: str, + status: TeamSyncStatus, + *, + error: str | None = None, + attempted_at: datetime | None = None, + ) -> None: + """Update local team sync metadata for an existing knowledge unit.""" + timestamp = attempted_at or datetime.now(UTC) + with self._lock: + self._check_open() + with self._conn: + cursor = self._conn.execute( + "UPDATE knowledge_units " + "SET team_sync_status = ?, team_sync_attempted_at = ?, team_sync_error = ? " + "WHERE id = ?", + (status.value, timestamp.isoformat(), error, unit_id), + ) + if cursor.rowcount == 0: + raise KeyError(f"Knowledge unit not found: {unit_id}") + def query( self, domains: list[str], diff --git a/plugins/cq/server/cq_mcp/server.py b/plugins/cq/server/cq_mcp/server.py index cfb4a28..29925cf 100644 --- a/plugins/cq/server/cq_mcp/server.py +++ b/plugins/cq/server/cq_mcp/server.py @@ -12,6 +12,7 @@ import os from collections.abc import AsyncIterator from contextlib import asynccontextmanager +from datetime import UTC, datetime from pathlib import Path from mcp.server.fastmcp import FastMCP @@ -24,7 +25,7 @@ Tier, create_knowledge_unit, ) -from .local_store import LocalStore +from .local_store import LocalStore, TeamSyncStatus from .scoring import apply_confirmation, apply_flag, calculate_relevance from .team_client import TeamClient, TeamRejectedError @@ -103,14 +104,14 @@ async def _close_team_client() -> None: async def _drain_local_to_team() -> None: - """Promote locally-stored fallback KUs to the team API. + """Retry team sync for locally-stored KUs still marked pending. Runs once at MCP server startup when CQ_TEAM_ADDR is configured. - KUs that were stored locally as a fallback (e.g. when the team API - was temporarily unreachable) are proposed to team concurrently. - Successfully promoted KUs are deleted from local store; failures - (transport errors or rejections) are left in place for retry on - the next startup. + Only KUs explicitly marked as pending sync are retried. + Successfully synced KUs remain in the local store and have their + sync metadata updated in place. Rejections are marked rejected so + they are not retried indefinitely. Transport failures remain pending + for the next startup repair pass. """ global _drain_promoted_count # noqa: PLW0603 team_client = _get_team_client() @@ -118,7 +119,7 @@ async def _drain_local_to_team() -> None: return store = _get_store() - units = await asyncio.to_thread(store.all) + units = await asyncio.to_thread(store.pending_sync_units) if not units: _drain_promoted_count = 0 return @@ -127,20 +128,42 @@ async def _drain_local_to_team() -> None: async def _promote(unit: KnowledgeUnit) -> bool: async with sem: + attempted_at = datetime.now(UTC) try: result = await team_client.propose(unit) - except TeamRejectedError: + except TeamRejectedError as exc: + await asyncio.to_thread( + store.update_team_sync_status, + unit.id, + TeamSyncStatus.REJECTED, + error=exc.detail, + attempted_at=attempted_at, + ) logger.warning( - "Team API rejected local KU %s; will retry next startup.", + "Team API rejected local KU %s; marked rejected locally.", unit.id, ) return False if result is None: + await asyncio.to_thread( + store.update_team_sync_status, + unit.id, + TeamSyncStatus.PENDING, + error="Team API unreachable during startup sync.", + attempted_at=attempted_at, + ) logger.warning( "Team API unreachable for local KU %s; will retry next startup.", unit.id, ) return False + await asyncio.to_thread( + store.update_team_sync_status, + unit.id, + TeamSyncStatus.SYNCED, + error=None, + attempted_at=attempted_at, + ) return True # Process in fixed-size batches to bound the number of in-flight tasks. @@ -156,18 +179,17 @@ async def _promote(unit: KnowledgeUnit) -> bool: result, ) elif result is True: - await asyncio.to_thread(store.delete, unit.id) promoted += 1 _drain_promoted_count = promoted - logger.info("Promoted %d/%d local KUs to team.", promoted, len(units)) + logger.info("Synced %d/%d pending local KUs to team.", promoted, len(units)) @asynccontextmanager async def _lifespan(_server: FastMCP) -> AsyncIterator[None]: """Manage MCP server startup and shutdown. - On startup: drains locally-stored fallback KUs to team API (if configured). + On startup: retries pending local-to-team sync work (if configured). On shutdown: closes the team client and local store. """ await _drain_local_to_team() @@ -225,19 +247,20 @@ def _merge_results( "local", ) - seen_ids: set[str] = set() + local_by_id = {unit.id: unit for unit in local_units} + team_by_id = {unit.id: unit for unit in team_units} merged: list[KnowledgeUnit] = [] - # Local results take precedence for duplicate IDs. - for unit in local_units: - if unit.id not in seen_ids: - seen_ids.add(unit.id) - merged.append(unit) - - for unit in team_units: - if unit.id not in seen_ids: - seen_ids.add(unit.id) - merged.append(unit) + for unit_id in dict.fromkeys([*local_by_id.keys(), *team_by_id.keys()]): + local_unit = local_by_id.get(unit_id) + team_unit = team_by_id.get(unit_id) + if local_unit is None: + merged.append(team_unit) + continue + if team_unit is None: + merged.append(local_unit) + continue + merged.append(_prefer_merged_unit(local_unit, team_unit)) # Source reflects which stores were consulted and returned data. has_local = len(local_units) > 0 @@ -267,6 +290,22 @@ def _merge_results( return [u.model_dump(mode="json") for u in top], source +def _prefer_merged_unit(local_unit: KnowledgeUnit, team_unit: KnowledgeUnit) -> KnowledgeUnit: + """Choose the best representation for a KU present in both stores.""" + local_last = local_unit.evidence.last_confirmed or local_unit.evidence.first_observed + team_last = team_unit.evidence.last_confirmed or team_unit.evidence.first_observed + if local_last and team_last: + if team_last > local_last: + return team_unit + if local_last > team_last: + return local_unit + elif team_last is not None: + return team_unit + elif local_last is not None: + return local_unit + return team_unit + + @mcp.tool(name="query") async def query( domain: list[str], @@ -354,9 +393,9 @@ async def propose( """Propose a new knowledge unit. Propose flow scenarios: - - Team configured and reachable: proposal goes to team only, nothing stored locally. - - Team configured but unreachable: falls back to local storage. - - Team configured but rejects the proposal: returns error, nothing stored locally. + - Team configured and reachable: store locally, then submit the same KU to team. + - Team configured but unreachable: store locally and leave the KU pending sync. + - Team configured but rejects the proposal: store locally and mark team sync rejected. - No team configured: always stores locally. Args: @@ -369,8 +408,9 @@ async def propose( pattern: Optional pattern name. Returns: - Dict with ``id``, ``tier``, ``message``, and ``team_id`` - (if pushed to team), or ``error`` if inputs are invalid. + Dict with ``id``, ``tier``, ``source``, ``sync_status``, and + ``message``. Includes ``team`` status details when team sync is + configured, or ``error`` if inputs are invalid. """ cleaned_summary = summary.strip() cleaned_detail = detail.strip() @@ -400,27 +440,71 @@ async def propose( tier=Tier.LOCAL, ) + store = _get_store() team_client = _get_team_client() - if team_client is not None: - try: - team_unit = await team_client.propose(unit) - except TeamRejectedError as exc: - return {"error": f"Team API rejected proposal: {exc.detail}"} - if team_unit is not None: - return { - "id": team_unit.id, - "tier": team_unit.tier.value, - "message": f"Knowledge unit proposed to team as {team_unit.id}.", - } - logger.warning("Team API unreachable; falling back to local storage.") + initial_sync_status = ( + TeamSyncStatus.NOT_APPLICABLE if team_client is None else TeamSyncStatus.PENDING + ) + await asyncio.to_thread(store.insert, unit, team_sync_status=initial_sync_status) - store = _get_store() - await asyncio.to_thread(store.insert, unit) - return { + result = { "id": unit.id, "tier": unit.tier.value, + "source": "local", + "sync_status": initial_sync_status.value, "message": f"Knowledge unit {unit.id} stored locally.", } + if team_client is None: + return result + + attempted_at = datetime.now(UTC) + try: + team_unit = await team_client.propose(unit) + except TeamRejectedError as exc: + await asyncio.to_thread( + store.update_team_sync_status, + unit.id, + TeamSyncStatus.REJECTED, + error=exc.detail, + attempted_at=attempted_at, + ) + result["sync_status"] = TeamSyncStatus.REJECTED.value + result["message"] = ( + f"Knowledge unit {unit.id} stored locally; team rejected the shared submission." + ) + result["team"] = {"status": "rejected", "error": exc.detail} + return result + + if team_unit is None: + logger.warning("Team API unreachable after local write; leaving KU pending sync.") + await asyncio.to_thread( + store.update_team_sync_status, + unit.id, + TeamSyncStatus.PENDING, + error="Team API unreachable during propose.", + attempted_at=attempted_at, + ) + result["sync_status"] = TeamSyncStatus.PENDING.value + result["team"] = {"status": "error", "error": "Team API unreachable during propose."} + result["message"] = ( + f"Knowledge unit {unit.id} stored locally; team sync is pending retry." + ) + return result + + await asyncio.to_thread( + store.update_team_sync_status, + unit.id, + TeamSyncStatus.SYNCED, + error=None, + attempted_at=attempted_at, + ) + result["source"] = "both" + result["sync_status"] = TeamSyncStatus.SYNCED.value + result["team"] = {"status": "ok"} + result["message"] = ( + f"Knowledge unit {unit.id} stored locally and submitted to team." + ) + return result @mcp.tool(name="confirm") @@ -571,7 +655,7 @@ async def status() -> dict: Dict with ``total_count``, ``domain_counts``, ``recent`` (serialised knowledge units), ``confidence_distribution``, and ``team`` (connectivity status). Includes ``promoted_to_team`` - when KUs were drained at startup. + when pending KUs were synced at startup. """ store = _get_store() stats = await asyncio.to_thread(store.stats) diff --git a/plugins/cq/server/cq_mcp/team_client.py b/plugins/cq/server/cq_mcp/team_client.py index bd9b8cc..314aa58 100644 --- a/plugins/cq/server/cq_mcp/team_client.py +++ b/plugins/cq/server/cq_mcp/team_client.py @@ -144,9 +144,11 @@ async def propose(self, unit: KnowledgeUnit) -> KnowledgeUnit | None: proposal with an HTTP 4xx/5xx status. """ body = { + "id": unit.id, "domain": unit.domain, "insight": unit.insight.model_dump(mode="json"), "context": unit.context.model_dump(mode="json"), + "evidence": unit.evidence.model_dump(mode="json"), "created_by": unit.created_by, } try: diff --git a/plugins/cq/server/tests/test_local_store.py b/plugins/cq/server/tests/test_local_store.py index 4bcc4cb..8528eec 100644 --- a/plugins/cq/server/tests/test_local_store.py +++ b/plugins/cq/server/tests/test_local_store.py @@ -19,6 +19,7 @@ _FTS_MAX_TERM_LENGTH, _FTS_MAX_TERMS, LocalStore, + TeamSyncStatus, _build_fts_match_expr, _default_db_path, _migrate_legacy_db, @@ -75,6 +76,26 @@ def _inspect_tables(db_path: Path) -> list[str]: conn.close() +def _inspect_sync_metadata(db_path: Path, unit_id: str) -> dict[str, str | None] | None: + """Read team sync metadata directly from SQLite for test assertions.""" + conn = _inspect_connection(db_path) + try: + row = conn.execute( + "SELECT team_sync_status, team_sync_attempted_at, team_sync_error " + "FROM knowledge_units WHERE id = ?", + (unit_id,), + ).fetchone() + if row is None: + return None + return { + "status": row[0], + "attempted_at": row[1], + "error": row[2], + } + finally: + conn.close() + + @pytest.fixture() def store(tmp_path: Path) -> Iterator[LocalStore]: s = LocalStore(db_path=tmp_path / "test.db") @@ -201,6 +222,16 @@ def test_creates_domains_table(self, store: LocalStore): tables = _inspect_tables(store.db_path) assert "knowledge_unit_domains" in tables + def test_creates_team_sync_columns(self, store: LocalStore): + unit = _make_unit(domain=["api"]) + store.insert(unit) + metadata = _inspect_sync_metadata(store.db_path, unit.id) + assert metadata == { + "status": TeamSyncStatus.NOT_APPLICABLE.value, + "attempted_at": None, + "error": None, + } + def test_idempotent_schema_creation(self, tmp_path: Path): db_path = tmp_path / "test.db" store1 = LocalStore(db_path=db_path) @@ -258,6 +289,19 @@ def test_insert_with_empty_domains_raises(self, store: LocalStore): with pytest.raises(ValueError, match="At least one non-empty domain"): store.insert(unit) + def test_insert_persists_team_sync_metadata(self, store: LocalStore): + unit = _make_unit(domain=["api"]) + store.insert( + unit, + team_sync_status=TeamSyncStatus.PENDING, + team_sync_error="Team API unreachable during propose.", + ) + metadata = _inspect_sync_metadata(store.db_path, unit.id) + assert metadata is not None + assert metadata["status"] == TeamSyncStatus.PENDING.value + assert metadata["attempted_at"] is None + assert metadata["error"] == "Team API unreachable during propose." + class TestGet: def test_returns_none_for_missing_id(self, store: LocalStore): @@ -280,6 +324,9 @@ def test_roundtrip_preserves_all_fields(self, store: LocalStore): assert retrieved.evidence == unit.evidence assert retrieved.insight == unit.insight + def test_team_sync_status_returns_none_for_missing_id(self, store: LocalStore): + assert store.team_sync_status("ku_nonexistent") is None + class TestUpdate: def test_update_persists_changes(self, store: LocalStore): @@ -329,6 +376,26 @@ def test_update_after_flag_reduces_confidence(self, store: LocalStore): assert retrieved.evidence.confidence == pytest.approx(0.35) assert len(retrieved.flags) == 1 + def test_update_team_sync_status_persists_metadata(self, store: LocalStore): + unit = _make_unit(domain=["api"]) + store.insert(unit, team_sync_status=TeamSyncStatus.PENDING) + + store.update_team_sync_status( + unit.id, + TeamSyncStatus.SYNCED, + attempted_at=datetime.now(UTC), + ) + + metadata = store.team_sync_status(unit.id) + assert metadata is not None + assert metadata["status"] == TeamSyncStatus.SYNCED.value + assert metadata["attempted_at"] is not None + assert metadata["error"] is None + + def test_update_team_sync_status_missing_unit_raises(self, store: LocalStore): + with pytest.raises(KeyError, match="Knowledge unit not found"): + store.update_team_sync_status("ku_missing", TeamSyncStatus.PENDING) + class TestQuery: def test_returns_units_with_matching_domain(self, store: LocalStore): @@ -843,6 +910,23 @@ def test_all_raises_when_store_closed(self, store: LocalStore) -> None: store.all() +class TestPendingSyncUnits: + def test_returns_only_pending_units(self, store: LocalStore) -> None: + pending = _make_unit(domain=["api"]) + synced = _make_unit(domain=["databases"]) + store.insert(pending, team_sync_status=TeamSyncStatus.PENDING) + store.insert(synced, team_sync_status=TeamSyncStatus.SYNCED) + + results = store.pending_sync_units() + + assert [unit.id for unit in results] == [pending.id] + + def test_pending_sync_units_raises_when_store_closed(self, store: LocalStore) -> None: + store.close() + with pytest.raises(RuntimeError, match="closed"): + store.pending_sync_units() + + class TestDelete: def test_delete_removes_unit(self, store: LocalStore) -> None: unit = _make_unit(domain=["api"]) diff --git a/plugins/cq/server/tests/test_server.py b/plugins/cq/server/tests/test_server.py index 4c38110..f94d1de 100644 --- a/plugins/cq/server/tests/test_server.py +++ b/plugins/cq/server/tests/test_server.py @@ -23,6 +23,7 @@ reflect, status, ) +from cq_mcp.local_store import TeamSyncStatus from cq_mcp.team_client import TeamQueryResult, TeamRejectedError @@ -187,8 +188,8 @@ async def test_query_deduplicates_by_id( result = await query(domain=["api"]) assert len(result["results"]) == 1 - # Local version takes precedence. - assert result["results"][0]["tier"] == "local" + # Team version takes precedence when evidence freshness is equal. + assert result["results"][0]["tier"] == "team" # Source reflects that both stores were consulted. assert result["source"] == "both" @@ -321,12 +322,14 @@ async def test_propose_pushes_to_team( monkeypatch.setattr(server, "_get_team_client", lambda: mock_client) result = await _propose_unit(domain=["api"]) - assert result["id"] == "ku_team_pushed" - assert result["tier"] == "team" - assert "proposed to team" in result["message"] + assert result["id"].startswith("ku_") + assert result["tier"] == "local" + assert result["source"] == "both" + assert result["sync_status"] == TeamSyncStatus.SYNCED.value + assert "submitted to team" in result["message"] mock_client.propose.assert_called_once() - async def test_propose_skips_local_store_when_team_succeeds( + async def test_propose_keeps_local_store_when_team_succeeds( self, monkeypatch: pytest.MonkeyPatch, ) -> None: @@ -338,11 +341,18 @@ async def test_propose_skips_local_store_when_team_succeeds( ) monkeypatch.setattr(server, "_get_team_client", lambda: mock_client) - await _propose_unit(domain=["api"]) + proposed = await _propose_unit(domain=["api"]) local_results = await query(domain=["api"]) - assert len(local_results["results"]) == 0 + assert len(local_results["results"]) == 1 + assert local_results["results"][0]["id"] == proposed["id"] + store = server._get_store() + metadata = store.team_sync_status(proposed["id"]) + assert metadata is not None + assert metadata["status"] == TeamSyncStatus.SYNCED.value + assert metadata["attempted_at"] is not None + assert metadata["error"] is None - async def test_propose_returns_error_when_team_rejects( + async def test_propose_keeps_local_store_when_team_rejects( self, monkeypatch: pytest.MonkeyPatch, ) -> None: @@ -354,10 +364,17 @@ async def test_propose_returns_error_when_team_rejects( monkeypatch.setattr(server, "_get_team_client", lambda: mock_client) result = await _propose_unit(domain=["api"]) - assert "error" in result - assert "rejected" in result["error"].lower() + assert result["source"] == "local" + assert result["sync_status"] == TeamSyncStatus.REJECTED.value + assert result["team"]["status"] == "rejected" local_results = await query(domain=["api"]) - assert len(local_results["results"]) == 0 + assert len(local_results["results"]) == 1 + assert local_results["results"][0]["id"] == result["id"] + store = server._get_store() + metadata = store.team_sync_status(result["id"]) + assert metadata is not None + assert metadata["status"] == TeamSyncStatus.REJECTED.value + assert metadata["error"] == "Invalid domain" async def test_propose_falls_back_to_local_when_team_unreachable( self, @@ -373,9 +390,15 @@ async def test_propose_falls_back_to_local_when_team_unreachable( result = await _propose_unit(domain=["api"]) assert result["id"].startswith("ku_") assert result["tier"] == "local" - assert "stored locally" in result["message"] + assert result["source"] == "local" + assert result["sync_status"] == TeamSyncStatus.PENDING.value + assert "pending retry" in result["message"] local_results = await query(domain=["api"]) assert len(local_results["results"]) == 1 + store = server._get_store() + metadata = store.team_sync_status(result["id"]) + assert metadata is not None + assert metadata["status"] == TeamSyncStatus.PENDING.value class TestCqConfirm: @@ -610,14 +633,14 @@ def _make_local_unit(*, domain: list[str] | None = None) -> KnowledgeUnit: class TestDrainLocalToTeam: - async def test_drain_promotes_local_kus_to_team( + async def test_drain_syncs_pending_local_kus_to_team( self, monkeypatch: pytest.MonkeyPatch, ) -> None: - """Local KUs are proposed to team and deleted from local store.""" + """Pending local KUs are proposed to team and kept locally.""" store = server._get_store() unit = _make_local_unit(domain=["api"]) - store.insert(unit) + store.insert(unit, team_sync_status=TeamSyncStatus.PENDING) team_unit = _make_team_unit(unit_id="ku_team_promoted") mock_client = AsyncMock() @@ -626,14 +649,19 @@ async def test_drain_promotes_local_kus_to_team( await server._drain_local_to_team() - assert store.all() == [] + remaining = store.all() + assert len(remaining) == 1 + assert remaining[0].id == unit.id + metadata = store.team_sync_status(unit.id) + assert metadata is not None + assert metadata["status"] == TeamSyncStatus.SYNCED.value assert server._drain_promoted_count == 1 async def test_drain_skips_when_no_team_client(self) -> None: """Drain does nothing when team is not configured.""" store = server._get_store() unit = _make_local_unit(domain=["api"]) - store.insert(unit) + store.insert(unit, team_sync_status=TeamSyncStatus.PENDING) await server._drain_local_to_team() @@ -646,7 +674,7 @@ async def test_drain_keeps_unit_on_transport_error( """KU stays local when team API is unreachable.""" store = server._get_store() unit = _make_local_unit(domain=["api"]) - store.insert(unit) + store.insert(unit, team_sync_status=TeamSyncStatus.PENDING) mock_client = AsyncMock() mock_client.propose.return_value = None @@ -655,16 +683,19 @@ async def test_drain_keeps_unit_on_transport_error( await server._drain_local_to_team() assert len(store.all()) == 1 + metadata = store.team_sync_status(unit.id) + assert metadata is not None + assert metadata["status"] == TeamSyncStatus.PENDING.value assert server._drain_promoted_count == 0 async def test_drain_keeps_unit_on_rejection( self, monkeypatch: pytest.MonkeyPatch, ) -> None: - """KU stays local when team API rejects it.""" + """Rejected KUs stay local and stop retrying.""" store = server._get_store() unit = _make_local_unit(domain=["api"]) - store.insert(unit) + store.insert(unit, team_sync_status=TeamSyncStatus.PENDING) mock_client = AsyncMock() mock_client.propose.side_effect = TeamRejectedError(422, "bad") @@ -673,18 +704,22 @@ async def test_drain_keeps_unit_on_rejection( await server._drain_local_to_team() assert len(store.all()) == 1 + metadata = store.team_sync_status(unit.id) + assert metadata is not None + assert metadata["status"] == TeamSyncStatus.REJECTED.value + assert metadata["error"] == "bad" assert server._drain_promoted_count == 0 async def test_drain_handles_mixed_results( self, monkeypatch: pytest.MonkeyPatch, ) -> None: - """Some KUs promote, some fail — only promoted ones are deleted.""" + """Some KUs sync, some fail — all remain local with updated status.""" store = server._get_store() u1 = _make_local_unit(domain=["api"]) u2 = _make_local_unit(domain=["databases"]) - store.insert(u1) - store.insert(u2) + store.insert(u1, team_sync_status=TeamSyncStatus.PENDING) + store.insert(u2, team_sync_status=TeamSyncStatus.PENDING) team_unit = _make_team_unit(unit_id="ku_team_ok") mock_client = AsyncMock() @@ -695,9 +730,36 @@ async def test_drain_handles_mixed_results( await server._drain_local_to_team() remaining = store.all() - assert len(remaining) == 1 + assert len(remaining) == 2 + statuses = { + store.team_sync_status(u1.id)["status"], + store.team_sync_status(u2.id)["status"], + } + assert statuses == { + TeamSyncStatus.SYNCED.value, + TeamSyncStatus.PENDING.value, + } assert server._drain_promoted_count == 1 + async def test_drain_ignores_non_pending_units( + self, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + store = server._get_store() + pending = _make_local_unit(domain=["api"]) + synced = _make_local_unit(domain=["databases"]) + store.insert(pending, team_sync_status=TeamSyncStatus.PENDING) + store.insert(synced, team_sync_status=TeamSyncStatus.SYNCED) + + mock_client = AsyncMock() + mock_client.propose.return_value = _make_team_unit(unit_id=pending.id, domain=["api"]) + monkeypatch.setattr(server, "_get_team_client", lambda: mock_client) + + await server._drain_local_to_team() + + mock_client.propose.assert_called_once_with(pending) + assert store.team_sync_status(synced.id)["status"] == TeamSyncStatus.SYNCED.value + class TestCqStatusWithDrain: async def test_status_includes_promotion_count( diff --git a/plugins/cq/server/tests/test_team_client.py b/plugins/cq/server/tests/test_team_client.py index 1a5ea16..672fbcf 100644 --- a/plugins/cq/server/tests/test_team_client.py +++ b/plugins/cq/server/tests/test_team_client.py @@ -175,6 +175,25 @@ async def test_propose_parses_response( assert result is not None assert result.id == "ku_team_new" + async def test_propose_sends_id_and_evidence( + self, + client: TeamClient, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + captured: dict[str, object] = {} + unit = _sample_unit(unit_id="ku_supplied") + + async def handler(*_args: object, **kwargs: object) -> httpx.Response: + captured.update(kwargs) + return _mock_response(201, unit.model_dump(mode="json")) + + monkeypatch.setattr(client._client, "post", handler) + + await client.propose(unit) + + assert captured["json"]["id"] == "ku_supplied" + assert captured["json"]["evidence"]["confidence"] == unit.evidence.confidence + class TestTeamClientConfirm: async def test_confirm_returns_none_on_connection_error( diff --git a/team-api/team_api/app.py b/team-api/team_api/app.py index 601dd08..facc77c 100644 --- a/team-api/team_api/app.py +++ b/team-api/team_api/app.py @@ -14,6 +14,7 @@ from .review import router as review_router from .knowledge_unit import ( Context, + Evidence, FlagReason, Insight, KnowledgeUnit, @@ -27,9 +28,11 @@ class ProposeRequest(BaseModel): """Request body for proposing a new knowledge unit.""" + id: str | None = None domain: list[str] = Field(min_length=1) insight: Insight context: Context = Field(default_factory=Context) + evidence: Evidence | None = None created_by: str = "" @@ -102,13 +105,24 @@ def propose_unit(request: ProposeRequest) -> KnowledgeUnit: raise HTTPException( status_code=422, detail="At least one non-empty domain is required" ) - unit = create_knowledge_unit( - domain=domains, - insight=request.insight, - context=request.context, - tier=Tier.TEAM, - created_by=request.created_by, - ) + if request.id is None and request.evidence is None: + unit = create_knowledge_unit( + domain=domains, + insight=request.insight, + context=request.context, + tier=Tier.TEAM, + created_by=request.created_by, + ) + else: + unit = KnowledgeUnit( + id=request.id or create_knowledge_unit(domain=domains, insight=request.insight).id, + domain=domains, + insight=request.insight, + context=request.context, + evidence=request.evidence or Evidence(), + tier=Tier.TEAM, + created_by=request.created_by, + ) store.insert(unit) return unit diff --git a/team-api/tests/test_app.py b/team-api/tests/test_app.py index 118dd2f..036ba97 100644 --- a/team-api/tests/test_app.py +++ b/team-api/tests/test_app.py @@ -82,6 +82,23 @@ def test_propose_normalises_domains(self, client: TestClient) -> None: assert resp.status_code == 201 assert resp.json()["domain"] == ["api", "databases"] + def test_propose_preserves_client_supplied_id_and_evidence(self, client: TestClient) -> None: + payload = _propose_payload( + id="ku_supplied_id", + evidence={ + "confidence": 0.7, + "confirmations": 3, + "first_observed": "2026-03-29T00:00:00Z", + "last_confirmed": "2026-03-29T00:00:00Z", + }, + ) + resp = client.post("/propose", json=payload) + assert resp.status_code == 201 + body = resp.json() + assert body["id"] == "ku_supplied_id" + assert body["evidence"]["confidence"] == 0.7 + assert body["tier"] == "team" + class TestQuery: def _insert_unit(self, client: TestClient, **overrides: Any) -> dict[str, Any]: