diff --git a/DECISION_LOG.md b/DECISION_LOG.md index a01980d..cb8b9cb 100644 --- a/DECISION_LOG.md +++ b/DECISION_LOG.md @@ -26,6 +26,35 @@ Add a `workmem backup` subcommand that writes an age-encrypted snapshot of the g - **Include project-scoped DBs automatically.** Rejected: project DBs belong to workspaces, not to the user's top-level knowledge. Auto-including them couples backup to filesystem scanning and makes the unit of restore ambiguous. A `backup` invocation per workspace is explicit. - **Include telemetry.db in the snapshot.** Rejected: telemetry is operational, rebuildable, and has a different lifecycle than knowledge. Mixing them also risks leaking telemetry via recall if paths cross. +## 2026-04-14: Port telemetry with Go-native refinements and add privacy-strict mode + +### Context + +The Node reference implementation shipped with opt-in telemetry in a separate SQLite DB (schemas: `tool_calls`, `search_metrics`). Phase 3 deferred the port until the Go MCP entrypoint was real and adopted. That condition is now met: the Go binary serves Claude Code, Kilo, and Codex in production. Time to port. + +### Decision + +Port the Node telemetry design to Go and preserve the guiding principles (opt-in via env, separate database, counts-only for results, content replaced with ``). Refine three Node-era shortcuts: + +1. **Nil-tolerant `*Client`** — the client value is `nil` when disabled, every method returns immediately on `nil` receiver. Replaces per-callsite `if TELEMETRY_ENABLED` checks. +2. **No globals** — the client is constructed in `cmd/workmem/main.go` and plumbed via `mcpserver.Config{Telemetry: …}`. Replaces the Node pattern of module-level mutable state (`_telemetryDb`, `_lastSearchMetrics`, etc.). +3. **`SearchMemory` returns `SearchMetrics` as a tuple** — `(results []SearchObservation, metrics SearchMetrics, err error)`. Replaces the Node `_lastSearchMetrics` side-channel. 
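The third refinement can be sketched as follows — a minimal illustration of the tuple-returning shape, with stub types standing in for the richer definitions in the real `store` package:

```go
package main

import "fmt"

// SearchObservation and SearchMetrics are hypothetical minimal stand-ins for
// the store package's types, just to show the signature shape.
type SearchObservation struct {
	Entity, Content string
}

type SearchMetrics struct {
	Query           string
	CandidatesTotal int
	ResultsReturned int
}

// searchMemory illustrates the tuple return: metrics travel with the results
// instead of through a module-level side channel like Node's _lastSearchMetrics.
func searchMemory(query string, limit int) ([]SearchObservation, SearchMetrics, error) {
	results := []SearchObservation{{Entity: "Example", Content: "stub"}}
	metrics := SearchMetrics{Query: query, CandidatesTotal: 1, ResultsReturned: len(results)}
	return results, metrics, nil
}

func main() {
	results, metrics, err := searchMemory("example", 5)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(results), metrics.ResultsReturned) // metrics arrive alongside results
}
```

Because the metrics value is an ordinary return, a caller that does not care about telemetry simply discards it with `_`, and tests can assert on it without importing the telemetry package.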
+ +Add a new **privacy-strict mode** (`MEMORY_TELEMETRY_PRIVACY=strict`): entity names, queries, and event labels are sha256-hashed before storage. Intended for sensitive backends (e.g., the `private_memory` server backing therapy/health/relationship content). + +### Rationale + +- Node-era globals would have been awkward in Go and hard to test under parallel `t.Run` — eliminating them keeps the test story clean. +- Privacy-strict closes a real threat: a local plaintext telemetry DB on a laptop with sensitive entity names is a leak vector if the laptop is lost, synced, or exported. Strict mode lets one binary serve two wiring contexts (permissive `memory`, strict `private_memory`) cleanly. +- `SearchMemory` returning metrics as a proper value is idiomatic Go and testable in isolation without the telemetry package. +- Using `modernc.org/sqlite` for the telemetry DB keeps the pure-Go single-binary invariant (no CGO addition just for the analytics path). + +### Alternatives considered + +- **1:1 port with globals** — Rejected because Go's `database/sql` + `sql.Stmt` lifecycle around a package-level mutable pointer becomes painful under test; the nil-client pattern is simpler and safer. +- **Attach telemetry as an MCP tool** — Rejected as in the Node design: telemetry is human-developer infrastructure, not a model capability. Adding a tool wastes context tokens on every call for every client. +- **Encryption at rest on the telemetry DB with a keychain-stored key** — Rejected for this iteration. Cross-platform keychain integration (macOS/Windows/Linux headless) is a bigger undertaking than hashing the sensitive fields. Revisit if the strict mode proves insufficient in practice. 
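The nil-tolerant `*Client` pattern from refinement 1 above can be sketched in isolation (field and method names here are illustrative, not the real package's API):

```go
package main

import "fmt"

// Client sketches the nil-tolerant pattern: when telemetry is disabled the
// value is nil, and every method returns immediately on a nil receiver, so
// call sites never branch on an enabled flag.
type Client struct {
	calls int
}

func (c *Client) LogToolCall(tool string) {
	if c == nil {
		return // disabled: safe no-op on a nil receiver
	}
	c.calls++
	fmt.Println("logged", tool)
}

func main() {
	var disabled *Client          // nil — telemetry off
	disabled.LogToolCall("recall") // no panic, no work

	enabled := &Client{}
	enabled.LogToolCall("recall")
}
```

In Go, calling a method on a nil pointer receiver is well-defined as long as the method body never dereferences the receiver, which is exactly what the early `nil` check guarantees.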
+ ## 2026-04-14: Use the official Go MCP SDK for transport ### Context diff --git a/IMPLEMENTATION.md b/IMPLEMENTATION.md index ae17b82..8a634da 100644 --- a/IMPLEMENTATION.md +++ b/IMPLEMENTATION.md @@ -108,4 +108,19 @@ Ship a `workmem backup` subcommand that produces an age-encrypted snapshot of th - [x] Wire `backup` subcommand in `cmd/workmem/main.go` with `--to`, `--age-recipient` (repeatable), `--db`, `--env-file` - [x] README section documenting usage and manual `age -d` restore -**On Step Gate (all items [x]):** trigger correctness review focused on crypto wiring and VACUUM INTO error paths. \ No newline at end of file +**On Step Gate (all items [x]):** trigger correctness review focused on crypto wiring and VACUUM INTO error paths. + +### Step 3.5: Telemetry [✅] + +Port the Node telemetry design to Go with Go-native refinements and a new privacy-strict mode. **Gate:** when `MEMORY_TELEMETRY_PATH` is set, every tool call lands a row in `tool_calls`; every `recall` lands a row in `search_metrics` linked by `tool_call_id`; when unset, no DB is created and no overhead is added. In `MEMORY_TELEMETRY_PRIVACY=strict` mode, entity/query/label values are sha256-hashed before storage. 
+ +- [x] Build `internal/telemetry` package (nil-tolerant Client, schema, sanitize, hash, detect) +- [x] Refactor `SearchMemory` to return `(results, metrics, err)` — no globals, no side channels +- [x] Wire `*telemetry.Client` through `cmd/workmem/main.go` and `mcpserver.Config` +- [x] Wrap `mcpserver` dispatch with duration + args/result sanitization + LogToolCall/LogSearchMetrics +- [x] Unit tests for package (nil-client safety, init failure, strict hashing, sanitize, detect) +- [x] Integration tests: enabled roundtrip / disabled zero overhead / privacy-strict +- [x] `docs/TELEMETRY.md` adapted for Go with privacy-strict documented +- [x] Telemetry invariants wired into `OPERATIONS.md` + +**On Step Gate (all items [x]):** trigger correctness review on telemetry hook points and strict-mode hashing. diff --git a/OPERATIONS.md b/OPERATIONS.md index 0eae5b6..5202444 100644 --- a/OPERATIONS.md +++ b/OPERATIONS.md @@ -11,6 +11,9 @@ - Live-data queries must never bypass tombstone guards. - FTS cleanup must never use raw `DELETE` against a contentless FTS table. - `remember_event` must be atomic: the event row and all attached observations commit together or not at all. Proof: `TestRememberEventAtomicityOnMidLoopFailure` in `internal/store/parity_test.go`. +- Telemetry is opt-in (`MEMORY_TELEMETRY_PATH`) and never affects the tool call success path. Init failure logs a single warning to stderr and disables telemetry for the session; the main memory DB is unaffected. +- Telemetry data lives in its own SQLite file, physically separate from the memory database. No foreign keys, no joins, no shared lifecycle. +- When `MEMORY_TELEMETRY_PRIVACY=strict`, entity names, queries, and event labels must be sha256-hashed before reaching disk. Observation/content values are always reduced to `` regardless of mode. 
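The strict-mode hashing invariant above can be sketched as follows. The `sha256:` prefix matches what the integration test asserts on `search_metrics.query`; the function name and the hex encoding of the digest are assumptions for illustration:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashIdentifier sketches the strict-mode transform: identifier-like fields
// (entity names, queries, event labels) are reduced to a sha256 digest before
// they reach disk; permissive mode passes them through unchanged.
func hashIdentifier(value string, strict bool) string {
	if !strict {
		return value
	}
	sum := sha256.Sum256([]byte(value))
	return "sha256:" + hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(hashIdentifier("Alice", false)) // permissive: plaintext
	fmt.Println(hashIdentifier("Alice", true))  // strict: sha256:<64 hex chars>
}
```

A stable, unkeyed hash keeps aggregation working in strict mode (the same entity always maps to the same digest) while keeping the plaintext off disk.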
## Active Debt @@ -32,12 +35,6 @@ Blast radius: Late failures on Linux or Windows packaging, or FTS behavior drift Fix: Keep the canary in CI and run it on at least macOS, Linux, and Windows before calling the persistence layer portable. Done when: the same schema/FTS canary passes in cross-build validation. -- Telemetry parity is consciously deferred until the new Go MCP entrypoint is wired into a real client and the request path is considered stable. -Trigger: Instrumenting before the client-facing transport contract has been debugged in practice. -Blast radius: Busywork telemetry code tied to temporary wiring. -Fix: keep telemetry scope documented; implement it once the Kilo-facing transport path is stable. -Done when: the Go MCP entrypoint is live under a real client and tool-call telemetry can be attached once, not retrofitted twice. - - FTS5 viability is proven locally on the chosen driver, but not yet in a cross-platform validation matrix. Trigger: Assuming a passing local canary implies release-target portability. Blast radius: Search or forget semantics break only after packaging or OS expansion. @@ -54,15 +51,10 @@ Done when: FTS-specific parity tests pass across the release matrix. ### P2 -- Telemetry schema and migration strategy are not yet designed. -Trigger: Reaching post-parity milestone without an observability plan. -Blast radius: Delayed adoption of telemetry in the Go port. -Fix: Define minimal telemetry compatibility after core parity lands. -Done when: telemetry design is recorded and scheduled. +- None active. ## Pre-Launch TODO -- Prove MCP stdio compatibility with Kilo or another real client. - Prove schema initialization and migrations on clean and upgraded DBs. - Prove forget semantics including FTS deletion. - Prove project isolation. 
diff --git a/cmd/workmem/main.go b/cmd/workmem/main.go index ae3d213..63bc93e 100644 --- a/cmd/workmem/main.go +++ b/cmd/workmem/main.go @@ -12,6 +12,7 @@ import ( "workmem/internal/dotenv" "workmem/internal/mcpserver" "workmem/internal/store" + "workmem/internal/telemetry" ) func main() { @@ -101,8 +102,16 @@ func runMCP(args []string) { loadEnvFile(*envFile) - runtime, err := mcpserver.New(mcpserver.Config{DBPath: *dbPath}) + // Ownership of the telemetry client transfers to the Runtime only after + // New returns successfully. If New fails, the DB was already opened by + // FromEnv and must be closed here — otherwise the handle leaks. + tele := telemetry.FromEnv() + runtime, err := mcpserver.New(mcpserver.Config{ + DBPath: *dbPath, + Telemetry: tele, + }) if err != nil { + _ = tele.Close() // nil-safe no-op when telemetry is disabled fmt.Fprintf(os.Stderr, "start mcp server: %v\n", err) os.Exit(1) } diff --git a/docs/TELEMETRY.md b/docs/TELEMETRY.md new file mode 100644 index 0000000..1b508d8 --- /dev/null +++ b/docs/TELEMETRY.md @@ -0,0 +1,190 @@ +# Telemetry + +> Opt-in usage analytics for workmem. Zero overhead when disabled. Separate database, strict privacy controls. + +## Enabling + +Set `MEMORY_TELEMETRY_PATH` to a file path. When telemetry is enabled, the database is opened and the schema is initialized at process startup (before any tool call). If initialization fails, a single warning is printed to stderr and telemetry is disabled for the rest of the session — the main memory path is never affected. 
+ +**Via `.env`:** +```bash +MEMORY_TELEMETRY_PATH=./telemetry.db +``` + +**Via client config (example for Claude Code's `~/.claude.json`):** +```json +{ + "memory": { + "command": "/path/to/workmem", + "args": ["-env-file", "/path/to/memory.env"], + "env": { + "MEMORY_TELEMETRY_PATH": "/absolute/path/to/telemetry.db" + } + } +} +``` + +When `MEMORY_TELEMETRY_PATH` is unset (the default), telemetry is disabled end-to-end: no database is opened, no schema is written, no rows are inserted. The dispatch wrapper skips its timing block entirely when the telemetry client is `nil`, so the only cost on the no-telemetry path is a single pointer-nil check per tool call. + +## Privacy modes + +Telemetry supports two modes, controlled by `MEMORY_TELEMETRY_PRIVACY`: + +| Value | Mode | Behavior | +|-------|------|----------| +| (unset) / any other value | **permissive** (default) | entity names, queries, and event labels are stored in plaintext | +| `strict` | **strict** | entity names, queries, and event labels are sha256-hashed before storage | + +Strict mode is intended for sensitive instances such as a `private_memory` server backing therapy/health/relationship content. Ranking debug ("which queries overfetch candidates?") becomes harder in strict mode because plaintext queries are no longer recoverable — but sensitive identifiers never land on disk. + +Observation/content values are **always** reduced to `` regardless of mode. Batched-array fields are **always** reduced to a count marker using the field name — `facts` becomes ``, `observations` becomes ``. Strict mode only changes what happens to identifier-like fields. 
+ +**Example `.env` for sensitive backend:** +```bash +MEMORY_TELEMETRY_PATH=/home/user/.local/state/workmem/private-telemetry.db +MEMORY_TELEMETRY_PRIVACY=strict +``` + +## What it logs + +### Tool calls (`tool_calls` table) + +Every MCP tool invocation is logged with: + +| Column | Example | +|--------|---------| +| `ts` | `2026-04-14T20:15:32.456` | +| `tool` | `recall`, `remember`, `forget` | +| `client_name` | `kilo`, `claude-code`, `cursor`, `windsurf`, `vscode-copilot` | +| `client_version` | `0.43.6` | +| `client_source` | `protocol` / `env` / `none` | +| `db_scope` | `global` / `project` | +| `project_path` | resolved absolute path, or null | +| `duration_ms` | `12.4` | +| `args_summary` | Sanitized JSON (see below) | +| `result_summary` | Counts only, never data | +| `is_error` | `0` or `1` | + +### Search ranking metrics (`search_metrics` table) + +For `recall` calls, additional metrics capture the ranking pipeline: + +| Column | Example | +|--------|---------| +| `tool_call_id` | FK into `tool_calls.id` | +| `query` | Search text (hashed in strict mode) | +| `channels` | `{"fts": 12, "fts_phrase": 3, "entity_exact": 1}` | +| `candidates_total` | `16` | +| `results_returned` | `5` | +| `limit_requested` | `20` | +| `score_min` | `0.32` | +| `score_max` | `0.87` | +| `score_median` | `0.61` | +| `compact` | `0` or `1` | + +## What it does NOT log + +- Observation content (replaced with ``, always) +- Full result payloads — only counts (entities returned, observations stored, etc.) +- In strict mode, any identifier (entity name, query, event label, from/to) + +## Client identity + +The server identifies which client is calling through two mechanisms: + +1. **MCP protocol** (primary) — the `initialize` handshake includes `clientInfo.name` and `clientInfo.version`. This is a required field in the MCP spec. +2. 
**Environment fingerprinting** (fallback) — when the protocol doesn't provide client info, the server detects the client from environment variables: + +| Client | Signal | +|--------|--------| +| Kilo | `KILO=1` (version from `KILOCODE_VERSION`) | +| Claude Code | `CLAUDE_CODE_SSE_PORT` set | +| Cursor | `CURSOR_TRACE_ID` set | +| Windsurf | `WINDSURF_EXTENSION_ID` set | +| VS Code Copilot | `VSCODE_MCP_HTTP_PREFER` set non-empty | +| VS Code (unknown extension) | `TERM_PROGRAM=vscode` | + +The `client_source` column tells you which mechanism fired: `protocol`, `env`, or `none`. + +## Querying the data + +The telemetry database is a standard SQLite file. Open it with any tool: `sqlite3`, DBeaver, Jupyter, pandas, etc. + +### Example queries + +**Tool usage by client:** +```sql +SELECT client_name, tool, COUNT(*) as calls, + ROUND(AVG(duration_ms), 1) as avg_ms +FROM tool_calls +GROUP BY client_name, tool +ORDER BY calls DESC; +``` + +**Search ranking quality (permissive mode — query is plaintext):** +```sql +SELECT query, candidates_total, results_returned, + ROUND(score_min, 3) as min, ROUND(score_max, 3) as max, + channels +FROM search_metrics +ORDER BY candidates_total DESC +LIMIT 20; +``` + +**Overfetch detection (candidates >> returned):** +```sql +SELECT query, candidates_total, results_returned, limit_requested, + ROUND(1.0 * results_returned / candidates_total, 2) as yield_ratio +FROM search_metrics +WHERE candidates_total > 0 +ORDER BY yield_ratio ASC +LIMIT 20; +``` + +**Error rate by tool:** +```sql +SELECT tool, COUNT(*) as total, + SUM(is_error) as errors, + ROUND(100.0 * SUM(is_error) / COUNT(*), 1) as error_pct +FROM tool_calls +GROUP BY tool +ORDER BY error_pct DESC; +``` + +**Channel effectiveness:** +```sql +SELECT json_each.key as channel, COUNT(*) as appearances +FROM search_metrics, json_each(search_metrics.channels) +GROUP BY channel +ORDER BY appearances DESC; +``` + +## Separate database + +Telemetry is stored in its own SQLite file, 
completely separate from `memory.db`. This means: + +- Deleting the telemetry DB has zero impact on your knowledge graph +- The telemetry DB can be wiped and recreated at any time +- No foreign keys or joins between telemetry and memory data +- Telemetry uses `journal_mode=WAL` for concurrent reads while the server writes + +## Init failure handling + +If the telemetry path is invalid or the database can't be opened, the server prints a single warning to stderr and disables telemetry for the rest of the session. It does not retry on every call. The main `memory.db` is unaffected — telemetry failure never breaks the tool call path. + +Example warning: +``` +[memory] telemetry init failed (disabled for this session): unable to open database file +``` + +## Design rationale + +**Why plaintext queries in permissive mode?** Local, single-user development: the telemetry DB lives on your machine, is gitignored, and is only readable by you. Redacting queries permissively would make the analytics useless — you can't answer "which queries produce too many candidates?" if the query text is hashed. + +**Why privacy-strict mode?** For backends holding sensitive content (therapy, health, relationships, personal journaling), even local plaintext can matter: laptop loss, accidental sync-folder placement, or exported snapshots. Strict mode ensures entity names and queries never land on disk in the clear. + +**Why a separate SQLite and not the memory DB?** Separation of concerns. The memory DB is your knowledge graph. Telemetry is operational data with a different lifecycle (wipe freely, aggregate, export). Mixing them risks accidentally leaking telemetry via `recall`, or losing telemetry history when the memory DB is rebuilt. + +**Why not an MCP tool?** Telemetry is infrastructure, not a capability the model needs. Adding a tool would cost context tokens on every call for something only the human developer uses. Query the SQLite directly. 
+ +**Why a nil-tolerant `*Client`?** The alternative is `if TELEMETRY_ENABLED` checks sprinkled across every call site. The `nil`-receiver pattern keeps the dispatch code clean — the wrapper always calls `LogToolCall`; when telemetry is disabled, the client is `nil` and the method returns immediately. diff --git a/internal/mcpserver/server.go b/internal/mcpserver/server.go index ce9216b..98d64c7 100644 --- a/internal/mcpserver/server.go +++ b/internal/mcpserver/server.go @@ -10,22 +10,35 @@ import ( "os" "path/filepath" "strings" + "sync/atomic" + "time" "github.com/modelcontextprotocol/go-sdk/mcp" "workmem/internal/store" + "workmem/internal/telemetry" ) const serverVersion = "0.1.0" +// Config carries the construction parameters for a Runtime. Ownership of the +// Telemetry client transfers to the Runtime: Runtime.Close() will close it. +// Callers must not call Close on the client themselves once it has been +// handed to New. type Config struct { - DBPath string + DBPath string + Telemetry *telemetry.Client } type Runtime struct { server *mcp.Server defaultDB *sql.DB dbPath string + // telemetry is stored as an atomic.Pointer so concurrent reads by + // in-flight tool handlers cannot race with the Close-time Swap during + // shutdown. Reads use Load(); Close uses Swap(nil) to atomically take + // ownership of the pointer for cleanup. 
+ telemetry atomic.Pointer[telemetry.Client] } type toolDefinition struct { @@ -55,6 +68,7 @@ func New(config Config) (*Runtime, error) { }, nil) runtime := &Runtime{server: server, defaultDB: db, dbPath: dbPath} + runtime.telemetry.Store(config.Telemetry) runtime.registerTools() return runtime, nil } @@ -78,7 +92,18 @@ func (r *Runtime) Close() error { closeErr = r.defaultDB.Close() r.defaultDB = nil } - return errors.Join(closeErr, store.ResetProjectDBs()) + // atomic.Pointer.Swap atomically takes the old pointer and installs nil, + // so any in-flight handler that reaches its telemetry defer during + // shutdown observes a nil Load() and skips the log path entirely. No + // data race between this write and the concurrent reads in handleTool. + // The underlying Client is also mutex-guarded (see telemetry/client.go), + // so even if a handler already captured the old pointer before the swap + // its Log* calls will serialize with the Close below. + var teleErr error + if telemetryClient := r.telemetry.Swap(nil); telemetryClient != nil { + teleErr = telemetryClient.Close() + } + return errors.Join(closeErr, teleErr, store.ResetProjectDBs()) } func (r *Runtime) DBPath() string { @@ -99,21 +124,48 @@ func (r *Runtime) registerTools() { } func (r *Runtime) handleTool(_ context.Context, def toolDefinition, req *mcp.CallToolRequest) (*mcp.CallToolResult, error) { + // Telemetry observables — captured as the flow unfolds. The telemetry + // client pointer is read via atomic.Pointer.Load() once and captured + // into the closure below; a concurrent Runtime.Close() Swap cannot race + // with the captured value. When telemetry is disabled (Load returns + // nil) we skip both the time.Now/time.Since measurement and the defer + // installation entirely, so the no-telemetry path stays near-zero. 
+ var ( + argObject map[string]any + toolResult any + metrics *store.SearchMetrics + projectRaw string + isError bool + ) + if tele := r.telemetry.Load(); tele != nil { + t0 := time.Now() + defer func() { + id := r.logToolCall(tele, def.Name, req, argObject, toolResult, projectRaw, isError, time.Since(t0)) + r.logSearchMetrics(tele, id, metrics) + }() + } + raw := req.Params.Arguments if len(raw) == 0 { raw = []byte("{}") } - argObject, err := parseArgumentObject(raw) + var err error + argObject, err = parseArgumentObject(raw) if err != nil { + isError = true return errorResult(map[string]any{ "error": err.Error(), "tool": def.Name, }), nil } + if p, ok := argObject["project"].(string); ok { + projectRaw = p + } missing := missingRequiredArguments(def.Required, argObject) if len(missing) > 0 { + isError = true return errorResult(map[string]any{ "error": fmt.Sprintf("Missing required arguments: %s", strings.Join(missing, ", ")), "tool": def.Name, @@ -122,11 +174,13 @@ func (r *Runtime) handleTool(_ context.Context, def toolDefinition, req *mcp.Cal } if validation := validateStringArguments(def, argObject); validation != nil { + isError = true return errorResult(validation), nil } var args store.ToolArgs if err := json.Unmarshal(raw, &args); err != nil { + isError = true return errorResult(map[string]any{ "error": fmt.Sprintf("Invalid arguments: %v", err), "tool": def.Name, @@ -134,8 +188,10 @@ func (r *Runtime) handleTool(_ context.Context, def toolDefinition, req *mcp.Cal }), nil } - result, err := store.HandleTool(r.defaultDB, def.Name, args) + toolResult, metrics, err = store.HandleToolWithMetrics(r.defaultDB, def.Name, args) if err != nil { + isError = true + toolResult = nil return errorResult(map[string]any{ "error": err.Error(), "tool": def.Name, @@ -143,7 +199,7 @@ func (r *Runtime) handleTool(_ context.Context, def toolDefinition, req *mcp.Cal }), nil } - return successResult(result) + return successResult(toolResult) } func ResolveDBPath(configPath string) 
(string, error) { diff --git a/internal/mcpserver/telemetry.go b/internal/mcpserver/telemetry.go new file mode 100644 index 0000000..b64ee2f --- /dev/null +++ b/internal/mcpserver/telemetry.go @@ -0,0 +1,100 @@ +package mcpserver + +import ( + "os" + "time" + + "github.com/modelcontextprotocol/go-sdk/mcp" + + "workmem/internal/store" + "workmem/internal/telemetry" +) + +// detectClient reads clientInfo from the MCP initialize handshake when +// available, falling back to environment-variable detection otherwise. +func detectClient(req *mcp.CallToolRequest) telemetry.ClientInfo { + var name, version string + if req != nil && req.Session != nil { + if params := req.Session.InitializeParams(); params != nil && params.ClientInfo != nil { + name = params.ClientInfo.Name + version = params.ClientInfo.Version + } + } + return telemetry.DetectClient(name, version) +} + +// resolveProjectPath resolves a project argument as provided by the caller +// to its absolute form. Returns "" in two cases: the caller passed no +// project, and the rare case where os.UserHomeDir fails. When the caller +// originally supplied a non-empty project, logToolCall still records +// db_scope="project" for that call — only the project_path column is +// written as NULL. The scope reflects the caller's intent, not the +// resolver's success. +func resolveProjectPath(project string) string { + if project == "" { + return "" + } + home, err := os.UserHomeDir() + if err != nil { + return "" + } + return store.ResolveProjectPath(project, home) +} + +// logToolCall inserts a tool_calls row and returns the insert id (or 0 when +// the telemetry client is nil / logging failed). The caller is expected to +// pass the pointer it captured from r.telemetry.Load() so this helper never +// re-reads the atomic field — that avoids any formal race against a +// concurrent Runtime.Close() that may have Swap(nil)'d the pointer after +// the defer fired. 
+func (r *Runtime) logToolCall( + tele *telemetry.Client, + toolName string, + req *mcp.CallToolRequest, + argObject map[string]any, + result any, + projectRaw string, + isError bool, + elapsed time.Duration, +) int64 { + if tele == nil { + return 0 + } + dbScope := "global" + projectPath := "" + if projectRaw != "" { + dbScope = "project" + projectPath = resolveProjectPath(projectRaw) + } + return tele.LogToolCall(telemetry.ToolCallInput{ + Tool: toolName, + Client: detectClient(req), + DBScope: dbScope, + ProjectPath: projectPath, + DurationMs: float64(elapsed) / float64(time.Millisecond), + ArgsSummary: telemetry.SanitizeArgs(argObject, tele.Strict()), + ResultSummary: telemetry.SummarizeResult(result), + IsError: isError, + }) +} + +// logSearchMetrics mirrors the recall search_metrics row. No-op when the +// captured telemetry client is nil or the parent tool_call id is 0. Same +// captured-pointer rationale as logToolCall. +func (r *Runtime) logSearchMetrics(tele *telemetry.Client, toolCallID int64, m *store.SearchMetrics) { + if tele == nil || toolCallID == 0 || m == nil { + return + } + tele.LogSearchMetrics(telemetry.SearchMetricsInput{ + ToolCallID: toolCallID, + Query: m.Query, + Channels: m.Channels, + CandidatesTotal: m.CandidatesTotal, + ResultsReturned: m.ResultsReturned, + LimitRequested: m.LimitRequested, + ScoreMin: m.ScoreMin, + ScoreMax: m.ScoreMax, + ScoreMedian: m.ScoreMedian, + Compact: m.Compact, + }) +} diff --git a/internal/mcpserver/telemetry_integration_test.go b/internal/mcpserver/telemetry_integration_test.go new file mode 100644 index 0000000..4809eed --- /dev/null +++ b/internal/mcpserver/telemetry_integration_test.go @@ -0,0 +1,277 @@ +package mcpserver + +import ( + "context" + "database/sql" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/modelcontextprotocol/go-sdk/mcp" + + _ "modernc.org/sqlite" + + "workmem/internal/telemetry" +) + +func 
TestTelemetryEnabledRoundtripLogsToolCallsAndSearchMetrics(t *testing.T) { + telePath := filepath.Join(t.TempDir(), "telemetry.db") + tele := telemetry.InitIfEnabled(telePath, false) + if tele == nil { + t.Fatalf("telemetry InitIfEnabled returned nil on valid path") + } + + runtime, err := New(Config{DBPath: filepath.Join(t.TempDir(), "memory.db"), Telemetry: tele}) + if err != nil { + t.Fatalf("New() error = %v", err) + } + + session, stop := startTelemetrySession(t, runtime) + defer stop() + ctx := context.Background() + + callOK(t, session, ctx, "remember", map[string]any{ + "entity": "TelemetryEntity", + "observation": "sensitive observation content must never leak", + }) + callOK(t, session, ctx, "recall", map[string]any{ + "query": "TelemetryEntity", + "limit": 5, + }) + callOK(t, session, ctx, "forget", map[string]any{ + "entity": "TelemetryEntity", + }) + + // Shut the runtime down before reading back. Ownership of tele is held + // by Runtime, so Runtime.Close() (triggered through stop -> Run's defer) + // is the right way to flush it. Direct tele.Close() here would violate + // the ownership contract documented in mcpserver.Config. 
+ stop() + + rdb, err := sql.Open("sqlite", "file:"+filepath.Clean(telePath)+"?_pragma=foreign_keys(1)") + if err != nil { + t.Fatalf("open telemetry db for readback: %v", err) + } + defer rdb.Close() + + var callCount int + if err := rdb.QueryRow(`SELECT COUNT(*) FROM tool_calls`).Scan(&callCount); err != nil { + t.Fatalf("count tool_calls: %v", err) + } + if callCount != 3 { + t.Fatalf("tool_calls count = %d, want 3", callCount) + } + + rows, err := rdb.Query(`SELECT tool, args_summary, result_summary, duration_ms, is_error FROM tool_calls ORDER BY id`) + if err != nil { + t.Fatalf("query tool_calls: %v", err) + } + defer rows.Close() + var tools []string + for rows.Next() { + var tool string + var argsSummary, resultSummary sql.NullString + var dur float64 + var isErr int + if err := rows.Scan(&tool, &argsSummary, &resultSummary, &dur, &isErr); err != nil { + t.Fatalf("scan tool_calls row: %v", err) + } + tools = append(tools, tool) + if strings.Contains(argsSummary.String, "sensitive observation content") { + t.Fatalf("args_summary leaked observation content: %s", argsSummary.String) + } + if dur <= 0 { + t.Fatalf("duration_ms = %v for %s, want > 0", dur, tool) + } + if isErr != 0 { + t.Fatalf("is_error = %d for %s, want 0", isErr, tool) + } + } + expected := []string{"remember", "recall", "forget"} + for i, want := range expected { + if tools[i] != want { + t.Fatalf("tool[%d] = %q, want %q", i, tools[i], want) + } + } + + var smCount int + if err := rdb.QueryRow(`SELECT COUNT(*) FROM search_metrics`).Scan(&smCount); err != nil { + t.Fatalf("count search_metrics: %v", err) + } + if smCount != 1 { + t.Fatalf("search_metrics count = %d, want 1", smCount) + } + + var smQuery sql.NullString + var candidatesTotal, resultsReturned int + if err := rdb.QueryRow(`SELECT query, candidates_total, results_returned FROM search_metrics`).Scan(&smQuery, &candidatesTotal, &resultsReturned); err != nil { + t.Fatalf("readback search_metrics: %v", err) + } + if smQuery.String != 
"TelemetryEntity" { + t.Fatalf("search_metrics.query = %q, want TelemetryEntity", smQuery.String) + } + if candidatesTotal == 0 { + t.Fatalf("search_metrics.candidates_total = 0, want > 0") + } +} + +func TestTelemetryDisabledViaFromEnvCreatesNoArtifacts(t *testing.T) { + // Drive the full env → FromEnv → runtime → dispatch path with telemetry + // explicitly disabled, and verify that no telemetry artifact appears + // anywhere in the runtime's data directory after a complete tool call + // cycle. This proves the disabled path is silent end-to-end, not just + // that a hand-picked path happens not to exist. + t.Setenv("MEMORY_TELEMETRY_PATH", "") + t.Setenv("MEMORY_TELEMETRY_PRIVACY", "") + tele := telemetry.FromEnv() + if tele != nil { + t.Fatalf("FromEnv with MEMORY_TELEMETRY_PATH unset must return nil, got %+v", tele) + } + + dir := t.TempDir() + memDB := filepath.Join(dir, "memory.db") + runtime, err := New(Config{DBPath: memDB, Telemetry: tele}) + if err != nil { + t.Fatalf("New() error = %v", err) + } + + session, stop := startTelemetrySession(t, runtime) + defer stop() + ctx := context.Background() + + callOK(t, session, ctx, "remember", map[string]any{ + "entity": "NoTelemetryEntity", + "observation": "this must dispatch without creating any telemetry file", + }) + callOK(t, session, ctx, "recall", map[string]any{"query": "NoTelemetry", "limit": 3}) + + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("readdir %q: %v", dir, err) + } + for _, e := range entries { + if e.IsDir() { + continue + } + name := e.Name() + // memory.db + its WAL/SHM companions are expected; anything else is + // a telemetry leak. 
+ if name == "memory.db" || strings.HasPrefix(name, "memory.db-") { + continue + } + t.Fatalf("unexpected artifact %q in data dir — telemetry disabled path should not create any file", name) + } +} + +func TestTelemetryStrictModeHashesIdentifiersEndToEnd(t *testing.T) { + telePath := filepath.Join(t.TempDir(), "strict-telemetry.db") + tele := telemetry.InitIfEnabled(telePath, true) + if tele == nil { + t.Fatalf("telemetry InitIfEnabled(strict) returned nil") + } + + runtime, err := New(Config{DBPath: filepath.Join(t.TempDir(), "memory.db"), Telemetry: tele}) + if err != nil { + t.Fatalf("New() error = %v", err) + } + + session, stop := startTelemetrySession(t, runtime) + defer stop() + ctx := context.Background() + + callOK(t, session, ctx, "remember", map[string]any{ + "entity": "Alice", + "observation": "confidential note attached to Alice", + }) + callOK(t, session, ctx, "recall", map[string]any{ + "query": "sensitive question about therapy", + "limit": 5, + }) + + // Same ownership discipline as the enabled roundtrip test: stop the + // runtime (which triggers Runtime.Close -> tele.Close) before reading + // the telemetry DB back. 
+ stop() + + rdb, err := sql.Open("sqlite", "file:"+filepath.Clean(telePath)+"?_pragma=foreign_keys(1)") + if err != nil { + t.Fatalf("open strict telemetry db: %v", err) + } + defer rdb.Close() + + rows, err := rdb.Query(`SELECT args_summary FROM tool_calls`) + if err != nil { + t.Fatalf("query tool_calls: %v", err) + } + defer rows.Close() + for rows.Next() { + var args sql.NullString + if err := rows.Scan(&args); err != nil { + t.Fatalf("scan: %v", err) + } + for _, leak := range []string{"Alice", "sensitive question", "therapy"} { + if strings.Contains(args.String, leak) { + t.Fatalf("strict args_summary leaked %q: %s", leak, args.String) + } + } + } + + var smQuery sql.NullString + if err := rdb.QueryRow(`SELECT query FROM search_metrics`).Scan(&smQuery); err != nil { + t.Fatalf("read search_metrics.query: %v", err) + } + if strings.Contains(smQuery.String, "sensitive") || strings.Contains(smQuery.String, "therapy") { + t.Fatalf("strict search_metrics.query leaked plaintext: %q", smQuery.String) + } + if !strings.HasPrefix(smQuery.String, "sha256:") { + t.Fatalf("strict search_metrics.query not hashed: %q", smQuery.String) + } +} + +func callOK(t *testing.T, session *mcp.ClientSession, ctx context.Context, name string, args map[string]any) { + t.Helper() + result, err := session.CallTool(ctx, &mcp.CallToolParams{Name: name, Arguments: args}) + if err != nil { + t.Fatalf("CallTool(%s) error = %v", name, err) + } + if result.IsError { + t.Fatalf("CallTool(%s) returned tool error: %+v", name, result) + } +} + +// startTelemetrySession spins up an in-memory MCP client+server pair against +// the given runtime and returns a session plus a stop function that unwinds +// the session and the server goroutine cleanly. 
+func startTelemetrySession(t *testing.T, runtime *Runtime) (*mcp.ClientSession, func()) { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + serverTransport, clientTransport := mcp.NewInMemoryTransports() + errCh := make(chan error, 1) + go func() { errCh <- runtime.Run(ctx, serverTransport) }() + + client := mcp.NewClient(&mcp.Implementation{Name: "telemetry-test-client", Version: "1.0.0"}, nil) + session, err := client.Connect(ctx, clientTransport, nil) + if err != nil { + cancel() + t.Fatalf("client.Connect() error = %v", err) + } + // Idempotent stop — tests call it explicitly before DB readback (so + // Runtime.Close flushes telemetry) and then again through defer for + // panic-safety. Both calls must converge without blocking or fataling. + var once sync.Once + stop := func() { + once.Do(func() { + _ = session.Close() + cancel() + select { + case <-errCh: + case <-time.After(2 * time.Second): + t.Error("runtime.Run() did not exit after client shutdown") + } + }) + } + return session, stop +} diff --git a/internal/store/parity_test.go b/internal/store/parity_test.go index 7e3fc1f..d89c7d0 100644 --- a/internal/store/parity_test.go +++ b/internal/store/parity_test.go @@ -32,7 +32,7 @@ func TestCoreMemoryParity(t *testing.T) { if _, err := AddObservation(db, entityID, "test observation", "user", 1.0); err != nil { t.Fatalf("AddObservation() error = %v", err) } - results, err := SearchMemory(db, "test", 5, 12) + results, _, err := SearchMemory(db, "test", 5, 12) if err != nil { t.Fatalf("SearchMemory() error = %v", err) } @@ -65,7 +65,7 @@ func TestCoreMemoryParity(t *testing.T) { if !tombstoned { t.Fatalf("observation tombstone not set") } - recalled, err := SearchMemory(db, "observation to forget", 5, 12) + recalled, _, err := SearchMemory(db, "observation to forget", 5, 12) if err != nil { t.Fatalf("SearchMemory() error = %v", err) } @@ -353,7 +353,7 @@ func TestProjectIsolationParity(t *testing.T) { if _, err := 
AddObservation(projectDB, entityID, "project fact", "user", 1.0); err != nil { t.Fatalf("AddObservation(project) error = %v", err) } - globalResults, err := SearchMemory(db, "ProjectOnly", 5, 12) + globalResults, _, err := SearchMemory(db, "ProjectOnly", 5, 12) if err != nil { t.Fatalf("SearchMemory(global) error = %v", err) } @@ -402,7 +402,7 @@ func TestRankingAndRecallParity(t *testing.T) { } t.Run("composite score present on results", func(t *testing.T) { - results, err := SearchMemory(db, "Zara", 10, 12) + results, _, err := SearchMemory(db, "Zara", 10, 12) if err != nil { t.Fatalf("SearchMemory() error = %v", err) } @@ -417,7 +417,7 @@ func TestRankingAndRecallParity(t *testing.T) { }) t.Run("entity exact outranks entity like", func(t *testing.T) { - results, err := SearchMemory(db, "Zara", 10, 12) + results, _, err := SearchMemory(db, "Zara", 10, 12) if err != nil { t.Fatalf("SearchMemory() error = %v", err) } @@ -530,3 +530,39 @@ func TestRememberEventAtomicityOnMidLoopFailure(t *testing.T) { t.Fatalf("Alpha entity persisted despite rollback: count=%d", alphaCount) } } + +// SearchMetrics.Query must match the trimmed query actually executed against +// the index — otherwise two recall calls that only differ by leading or +// trailing whitespace look like distinct queries in telemetry even though +// they ran the same search. 
+func TestSearchMemoryMetricsQueryIsTrimmed(t *testing.T) { + db := newTestDB(t, "metrics-trim.db") + entityID, err := UpsertEntity(db, "TrimProbe", "test") + if err != nil { + t.Fatalf("UpsertEntity: %v", err) + } + if _, err := AddObservation(db, entityID, "trim probe observation", "user", 1.0); err != nil { + t.Fatalf("AddObservation: %v", err) + } + + cases := []struct { + name, input, want string + }{ + {"no whitespace", "trim", "trim"}, + {"leading whitespace", " trim", "trim"}, + {"trailing whitespace", "trim ", "trim"}, + {"both sides", " trim ", "trim"}, + {"whitespace only", " ", ""}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, metrics, err := SearchMemory(db, tc.input, 5, 12) + if err != nil { + t.Fatalf("SearchMemory(%q): %v", tc.input, err) + } + if metrics.Query != tc.want { + t.Fatalf("SearchMemory(%q).Metrics.Query = %q, want %q", tc.input, metrics.Query, tc.want) + } + }) + } +} diff --git a/internal/store/search.go b/internal/store/search.go index acc908a..50c5018 100644 --- a/internal/store/search.go +++ b/internal/store/search.go @@ -29,6 +29,21 @@ type candidate struct { FTSPosition *int } +// SearchMetrics captures per-call ranking-pipeline observations, suitable for +// telemetry. SearchMemory populates every field except Compact (which depends +// on tool args and is set by the caller). 
+type SearchMetrics struct { + Query string + Channels map[string]int + CandidatesTotal int + ResultsReturned int + LimitRequested int + ScoreMin float64 + ScoreMax float64 + ScoreMedian float64 + Compact bool +} + type SearchObservation struct { ID int64 EntityID int64 @@ -315,14 +330,19 @@ func ScoreCandidates(observations []SearchObservation, candidateMap map[int64]*c return ranked } -func SearchMemory(db *sql.DB, query string, limit int, halfLifeWeeks float64) ([]SearchObservation, error) { +func SearchMemory(db *sql.DB, query string, limit int, halfLifeWeeks float64) ([]SearchObservation, SearchMetrics, error) { + requestedLimit := limit limit = SanitizeSearchLimit(limit) + // Use the trimmed query in metrics so two recall calls that only differ + // by leading/trailing whitespace do not look like different queries when + // they actually execute the same search. + trimmed := strings.TrimSpace(query) + emptyMetrics := SearchMetrics{Query: trimmed, LimitRequested: requestedLimit, Channels: map[string]int{}} if limit <= 0 { - return []SearchObservation{}, nil + return []SearchObservation{}, emptyMetrics, nil } - trimmed := strings.TrimSpace(query) if trimmed == "" { - return []SearchObservation{}, nil + return []SearchObservation{}, emptyMetrics, nil } collectLimit := limit * collectionMultiplier @@ -332,11 +352,12 @@ func SearchMemory(db *sql.DB, query string, limit int, halfLifeWeeks float64) ([ } candidates, err := CollectCandidates(db, trimmed, collectLimit, maxCandidates) if err != nil { - return nil, err + return nil, SearchMetrics{}, err } + channelCounts := countCandidateChannels(candidates) hydrated, err := HydrateCandidates(db, candidates) if err != nil { - return nil, err + return nil, SearchMetrics{}, err } ranked := ScoreCandidates(hydrated, candidates, halfLifeWeeks, limit) returnedIDs := make([]int64, 0, len(ranked)) @@ -344,9 +365,57 @@ func SearchMemory(db *sql.DB, query string, limit int, halfLifeWeeks float64) ([ returnedIDs = 
append(returnedIDs, item.ID) } if err := TouchObservations(db, returnedIDs); err != nil { - return nil, err + return nil, SearchMetrics{}, err + } + metrics := SearchMetrics{ + Query: trimmed, + Channels: channelCounts, + CandidatesTotal: len(candidates), + ResultsReturned: len(ranked), + LimitRequested: requestedLimit, + ScoreMin: scoreMin(ranked), + ScoreMax: scoreMax(ranked), + ScoreMedian: scoreMedian(ranked), + } + return ranked, metrics, nil +} + +func countCandidateChannels(m map[int64]*candidate) map[string]int { + out := make(map[string]int) + for _, c := range m { + for ch := range c.Channels { + out[ch]++ + } + } + return out +} + +// ranked is sorted descending by CompositeScore; the three helpers below +// exploit that ordering so we avoid a re-sort just for metrics. + +func scoreMax(rs []SearchObservation) float64 { + if len(rs) == 0 { + return 0 + } + return rs[0].CompositeScore +} + +func scoreMin(rs []SearchObservation) float64 { + if len(rs) == 0 { + return 0 + } + return rs[len(rs)-1].CompositeScore +} + +func scoreMedian(rs []SearchObservation) float64 { + n := len(rs) + if n == 0 { + return 0 + } + if n%2 == 1 { + return rs[n/2].CompositeScore } - return ranked, nil + return (rs[n/2-1].CompositeScore + rs[n/2].CompositeScore) / 2 } func GroupResults(results []SearchObservation, compact bool) RecallResponse { diff --git a/internal/store/tools.go b/internal/store/tools.go index 5cb38bb..376d5b1 100644 --- a/internal/store/tools.go +++ b/internal/store/tools.go @@ -127,7 +127,24 @@ type GetEventObservationsToolResult struct { Total int `json:"total,omitempty"` } +// HandleTool dispatches a tool call and discards the search metrics (if any). +// Callers that want access to ranking-pipeline metrics for telemetry should +// use HandleToolWithMetrics instead. 
func HandleTool(defaultDB *sql.DB, name string, args ToolArgs) (any, error) { + result, _, err := HandleToolWithMetrics(defaultDB, name, args) + return result, err +} + +// HandleToolWithMetrics dispatches a tool call and returns ranking-pipeline +// metrics when applicable (currently: "recall"). For other tools the metrics +// return is nil. +func HandleToolWithMetrics(defaultDB *sql.DB, name string, args ToolArgs) (any, *SearchMetrics, error) { + var metrics *SearchMetrics + result, err := dispatchTool(defaultDB, name, args, &metrics) + return result, metrics, err +} + +func dispatchTool(defaultDB *sql.DB, name string, args ToolArgs, outMetrics **SearchMetrics) (any, error) { if err := validateToolArgs(name, args); err != nil { return nil, err } @@ -181,10 +198,14 @@ func HandleTool(defaultDB *sql.DB, name string, args ToolArgs) (any, error) { if args.Limit != nil { limit = *args.Limit } - results, err := SearchMemory(db, args.Query, limit, halfLife) + results, m, err := SearchMemory(db, args.Query, limit, halfLife) if err != nil { return nil, err } + m.Compact = args.Compact + if outMetrics != nil { + *outMetrics = &m + } return GroupResults(results, args.Compact), nil case "recall_entity": diff --git a/internal/telemetry/client.go b/internal/telemetry/client.go new file mode 100644 index 0000000..c402f0c --- /dev/null +++ b/internal/telemetry/client.go @@ -0,0 +1,292 @@ +// Package telemetry provides opt-in usage analytics for the workmem MCP server. +// +// The package is a no-op when MEMORY_TELEMETRY_PATH is unset — InitIfEnabled +// returns nil, and every method of *Client returns immediately when the +// receiver is nil. There is no global state, no side channel: the server +// passes a *Client down to wherever logging happens. +// +// When MEMORY_TELEMETRY_PRIVACY=strict is set, entity/query/label values in +// args_summary and search_metrics.query are sha256-hashed. 
Observation +// content and structured array values are always reduced to counts, in any +// mode. +package telemetry + +import ( + "database/sql" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + + _ "modernc.org/sqlite" +) + +// Client is a telemetry sink. A nil *Client is valid and represents the +// disabled state — every method below checks for nil and returns harmlessly. +// +// The mutex serializes Close against LogToolCall / LogSearchMetrics so the +// "tool call in flight while shutdown fires" scenario is safe even under +// -race. The uncontended lock cost (a few ns per call) is dwarfed by the +// SQLite Exec itself (microseconds), so there is no measurable hot-path +// impact. strict is set at construction and never mutated, so it is read +// outside the lock. +// +// degraded flips to true on the first Exec failure (disk full, permissions +// change, DB corruption) so subsequent Log* calls return silently instead +// of spamming stderr for every tool call. The underlying handles are left +// open — Close still cleans them up on shutdown — but the hot path becomes +// a cheap no-op for the rest of the session. Recovery requires a restart, +// which is the same contract as init failure. +type Client struct { + mu sync.Mutex + db *sql.DB + insertCall *sql.Stmt + insertSearch *sql.Stmt + strict bool + degraded bool +} + +// FromEnv reads MEMORY_TELEMETRY_PATH and MEMORY_TELEMETRY_PRIVACY from the +// environment and delegates to InitIfEnabled. Returns nil when the path is +// unset (telemetry disabled). Strict mode is enabled when +// MEMORY_TELEMETRY_PRIVACY is exactly "strict"; any other value (including +// empty) means permissive. +func FromEnv() *Client { + return InitIfEnabled( + os.Getenv("MEMORY_TELEMETRY_PATH"), + os.Getenv("MEMORY_TELEMETRY_PRIVACY") == "strict", + ) +} + +// InitIfEnabled opens (or creates) the telemetry SQLite database at the given +// path. If path is empty, returns nil (telemetry disabled).
If init fails at +// any step, logs a single warning to stderr and returns nil — telemetry is +// never allowed to break the tool call path. +func InitIfEnabled(path string, strict bool) *Client { + if path == "" { + return nil + } + // Mirror the main memory DB's open pattern (see internal/store/sqlite.go + // openSQLite): a file: DSN with foreign_keys set at open time, cleaned + // path for cross-platform safety (notably Windows drive letters), single + // open connection for deterministic write ordering under concurrent tool + // calls, WAL for reader-friendly durability, and a non-zero busy_timeout + // so brief lock contention retries instead of erroring immediately. + dsn := fmt.Sprintf("file:%s?_pragma=foreign_keys(1)", filepath.Clean(path)) + db, err := sql.Open("sqlite", dsn) + if err != nil { + initWarn(err) + return nil + } + db.SetMaxOpenConns(1) + if err := db.Ping(); err != nil { + _ = db.Close() + initWarn(err) + return nil + } + // PRAGMA foreign_keys is also set via the DSN (_pragma=foreign_keys(1)), + // but openSQLite in the main store issues the PRAGMA explicitly after + // open anyway — some driver/version combinations honor one but not the + // other. Belt-and-suspenders. 
+ for _, pragma := range []string{ + "PRAGMA journal_mode=WAL", + "PRAGMA foreign_keys=ON", + "PRAGMA busy_timeout=5000", + } { + if _, err := db.Exec(pragma); err != nil { + _ = db.Close() + initWarn(err) + return nil + } + } + for i, stmt := range schemaStatements { + if _, err := db.Exec(stmt); err != nil { + _ = db.Close() + initWarn(fmt.Errorf("schema statement %d: %w", i, err)) + return nil + } + } + insertCall, err := db.Prepare(insertCallSQL) + if err != nil { + _ = db.Close() + initWarn(err) + return nil + } + insertSearch, err := db.Prepare(insertSearchSQL) + if err != nil { + _ = insertCall.Close() + _ = db.Close() + initWarn(err) + return nil + } + return &Client{db: db, insertCall: insertCall, insertSearch: insertSearch, strict: strict} +} + +func initWarn(err error) { + fmt.Fprintf(os.Stderr, "[memory] telemetry init failed (disabled for this session): %v\n", err) +} + +// Close releases the prepared statements and the underlying database +// connection. Safe to call on a nil receiver. Idempotent: after the first +// call, fields are nil-ed so subsequent calls return nil instead of +// trying to close an already-closed *sql.DB (which would surface a +// confusing shutdown error). +// +// Thread-safe: the mutex serializes Close against any concurrent +// LogToolCall / LogSearchMetrics so the "shutdown while tool call in +// flight" scenario cannot observe half-closed state. A late log call +// that acquires the mutex after Close returns harmlessly because the +// nil-field guard fails. +func (c *Client) Close() error { + if c == nil { + return nil + } + c.mu.Lock() + defer c.mu.Unlock() + if c.db == nil { + return nil + } + insertCall := c.insertCall + insertSearch := c.insertSearch + db := c.db + c.insertCall = nil + c.insertSearch = nil + c.db = nil + if insertCall != nil { + _ = insertCall.Close() + } + if insertSearch != nil { + _ = insertSearch.Close() + } + return db.Close() +} + +// Strict reports whether privacy-strict mode is active. 
+func (c *Client) Strict() bool { + if c == nil { + return false + } + return c.strict +} + +// ToolCallInput captures a single tool invocation for telemetry. +type ToolCallInput struct { + Tool string + Client ClientInfo + DBScope string // "global" or "project" + ProjectPath string + DurationMs float64 + ArgsSummary string + ResultSummary string + IsError bool +} + +// LogToolCall inserts a tool_calls row. Returns the new row id on success or +// 0 on failure / disabled client. The returned id is used by LogSearchMetrics +// to link the search_metrics row back to its parent tool call. +// +// Thread-safe: acquires the client mutex so the insert cannot observe a +// half-closed state even if Close runs concurrently. A late call that +// arrives after Close returns 0 harmlessly because the nil-field guard +// fails under the lock. +func (c *Client) LogToolCall(in ToolCallInput) int64 { + if c == nil { + return 0 + } + c.mu.Lock() + defer c.mu.Unlock() + if c.db == nil || c.insertCall == nil || c.degraded { + return 0 + } + dbScope := in.DBScope + if dbScope == "" { + dbScope = "global" + } + res, err := c.insertCall.Exec( + in.Tool, + nullIfEmpty(in.Client.Name), + nullIfEmpty(in.Client.Version), + nullIfEmpty(in.Client.Source), + dbScope, + nullIfEmpty(in.ProjectPath), + in.DurationMs, + nullIfEmpty(in.ArgsSummary), + nullIfEmpty(in.ResultSummary), + boolToInt(in.IsError), + ) + if err != nil { + c.degraded = true + fmt.Fprintf(os.Stderr, "[memory] telemetry log failed (further errors suppressed for this session): %v\n", err) + return 0 + } + id, err := res.LastInsertId() + if err != nil { + return 0 + } + return id +} + +// SearchMetricsInput captures ranking-pipeline metrics for a single recall call. 
+type SearchMetricsInput struct { + ToolCallID int64 + Query string + Channels map[string]int + CandidatesTotal int + ResultsReturned int + LimitRequested int + ScoreMin float64 + ScoreMax float64 + ScoreMedian float64 + Compact bool +} + +// LogSearchMetrics inserts a search_metrics row linked to the tool_call id. +// No-op when client is nil or ToolCallID is 0 (the linking parent failed). +// Thread-safe: acquires the same mutex as LogToolCall / Close, so a closed +// client is observed as a no-op rather than panicking on a nil stmt. +// In strict mode, Query is hashed before insertion. +func (c *Client) LogSearchMetrics(in SearchMetricsInput) { + if c == nil || in.ToolCallID == 0 { + return + } + c.mu.Lock() + defer c.mu.Unlock() + if c.db == nil || c.insertSearch == nil || c.degraded { + return + } + channelsJSON, err := json.Marshal(in.Channels) + if err != nil { + channelsJSON = []byte("{}") + } + query := hashIfStrict(in.Query, c.strict) + if _, err := c.insertSearch.Exec( + in.ToolCallID, + nullIfEmpty(query), + string(channelsJSON), + in.CandidatesTotal, + in.ResultsReturned, + in.LimitRequested, + in.ScoreMin, + in.ScoreMax, + in.ScoreMedian, + boolToInt(in.Compact), + ); err != nil { + c.degraded = true + fmt.Fprintf(os.Stderr, "[memory] telemetry search log failed (further errors suppressed for this session): %v\n", err) + } +} + +func nullIfEmpty(s string) any { + if s == "" { + return nil + } + return s +} + +func boolToInt(b bool) int { + if b { + return 1 + } + return 0 +} diff --git a/internal/telemetry/client_test.go b/internal/telemetry/client_test.go new file mode 100644 index 0000000..d853cc0 --- /dev/null +++ b/internal/telemetry/client_test.go @@ -0,0 +1,305 @@ +package telemetry + +import ( + "bytes" + "database/sql" + "io" + "os" + "path/filepath" + "strings" + "sync" + "testing" +) + +func TestNilClientMethodsAreSafe(t *testing.T) { + var c *Client + if c.Strict() { + t.Fatalf("nil client should not be strict") + } + if got := 
c.LogToolCall(ToolCallInput{Tool: "noop"}); got != 0 { + t.Fatalf("nil client LogToolCall should return 0, got %d", got) + } + c.LogSearchMetrics(SearchMetricsInput{ToolCallID: 123, Query: "anything"}) + if err := c.Close(); err != nil { + t.Fatalf("nil client Close should be nil, got %v", err) + } +} + +func TestInitIfEnabledEmptyPathReturnsNil(t *testing.T) { + if got := InitIfEnabled("", false); got != nil { + t.Fatalf("InitIfEnabled(\"\", false) should return nil, got %+v", got) + } +} + +func TestInitIfEnabledInvalidPathReturnsNil(t *testing.T) { + // Build a path inside a non-existent subdirectory of t.TempDir() so the + // failure mode is identical across macOS, Linux, and Windows. + bad := filepath.Join(t.TempDir(), "missing-subdir", "telemetry.db") + if got := InitIfEnabled(bad, false); got != nil { + _ = got.Close() + t.Fatalf("InitIfEnabled on path inside missing directory should return nil, got client") + } +} + +func TestInitIfEnabledCreatesSchemaAndInserts(t *testing.T) { + path := filepath.Join(t.TempDir(), "telemetry.db") + c := InitIfEnabled(path, false) + if c == nil { + t.Fatalf("InitIfEnabled returned nil on valid path") + } + t.Cleanup(func() { _ = c.Close() }) + + id := c.LogToolCall(ToolCallInput{ + Tool: "remember", + Client: ClientInfo{Name: "claude-code", Source: "env"}, + DBScope: "global", + DurationMs: 1.23, + ArgsSummary: `{"entity":"Alice"}`, + ResultSummary: `{"stored":true}`, + }) + if id == 0 { + t.Fatalf("LogToolCall returned 0 on valid client") + } + + c.LogSearchMetrics(SearchMetricsInput{ + ToolCallID: id, + Query: "alice", + Channels: map[string]int{"fts": 3}, + CandidatesTotal: 3, + ResultsReturned: 1, + LimitRequested: 5, + ScoreMin: 0.1, + ScoreMax: 0.9, + ScoreMedian: 0.5, + }) + + // Read back from disk to prove persistence. Use the same DSN pattern as + // InitIfEnabled so the readback works consistently across Windows and + // POSIX (raw paths with drive letters can trip the driver). 
+ rdb, err := sql.Open("sqlite", "file:"+filepath.Clean(path)+"?_pragma=foreign_keys(1)") + if err != nil { + t.Fatalf("open telemetry db for readback: %v", err) + } + defer rdb.Close() + + var toolCount, searchCount int + if err := rdb.QueryRow(`SELECT COUNT(*) FROM tool_calls`).Scan(&toolCount); err != nil { + t.Fatalf("count tool_calls: %v", err) + } + if err := rdb.QueryRow(`SELECT COUNT(*) FROM search_metrics`).Scan(&searchCount); err != nil { + t.Fatalf("count search_metrics: %v", err) + } + if toolCount != 1 { + t.Fatalf("tool_calls count = %d, want 1", toolCount) + } + if searchCount != 1 { + t.Fatalf("search_metrics count = %d, want 1", searchCount) + } + + var argsSummary, resultSummary, clientName sql.NullString + if err := rdb.QueryRow(`SELECT client_name, args_summary, result_summary FROM tool_calls WHERE id = ?`, id).Scan(&clientName, &argsSummary, &resultSummary); err != nil { + t.Fatalf("readback tool_calls: %v", err) + } + if clientName.String != "claude-code" { + t.Fatalf("client_name = %q, want claude-code", clientName.String) + } + if !strings.Contains(argsSummary.String, "Alice") { + t.Fatalf("args_summary missing entity: %q", argsSummary.String) + } +} + +func TestStrictModeHashesSearchQuery(t *testing.T) { + path := filepath.Join(t.TempDir(), "strict.db") + c := InitIfEnabled(path, true) + if c == nil { + t.Fatalf("InitIfEnabled strict returned nil") + } + t.Cleanup(func() { _ = c.Close() }) + + if !c.Strict() { + t.Fatalf("expected strict mode active") + } + + id := c.LogToolCall(ToolCallInput{Tool: "recall", DBScope: "project"}) + c.LogSearchMetrics(SearchMetricsInput{ + ToolCallID: id, + Query: "sensitive therapy question", + Channels: map[string]int{"fts": 1}, + CandidatesTotal: 1, + ResultsReturned: 1, + }) + + rdb, err := sql.Open("sqlite", "file:"+filepath.Clean(path)+"?_pragma=foreign_keys(1)") + if err != nil { + t.Fatalf("open db: %v", err) + } + defer rdb.Close() + var storedQuery sql.NullString + if err := rdb.QueryRow(`SELECT 
query FROM search_metrics WHERE tool_call_id = ?`, id).Scan(&storedQuery); err != nil { + t.Fatalf("readback: %v", err) + } + if strings.Contains(storedQuery.String, "sensitive") || strings.Contains(storedQuery.String, "therapy") { + t.Fatalf("strict mode leaked plaintext query: %q", storedQuery.String) + } + if !strings.HasPrefix(storedQuery.String, "sha256:") { + t.Fatalf("strict mode did not hash query, got %q", storedQuery.String) + } +} + +func TestClientCloseIsIdempotent(t *testing.T) { + path := filepath.Join(t.TempDir(), "close-twice.db") + c := InitIfEnabled(path, false) + if c == nil { + t.Fatalf("InitIfEnabled returned nil on valid path") + } + if err := c.Close(); err != nil { + t.Fatalf("first Close() = %v, want nil", err) + } + if err := c.Close(); err != nil { + t.Fatalf("second Close() = %v, want nil (Close should be idempotent)", err) + } + if err := c.Close(); err != nil { + t.Fatalf("third Close() = %v, want nil", err) + } +} + +// A late-arriving tool call that lands after the Runtime has begun shutting +// down must not panic. Close nils the db/stmt fields; LogToolCall and +// LogSearchMetrics must both degrade gracefully to a no-op. +func TestLogAfterCloseDoesNotPanic(t *testing.T) { + path := filepath.Join(t.TempDir(), "log-after-close.db") + c := InitIfEnabled(path, false) + if c == nil { + t.Fatalf("init failed") + } + if err := c.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + // LogToolCall after Close must return 0 without panicking. + if id := c.LogToolCall(ToolCallInput{Tool: "remember"}); id != 0 { + t.Fatalf("LogToolCall after Close returned id %d, want 0", id) + } + + // LogSearchMetrics after Close must silently no-op (no panic). + c.LogSearchMetrics(SearchMetricsInput{ToolCallID: 42, Query: "anything"}) +} + +// Under concurrent Close + LogToolCall, the client mutex must serialize +// operations so no goroutine observes a half-closed state. 
Run with +// `go test -race` to turn an unsynchronized access into a test failure +// instead of an occasional production panic. +func TestClientCloseRacesWithLogging(t *testing.T) { + path := filepath.Join(t.TempDir(), "race.db") + c := InitIfEnabled(path, false) + if c == nil { + t.Fatalf("init failed") + } + + const loggers = 32 + var wg sync.WaitGroup + start := make(chan struct{}) + + for i := 0; i < loggers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-start + // Fire several calls so Close has real overlap with active inserts. + for j := 0; j < 4; j++ { + _ = c.LogToolCall(ToolCallInput{Tool: "remember"}) + c.LogSearchMetrics(SearchMetricsInput{ToolCallID: 1, Query: "q"}) + } + }() + } + + close(start) + // Give the goroutines a moment to be mid-flight, then close. The mutex + // in Client must make this safe; without it, -race would flag the + // db/stmt pointer reads against the Close writes. + if err := c.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + wg.Wait() + + // A second set of calls after Close must be no-ops, not panics. + if id := c.LogToolCall(ToolCallInput{Tool: "post-close"}); id != 0 { + t.Fatalf("post-close LogToolCall returned %d, want 0", id) + } + c.LogSearchMetrics(SearchMetricsInput{ToolCallID: 99, Query: "post"}) +} + +// After the first Exec failure the client must flip to degraded mode and +// stop emitting stderr warnings on subsequent Log* calls — otherwise a +// wedged telemetry DB (disk full, permissions change, corruption) would +// spam stderr once per tool call for the rest of the session. +func TestClientDegradedModeSuppressesSpam(t *testing.T) { + path := filepath.Join(t.TempDir(), "degraded.db") + c := InitIfEnabled(path, false) + if c == nil { + t.Fatalf("init failed") + } + t.Cleanup(func() { _ = c.Close() }) + + // Redirect stderr into a pipe so we can count emitted warnings. Restore + // on cleanup even if the test fails early. 
+ origStderr := os.Stderr + pr, pw, err := os.Pipe() + if err != nil { + t.Fatalf("pipe: %v", err) + } + os.Stderr = pw + t.Cleanup(func() { + os.Stderr = origStderr + }) + + // Force the next Exec to fail by closing the underlying sql.DB. The + // prepared statements still reference the now-closed pool so Exec will + // surface a "database is closed" error — exactly the kind of persistent + // failure degraded mode must mute after the first warning. + c.mu.Lock() + _ = c.db.Close() + c.mu.Unlock() + + // First call: Exec fails, warning emitted, degraded set. + if id := c.LogToolCall(ToolCallInput{Tool: "remember"}); id != 0 { + t.Fatalf("first LogToolCall after forced failure returned %d, want 0", id) + } + // Second + third calls: degraded now blocks Exec; no new warnings. + _ = c.LogToolCall(ToolCallInput{Tool: "remember"}) + c.LogSearchMetrics(SearchMetricsInput{ToolCallID: 1, Query: "q"}) + + // Close the write side and drain stderr. + _ = pw.Close() + os.Stderr = origStderr + var buf bytes.Buffer + if _, err := io.Copy(&buf, pr); err != nil { + t.Fatalf("drain stderr pipe: %v", err) + } + output := buf.String() + + if n := strings.Count(output, "telemetry log failed"); n != 1 { + t.Fatalf("expected exactly 1 'telemetry log failed' warning, got %d:\n%s", n, output) + } + if !c.degraded { + t.Fatalf("degraded flag not set after forced Exec failure") + } +} + +func TestLogSearchMetricsZeroToolCallIDIsNoop(t *testing.T) { + path := filepath.Join(t.TempDir(), "noop.db") + c := InitIfEnabled(path, false) + if c == nil { + t.Fatalf("init failed") + } + t.Cleanup(func() { _ = c.Close() }) + + c.LogSearchMetrics(SearchMetricsInput{ToolCallID: 0, Query: "anything"}) + var count int + if err := c.db.QueryRow(`SELECT COUNT(*) FROM search_metrics`).Scan(&count); err != nil { + t.Fatalf("count: %v", err) + } + if count != 0 { + t.Fatalf("LogSearchMetrics with ToolCallID=0 inserted row, count=%d", count) + } +} diff --git a/internal/telemetry/detect.go 
b/internal/telemetry/detect.go new file mode 100644 index 0000000..1887053 --- /dev/null +++ b/internal/telemetry/detect.go @@ -0,0 +1,39 @@ +package telemetry + +import "os" + +// ClientInfo identifies the MCP client currently calling the server. +type ClientInfo struct { + Name string + Version string + Source string // "protocol" | "env" | "none" +} + +// DetectClient resolves the active client identity. Priority: +// 1. protocolName / protocolVersion from the MCP initialize handshake +// 2. environment variables set by known MCP clients +// 3. ClientInfo{Name: "unknown", Source: "none"} +func DetectClient(protocolName, protocolVersion string) ClientInfo { + if protocolName != "" { + return ClientInfo{Name: protocolName, Version: protocolVersion, Source: "protocol"} + } + if os.Getenv("KILO") != "" { + return ClientInfo{Name: "kilo", Version: os.Getenv("KILOCODE_VERSION"), Source: "env"} + } + if os.Getenv("CLAUDE_CODE_SSE_PORT") != "" { + return ClientInfo{Name: "claude-code", Source: "env"} + } + if os.Getenv("CURSOR_TRACE_ID") != "" { + return ClientInfo{Name: "cursor", Source: "env"} + } + if os.Getenv("WINDSURF_EXTENSION_ID") != "" { + return ClientInfo{Name: "windsurf", Source: "env"} + } + if os.Getenv("VSCODE_MCP_HTTP_PREFER") != "" { + return ClientInfo{Name: "vscode-copilot", Source: "env"} + } + if os.Getenv("TERM_PROGRAM") == "vscode" { + return ClientInfo{Name: "vscode-unknown", Source: "env"} + } + return ClientInfo{Name: "unknown", Source: "none"} +} diff --git a/internal/telemetry/detect_test.go b/internal/telemetry/detect_test.go new file mode 100644 index 0000000..3dc1efd --- /dev/null +++ b/internal/telemetry/detect_test.go @@ -0,0 +1,69 @@ +package telemetry + +import ( + "testing" +) + +func TestDetectClientProtocolWins(t *testing.T) { + t.Setenv("KILO", "1") + info := DetectClient("claude-desktop", "0.9.1") + if info.Name != "claude-desktop" || info.Version != "0.9.1" || info.Source != "protocol" { + t.Fatalf("protocol should win, got %+v", 
info) + } +} + +func TestDetectClientEnvFallbackKilo(t *testing.T) { + unsetMCPClientEnv(t) + t.Setenv("KILO", "1") + t.Setenv("KILOCODE_VERSION", "0.43.6") + info := DetectClient("", "") + if info.Name != "kilo" || info.Version != "0.43.6" || info.Source != "env" { + t.Fatalf("kilo env not detected: %+v", info) + } +} + +func TestDetectClientEnvFallbackClaudeCode(t *testing.T) { + unsetMCPClientEnv(t) + t.Setenv("CLAUDE_CODE_SSE_PORT", "7123") + info := DetectClient("", "") + if info.Name != "claude-code" || info.Source != "env" { + t.Fatalf("claude-code not detected: %+v", info) + } +} + +func TestDetectClientVSCodeCopilotRequiresNonEmptyVar(t *testing.T) { + unsetMCPClientEnv(t) + t.Setenv("VSCODE_MCP_HTTP_PREFER", "auto") + info := DetectClient("", "") + if info.Name != "vscode-copilot" || info.Source != "env" { + t.Fatalf("vscode-copilot non-empty var not detected: %+v", info) + } +} + +func TestDetectClientNoSignals(t *testing.T) { + unsetMCPClientEnv(t) + info := DetectClient("", "") + if info.Name != "unknown" || info.Source != "none" { + t.Fatalf("no-signal detection wrong: %+v", info) + } +} + +func unsetMCPClientEnv(t *testing.T) { + t.Helper() + for _, v := range []string{ + "KILO", "KILOCODE_VERSION", + "CLAUDE_CODE_SSE_PORT", + "CURSOR_TRACE_ID", + "WINDSURF_EXTENSION_ID", + "VSCODE_MCP_HTTP_PREFER", + "TERM_PROGRAM", + } { + t.Setenv(v, "") + } + // These tests model "absent" env vars by setting them to "". That + // matches the current DetectClient implementation, which uses + // os.Getenv(...) != "" rather than os.LookupEnv, so empty-string + // values are treated as absent for every signal. The non-empty + // requirement on VSCODE_MCP_HTTP_PREFER is covered by its own + // dedicated test. 
+} diff --git a/internal/telemetry/hash.go b/internal/telemetry/hash.go new file mode 100644 index 0000000..37634e0 --- /dev/null +++ b/internal/telemetry/hash.go @@ -0,0 +1,25 @@ +package telemetry + +import ( + "crypto/sha256" + "encoding/hex" +) + +// hashString returns "sha256:" followed by the hex-encoded digest for a +// non-empty input, or "" for empty input. +func hashString(s string) string { + if s == "" { + return "" + } + h := sha256.Sum256([]byte(s)) + return "sha256:" + hex.EncodeToString(h[:]) +} + +// hashIfStrict returns s unchanged unless strict is true, in which case s is +// replaced with its sha256 hash (prefixed with "sha256:"). Empty input always +// returns empty. +func hashIfStrict(s string, strict bool) string { + if !strict { + return s + } + return hashString(s) +} diff --git a/internal/telemetry/hash_test.go b/internal/telemetry/hash_test.go new file mode 100644 index 0000000..c65f532 --- /dev/null +++ b/internal/telemetry/hash_test.go @@ -0,0 +1,39 @@ +package telemetry + +import ( + "strings" + "testing" +) + +func TestHashStringDeterministicAndPrefixed(t *testing.T) { + a := hashString("hello") + b := hashString("hello") + if a != b { + t.Fatalf("hashString not deterministic: %q vs %q", a, b) + } + if !strings.HasPrefix(a, "sha256:") { + t.Fatalf("hashString missing sha256: prefix: %q", a) + } + if len(a) != len("sha256:")+64 { + t.Fatalf("hashString unexpected length %d: %q", len(a), a) + } +} + +func TestHashStringEmptyInputReturnsEmpty(t *testing.T) { + if got := hashString(""); got != "" { + t.Fatalf("hashString(\"\") = %q, want empty", got) + } +} + +func TestHashIfStrict(t *testing.T) { + if got := hashIfStrict("alpha", false); got != "alpha" { + t.Fatalf("hashIfStrict(strict=false) should pass through, got %q", got) + } + got := hashIfStrict("alpha", true) + if !strings.HasPrefix(got, "sha256:") { + t.Fatalf("hashIfStrict(strict=true) should hash, got %q", got) + } + if got == "alpha" { + t.Fatalf("hashIfStrict(strict=true) returned plaintext") + } +} diff --git
a/internal/telemetry/sanitize.go b/internal/telemetry/sanitize.go new file mode 100644 index 0000000..e8327a9 --- /dev/null +++ b/internal/telemetry/sanitize.go @@ -0,0 +1,137 @@ +package telemetry + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" + "unicode/utf8" +) + +// encodeNoEscape marshals v as JSON without escaping <, >, & — telemetry +// rows are never rendered as HTML, and the escapes obscure sentinel markers +// like "<51 chars>" that must remain readable in the DB. +func encodeNoEscape(v any) (string, error) { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + enc.SetEscapeHTML(false) + if err := enc.Encode(v); err != nil { + return "", err + } + return strings.TrimRight(buf.String(), "\n"), nil +} + +// SanitizeArgs produces the args_summary string for a tool call. +// +// Rules: +// - "observation", "content", "context" string values are replaced with +// "<N chars>" (length only, never content) +// - "facts" and "observations" arrays are replaced with "<N facts>" / +// "<N observations>" (count only, never contents) +// - "entity", "from", "to", "label", "query" string values are hashed when +// strict is true; left unchanged otherwise +// - all other fields pass through unchanged +func SanitizeArgs(args map[string]any, strict bool) string { + if args == nil { + return "" + } + safe := make(map[string]any, len(args)) + for k, v := range args { + switch k { + case "observation", "content", "context": + if s, ok := v.(string); ok { + safe[k] = fmt.Sprintf("<%d chars>", utf8.RuneCountInString(s)) + continue + } + safe[k] = v + case "facts": + if arr, ok := v.([]any); ok { + safe[k] = fmt.Sprintf("<%d facts>", len(arr)) + continue + } + safe[k] = v + case "observations": + if arr, ok := v.([]any); ok { + safe[k] = fmt.Sprintf("<%d observations>", len(arr)) + continue + } + safe[k] = v + case "entity", "from", "to", "label", "query": + if s, ok := v.(string); ok { + safe[k] = hashIfStrict(s, strict) + continue + } + safe[k] = v + default: + safe[k] = v + } + } + out, err :=
encodeNoEscape(safe) + if err != nil { + return "" + } + return out +} + +// Summarizable is implemented by result types that want to produce their +// own telemetry summary without going through the JSON round-trip fallback. +// This is the fast path: types returning large payloads (large recall +// results, big graph dumps) can implement this to avoid the cost of a +// full Marshal + Unmarshal on every tool call. +// +// The returned map must contain only count/boolean/short-string values — +// never the actual payload content. SummarizeResult serializes the map as +// JSON without any further inspection. +type Summarizable interface { + TelemetrySummary() map[string]any +} + +// SummarizeResult extracts count-only telemetry fields from a tool result. +// It never leaks content: only counts, booleans, and structural indicators. +// +// Fast path: if the result implements Summarizable, its TelemetrySummary() +// map is serialized directly. Fallback: JSON round-trip so the telemetry +// package stays domain-agnostic for types that do not opt in. 
+func SummarizeResult(result any) string { + if result == nil { + return "" + } + if s, ok := result.(Summarizable); ok { + out, err := encodeNoEscape(s.TelemetrySummary()) + if err != nil { + return "" + } + return out + } + raw, err := json.Marshal(result) + if err != nil { + return "" + } + var m map[string]any + if err := json.Unmarshal(raw, &m); err != nil { + return "" + } + out := make(map[string]any, 8) + for _, k := range []string{"total_facts", "total", "stored", "deleted", "created", "found", "observations_attached", "compact"} { + if v, ok := m[k]; ok { + out[k] = v + } + } + if arr, ok := m["results"].([]any); ok { + out["entity_groups"] = len(arr) + } + if arr, ok := m["entities"].([]any); ok { + out["entities"] = len(arr) + } + if arr, ok := m["events"].([]any); ok { + out["events"] = len(arr) + } + if arr, ok := m["facts"].([]any); ok { + out["facts"] = len(arr) + } + encoded, err := encodeNoEscape(out) + if err != nil { + return "" + } + return encoded +} diff --git a/internal/telemetry/sanitize_test.go b/internal/telemetry/sanitize_test.go new file mode 100644 index 0000000..7be6763 --- /dev/null +++ b/internal/telemetry/sanitize_test.go @@ -0,0 +1,140 @@ +package telemetry + +import ( + "strings" + "testing" +) + +func TestSanitizeArgsStripsObservationContent(t *testing.T) { + got := SanitizeArgs(map[string]any{ + "entity": "Alice", + "observation": "this is sensitive content that must never be logged", + }, false) + if strings.Contains(got, "sensitive content") { + t.Fatalf("observation content leaked in args_summary: %s", got) + } + if !strings.Contains(got, "<51 chars>") { + t.Fatalf("observation length marker missing: %s", got) + } + if !strings.Contains(got, "Alice") { + t.Fatalf("entity should be plaintext in permissive mode: %s", got) + } +} + +func TestSanitizeArgsStripsFactsAndObservationsArrays(t *testing.T) { + got := SanitizeArgs(map[string]any{ + "facts": []any{map[string]any{}, map[string]any{}, map[string]any{}}, + "observations": 
[]any{map[string]any{}, map[string]any{}}, + }, false) + if !strings.Contains(got, "<3 facts>") { + t.Fatalf("facts count marker missing: %s", got) + } + if !strings.Contains(got, "<2 observations>") { + t.Fatalf("observations count marker missing: %s", got) + } +} + +func TestSanitizeArgsStrictModeHashesIdentifiers(t *testing.T) { + got := SanitizeArgs(map[string]any{ + "entity": "Alice", + "from": "Bob", + "to": "Carol", + "label": "therapy session", + "query": "anxiety", + "limit": 10, + }, true) + for _, leak := range []string{"Alice", "Bob", "Carol", "therapy session", "anxiety"} { + if strings.Contains(got, leak) { + t.Fatalf("strict mode leaked %q in args_summary: %s", leak, got) + } + } + if !strings.Contains(got, "sha256:") { + t.Fatalf("strict mode did not hash identifiers: %s", got) + } + if !strings.Contains(got, "\"limit\":10") { + t.Fatalf("non-identifier fields should pass through in strict mode: %s", got) + } +} + +func TestSanitizeArgsPermissiveModeKeepsIdentifiers(t *testing.T) { + got := SanitizeArgs(map[string]any{ + "entity": "Alice", + "query": "preferences", + }, false) + if !strings.Contains(got, "Alice") { + t.Fatalf("permissive mode should keep entity plaintext: %s", got) + } + if !strings.Contains(got, "preferences") { + t.Fatalf("permissive mode should keep query plaintext: %s", got) + } +} + +func TestSanitizeArgsNilReturnsEmpty(t *testing.T) { + if got := SanitizeArgs(nil, false); got != "" { + t.Fatalf("SanitizeArgs(nil) = %q, want empty", got) + } +} + +type summarizableFake struct { + countA int + countB int +} + +func (s summarizableFake) TelemetrySummary() map[string]any { + return map[string]any{"a": s.countA, "b": s.countB} +} + +func TestSummarizeResultUsesFastPathForSummarizable(t *testing.T) { + got := SummarizeResult(summarizableFake{countA: 3, countB: 7}) + if !strings.Contains(got, "\"a\":3") || !strings.Contains(got, "\"b\":7") { + t.Fatalf("SummarizeResult fast path missing fields: %s", got) + } +} + +func 
TestSummarizeResultFallsBackToJSONForNonSummarizable(t *testing.T) { + // Plain map — no Summarizable interface, must go through fallback + got := SummarizeResult(map[string]any{ + "stored": true, + "results": []any{map[string]any{}, map[string]any{}}, + }) + if !strings.Contains(got, "\"stored\":true") { + t.Fatalf("fallback missing stored: %s", got) + } + if !strings.Contains(got, "\"entity_groups\":2") { + t.Fatalf("fallback missing entity_groups: %s", got) + } +} + +func TestSummarizeResultReturnsCountsOnly(t *testing.T) { + result := map[string]any{ + "stored": true, + "results": []any{ + map[string]any{"entity": "A", "observations": []any{"secret content 1"}}, + map[string]any{"entity": "B", "observations": []any{"secret content 2"}}, + }, + "total": 5, + "compact": false, + } + got := SummarizeResult(result) + if strings.Contains(got, "secret content") { + t.Fatalf("SummarizeResult leaked content: %s", got) + } + if strings.Contains(got, "\"A\"") || strings.Contains(got, "\"B\"") { + t.Fatalf("SummarizeResult leaked entity names: %s", got) + } + if !strings.Contains(got, "\"entity_groups\":2") { + t.Fatalf("SummarizeResult missing entity_groups count: %s", got) + } + if !strings.Contains(got, "\"stored\":true") { + t.Fatalf("SummarizeResult missing stored: %s", got) + } + if !strings.Contains(got, "\"total\":5") { + t.Fatalf("SummarizeResult missing total: %s", got) + } +} + +func TestSummarizeResultNilReturnsEmpty(t *testing.T) { + if got := SummarizeResult(nil); got != "" { + t.Fatalf("SummarizeResult(nil) = %q, want empty", got) + } +} diff --git a/internal/telemetry/schema.go b/internal/telemetry/schema.go new file mode 100644 index 0000000..3aa8ab3 --- /dev/null +++ b/internal/telemetry/schema.go @@ -0,0 +1,48 @@ +package telemetry + +// schemaStatements is executed one-by-one by InitIfEnabled. 
The main store +// follows the same single-statement-per-Exec pattern (see +// internal/store/sqlite.go InitSchema) — more portable across SQLite +// drivers than chaining multiple statements into one db.Exec call. +var schemaStatements = []string{ + `CREATE TABLE IF NOT EXISTS tool_calls ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f', 'now')), + tool TEXT NOT NULL, + client_name TEXT, + client_version TEXT, + client_source TEXT, + db_scope TEXT NOT NULL DEFAULT 'global', + project_path TEXT, + duration_ms REAL, + args_summary TEXT, + result_summary TEXT, + is_error INTEGER NOT NULL DEFAULT 0 +)`, + `CREATE TABLE IF NOT EXISTS search_metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + tool_call_id INTEGER REFERENCES tool_calls(id), + query TEXT, + channels TEXT, + candidates_total INTEGER, + results_returned INTEGER, + limit_requested INTEGER, + score_min REAL, + score_max REAL, + score_median REAL, + compact INTEGER DEFAULT 0 +)`, + `CREATE INDEX IF NOT EXISTS idx_tool_calls_ts ON tool_calls(ts)`, + `CREATE INDEX IF NOT EXISTS idx_tool_calls_tool ON tool_calls(tool)`, + `CREATE INDEX IF NOT EXISTS idx_search_metrics_tool_call ON search_metrics(tool_call_id)`, +} + +const ( + insertCallSQL = `INSERT INTO tool_calls + (tool, client_name, client_version, client_source, db_scope, project_path, duration_ms, args_summary, result_summary, is_error) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + + insertSearchSQL = `INSERT INTO search_metrics + (tool_call_id, query, channels, candidates_total, results_returned, limit_requested, score_min, score_max, score_median, compact) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` +)
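The privacy-strict rule that `SanitizeArgs` applies to identifier fields reduces to: empty stays empty, permissive passes through, strict becomes `"sha256:" + hex(sha256(s))`. A minimal standalone sketch of that rule — a reimplementation for illustration only, since the package's `hashIfStrict` helper is unexported:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashIfStrict mirrors the unexported telemetry helper: pass-through when
// permissive, "sha256:"-prefixed hex digest when strict, "" always stays "".
func hashIfStrict(s string, strict bool) string {
	if s == "" || !strict {
		return s
	}
	sum := sha256.Sum256([]byte(s))
	return "sha256:" + hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(hashIfStrict("Alice", false)) // permissive mode: plaintext passes through
	fmt.Println(hashIfStrict("Alice", true))  // strict mode: "sha256:" + 64 hex digits (71 chars)
	fmt.Printf("%q\n", hashIfStrict("", true)) // empty input stays empty, even in strict mode
}
```

Because the digest is deterministic, strict-mode rows for the same entity still correlate across the `tool_calls` table — usage frequency per entity remains analyzable without ever storing the plaintext name.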