12 changes: 11 additions & 1 deletion README.md
@@ -73,7 +73,17 @@ cq works out of the box in **local-only mode** with no configuration. Set enviro
| `CQ_TEAM_ADDR` | No | *(disabled)* | Team API URL. Set to enable team sync (e.g. `http://localhost:8742`) |
| `CQ_TEAM_API_KEY` | When team configured | — | API key for team API authentication |

-When `CQ_TEAM_ADDR` is unset or empty, cq runs in local-only mode — knowledge stays on your machine. Set it to a team API URL to enable shared knowledge across your team.
+When `CQ_TEAM_ADDR` is unset or empty, cq runs in local-only mode and knowledge stays on your machine. Set it to a team API URL to enable shared knowledge across your team while keeping a durable local copy.

+## Storage Semantics
+
+cq always keeps a machine-local knowledge store. Team sync adds a shared store; it does not replace the local one.
+
+- `query` reads the local store first and, when `CQ_TEAM_ADDR` is configured and reachable, also queries the team API. Results are merged, deduplicated by knowledge-unit ID, and returned with a `source` value showing whether they came from `local`, `team`, or `both`.
+- `propose` stores the new knowledge unit locally first. If team sync is configured, cq then submits the same knowledge unit to the team API using the same ID.
+- If the team API is unreachable, the local copy remains stored and is marked for retry.
+- If the team API rejects the shared submission, the local copy still remains stored; rejection affects team sharing, not machine-local durability.
+- Team query visibility depends on the team review workflow. A proposal can be submitted to the team API immediately but may not appear in team query results until it is approved there.
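
The merge step in the `query` bullet above could look something like the sketch below. `Hit` and `merge_results` are hypothetical names introduced for illustration; keeping the local copy when an ID appears in both stores, and listing local results first, are assumptions rather than documented cq behaviour.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Hit:
    """Hypothetical query result: a knowledge-unit ID plus its summary."""

    id: str
    summary: str


def merge_results(local: list[Hit], team: list[Hit]) -> list[dict[str, str]]:
    """Merge local and team hits, deduplicating by knowledge-unit ID."""
    merged: dict[str, dict[str, str]] = {}
    for hit in local:
        merged[hit.id] = {"id": hit.id, "summary": hit.summary, "source": "local"}
    for hit in team:
        existing = merged.get(hit.id)
        if existing is None:
            merged[hit.id] = {"id": hit.id, "summary": hit.summary, "source": "team"}
        elif existing["source"] == "local":
            # Same ID in both stores: keep the local copy and relabel it.
            existing["source"] = "both"
    # Dicts preserve insertion order, so local hits come first.
    return list(merged.values())
```

Because `propose` reuses the same ID for the team submission, matching on ID alone is enough to detect a unit that exists in both stores.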

### Claude Code

13 changes: 6 additions & 7 deletions docs/architecture.md
@@ -85,15 +85,14 @@ sequenceDiagram
CC->>MCP: propose(summary="...", domain=["api","webhooks"])
MCP->>MCP: Guardrails check (PII, prompt injection, quality)
MCP->>Local: Store as ku_abc123 (confidence: 0.5)
-MCP-->>CC: Stored locally as ku_abc123
-
-Note over CC,Team: Graduation to team requires human approval...
-
-MCP->>Team: POST /propose (flagged for HITL review)
+MCP->>Team: POST /propose with ku_abc123
Team-->>MCP: Queued for review
+MCP-->>CC: Stored locally; team submission queued
+
+Note over CC,Team: Team visibility requires human approval...
```

-The agent queries before writing code, avoiding repeated failures. When it discovers something novel, it proposes a new knowledge unit. The proposal passes through guardrails (PII detection, prompt injection filtering, quality checks) before entering the local store. Graduation to the team store is not automatic — it requires human approval through a review process. In the enterprise path, a team reviewer approves promotion; in the individual path, the contributor nominates local knowledge directly for global graduation.
+The agent queries before writing code, avoiding repeated failures. When it discovers something novel, it proposes a new knowledge unit. The proposal passes through guardrails (PII detection, prompt injection filtering, quality checks) before entering the local store. When team sync is configured, cq then submits the same knowledge unit to the team API using the same ID. Human review governs whether that proposal becomes visible through team query results; it does not determine whether the local copy persists. In the enterprise path, a team reviewer approves shared visibility; in the individual path, the contributor nominates local knowledge directly for global graduation.

---

@@ -336,7 +335,7 @@ The tiered architecture implies different storage characteristics at each level.

| Tier | Backing Store | Characteristics |
|------|--------------|-----------------|
-| **Tier 1: Local** | SQLite / embedded | Fast, offline-capable, private. Data never leaves the machine unless explicitly graduated. |
+| **Tier 1: Local** | SQLite / embedded | Fast, offline-capable, private by default. This is the durable machine-scoped store even when team sync is enabled. |
| **Tier 2: Team** | Postgres + pgvector | Multi-user access, RBAC, hybrid keyword + semantic search. Natural home for the enterprise SaaS offering. |
| **Tier 3: Global** | Federated / decentralised | Publicly readable, highly available, resistant to single points of failure. Content-addressed storage for immutability and provenance. |

6 changes: 3 additions & 3 deletions plugins/cq/commands/cq-status.md
@@ -38,12 +38,12 @@ Present the results using this structure:
If the response includes `promoted_to_team`, add this line after the total count:

```
-Promoted {promoted_to_team} knowledge units to team at startup.
+Synced {promoted_to_team} pending local knowledge units to team at startup.
```

## Empty Store

When `total_count` is 0:

-- **With `promoted_to_team`:** Show the header, total count line, and promotion line. Omit Domains, Recent Additions, and Confidence sections (there is no data to display).
-- **Without `promoted_to_team`:** Display only: "The local cq store is empty. Knowledge units are added via `propose` or the `/cq:reflect` command."
+- Display the header and total count line, then: "The local cq store is empty. Knowledge units are added via `propose` or the `/cq:reflect` command."
+- If `promoted_to_team` is present, add the startup sync line after the empty-store message.
107 changes: 103 additions & 4 deletions plugins/cq/server/cq_mcp/local_store.py
@@ -13,6 +13,7 @@
 import sqlite3
 import threading
 from datetime import UTC, datetime
+from enum import StrEnum
 from pathlib import Path
 from types import TracebackType
 from typing import Any
@@ -38,6 +39,15 @@
 _LEGACY_DB_PATH = Path.home() / ".cq" / "local.db"


+class TeamSyncStatus(StrEnum):
+    """Local sync status for propagating a KU to the team store."""
+
+    NOT_APPLICABLE = "not_applicable"
+    PENDING = "pending"
+    SYNCED = "synced"
+    REJECTED = "rejected"
+
+
 def _default_db_path() -> Path:
     """Return the default database path per the XDG Base Directory spec."""
     xdg = os.environ.get("XDG_DATA_HOME")
@@ -96,7 +106,10 @@ class StoreStats(BaseModel):
 _SCHEMA_SQL = """
 CREATE TABLE IF NOT EXISTS knowledge_units (
     id TEXT PRIMARY KEY,
-    data TEXT NOT NULL
+    data TEXT NOT NULL,
+    team_sync_status TEXT NOT NULL DEFAULT 'not_applicable',
+    team_sync_attempted_at TEXT,
+    team_sync_error TEXT
 );

 CREATE TABLE IF NOT EXISTS knowledge_unit_domains (
@@ -115,6 +128,12 @@ class StoreStats(BaseModel):
     USING fts5(id UNINDEXED, summary, detail, action);
 """

+_SYNC_COLUMN_STATEMENTS = [
+    "ALTER TABLE knowledge_units ADD COLUMN team_sync_status TEXT NOT NULL DEFAULT 'not_applicable'",
+    "ALTER TABLE knowledge_units ADD COLUMN team_sync_attempted_at TEXT",
+    "ALTER TABLE knowledge_units ADD COLUMN team_sync_error TEXT",
+]
+

 def _normalise_domains(domains: list[str]) -> list[str]:
     """Lowercase, strip whitespace, drop empties, and deduplicate domain tags."""
@@ -195,6 +214,19 @@ def _ensure_schema(self) -> None:
         """Create tables, indexes, and FTS virtual table if they do not exist."""
         self._conn.executescript(_SCHEMA_SQL)
         self._conn.executescript(_FTS_SCHEMA_SQL)
+        self._ensure_sync_columns()
+
+    def _ensure_sync_columns(self) -> None:
+        """Add team sync metadata columns if they do not exist."""
+        existing = {
+            row[1]
+            for row in self._conn.execute("PRAGMA table_info(knowledge_units)").fetchall()
+        }
+        for statement in _SYNC_COLUMN_STATEMENTS:
+            column = statement.split("COLUMN ")[1].split()[0]
+            if column not in existing:
+                self._conn.execute(statement)
+        self._conn.commit()

     def _check_open(self) -> None:
         """Raise if the store has been closed."""
@@ -227,7 +259,14 @@ def db_path(self) -> Path:
         """Path to the SQLite database file."""
         return self._db_path

-    def insert(self, unit: KnowledgeUnit) -> None:
+    def insert(
+        self,
+        unit: KnowledgeUnit,
+        *,
+        team_sync_status: TeamSyncStatus = TeamSyncStatus.NOT_APPLICABLE,
+        team_sync_attempted_at: datetime | None = None,
+        team_sync_error: str | None = None,
+    ) -> None:
         """Insert a knowledge unit into the store.

         Args:
@@ -242,12 +281,21 @@ def insert(self, unit: KnowledgeUnit) -> None:
             raise ValueError("At least one non-empty domain is required")
         unit = unit.model_copy(update={"domain": domains})
         data = unit.model_dump_json()
+        attempted_at = team_sync_attempted_at.isoformat() if team_sync_attempted_at else None
         with self._lock:
             self._check_open()
             with self._conn:
                 self._conn.execute(
-                    "INSERT INTO knowledge_units (id, data) VALUES (?, ?)",
-                    (unit.id, data),
+                    "INSERT INTO knowledge_units "
+                    "(id, data, team_sync_status, team_sync_attempted_at, team_sync_error) "
+                    "VALUES (?, ?, ?, ?, ?)",
+                    (
+                        unit.id,
+                        data,
+                        team_sync_status.value,
+                        attempted_at,
+                        team_sync_error,
+                    ),
                 )
                 self._conn.executemany(
                     "INSERT INTO knowledge_unit_domains (unit_id, domain) VALUES (?, ?)",
@@ -284,6 +332,35 @@ def all(self) -> list[KnowledgeUnit]:
             rows = self._conn.execute("SELECT data FROM knowledge_units").fetchall()
         return [KnowledgeUnit.model_validate_json(row[0]) for row in rows]

+    def pending_sync_units(self) -> list[KnowledgeUnit]:
+        """Return local KUs that still need propagation to the team store."""
+        with self._lock:
+            self._check_open()
+            rows = self._conn.execute(
+                "SELECT data FROM knowledge_units "
+                "WHERE team_sync_status = ? "
+                "ORDER BY COALESCE(team_sync_attempted_at, '') ASC, id ASC",
+                (TeamSyncStatus.PENDING.value,),
+            ).fetchall()
+        return [KnowledgeUnit.model_validate_json(row[0]) for row in rows]
+
+    def team_sync_status(self, unit_id: str) -> dict[str, str | None] | None:
+        """Return local team sync metadata for a knowledge unit."""
+        with self._lock:
+            self._check_open()
+            row = self._conn.execute(
+                "SELECT team_sync_status, team_sync_attempted_at, team_sync_error "
+                "FROM knowledge_units WHERE id = ?",
+                (unit_id,),
+            ).fetchone()
+        if row is None:
+            return None
+        return {
+            "status": row[0],
+            "attempted_at": row[1],
+            "error": row[2],
+        }
+
     def delete(self, unit_id: str) -> None:
         """Remove a knowledge unit by ID.

@@ -350,6 +427,28 @@ def update(self, unit: KnowledgeUnit) -> None:
                     (unit.id, unit.insight.summary, unit.insight.detail, unit.insight.action),
                 )

+    def update_team_sync_status(
+        self,
+        unit_id: str,
+        status: TeamSyncStatus,
+        *,
+        error: str | None = None,
+        attempted_at: datetime | None = None,
+    ) -> None:
+        """Update local team sync metadata for an existing knowledge unit."""
+        timestamp = attempted_at or datetime.now(UTC)
+        with self._lock:
+            self._check_open()
+            with self._conn:
+                cursor = self._conn.execute(
+                    "UPDATE knowledge_units "
+                    "SET team_sync_status = ?, team_sync_attempted_at = ?, team_sync_error = ? "
+                    "WHERE id = ?",
+                    (status.value, timestamp.isoformat(), error, unit_id),
+                )
+                if cursor.rowcount == 0:
+                    raise KeyError(f"Knowledge unit not found: {unit_id}")
+
     def query(
         self,
         domains: list[str],