diff --git a/api/routers/query.py b/api/routers/query.py index c8a126b..a81b6e4 100644 --- a/api/routers/query.py +++ b/api/routers/query.py @@ -647,7 +647,8 @@ async def close_stream(stream): sampled=False, ) if full_content.strip(): - yield emit("chunk", {"chunk": "\n\n[Connection interrupted. Partial technical response delivered.]"}) + mode_label = "technical " if mode == TECHNICAL_MODE else "" + yield emit("chunk", {"chunk": f"\n\n[Connection interrupted. Partial {mode_label}response delivered.]"}) yield emit("done", "[DONE]") if full_content.strip(): await cache_set(_cache_key(topic, level, mode), {"text": full_content}) @@ -723,56 +724,101 @@ async def close_stream(stream): ) -async def save_to_history(user, topic: str, levels: list[str], mode: str): - """Background task to save query to history.""" +async def save_to_history(user, topic: str, levels: list[str], mode: str) -> None: + """ + Persist a query to the user's history. + + Failures are logged as errors with full context but do not propagate — + history loss is preferable to crashing the response task. + Typically called via _persist_history_safely() for bounded execution. + """ + user_id_hash = anonymize_user_id(str(getattr(user, "id", "") or "")) + topic_hash = anonymize_text(topic) + try: await ensure_user_exists(user) - supabase = get_supabase_admin() - if not supabase: - logger.error("save_to_history_task_no_supabase_admin") - return - - def _fetch_existing(): - return ( - supabase.table("history") - .select("id, levels") - .eq("user_id", user.id) - .eq("topic", topic) - .execute() - ) + except Exception as exc: + logger.error( + "save_to_history_ensure_user_failed", + error=str(exc), + error_type=type(exc).__name__, + user_id_hash=user_id_hash, + sampled=False, + ) + return # cannot proceed without a valid user row - existing = await asyncio.to_thread(_fetch_existing) + supabase = get_supabase_admin() + if not supabase: + logger.error( + "save_to_history_no_supabase_admin", + user_id_hash=user_id_hash, + sampled=False, + ) + return - normalized_mode = normalize_mode(mode) + normalized_mode = normalize_mode(mode) + try: + existing = await asyncio.to_thread( + lambda: supabase.table("history") + .select("id, levels") + .eq("user_id", user.id) + .eq("topic", topic) + .execute() + ) + except Exception as exc: + logger.error( + "save_to_history_fetch_failed", + error=str(exc), + error_type=type(exc).__name__, + user_id_hash=user_id_hash, + topic_hash=topic_hash, + sampled=False, + ) + return + + try: data = getattr(existing, "data", None) if isinstance(data, list) and data and isinstance(data[0], dict): item_id = data[0].get("id") existing_levels = set(data[0].get("levels") or []) new_levels = list(existing_levels.union(set(levels))) - def _update_existing(): - return ( - supabase.table("history") - .update({"levels": new_levels, "mode": normalized_mode}) - .eq("id", item_id) - .execute() - ) - - await asyncio.to_thread(_update_existing) + await asyncio.to_thread( + lambda: supabase.table("history") + .update({"levels": new_levels, "mode": normalized_mode}) + .eq("id", item_id) + .execute() + ) + logger.debug( + "save_to_history_updated", + user_id_hash=user_id_hash, + topic_hash=topic_hash, + mode=normalized_mode, + ) else: - def _insert_new(): - return ( - supabase.table("history") - .insert({"user_id": user.id, "topic": topic, "levels": levels, "mode": normalized_mode}) - .execute() - ) - - await asyncio.to_thread(_insert_new) + await asyncio.to_thread( + lambda: supabase.table("history") + .insert({ + "user_id": user.id, + "topic": topic, + "levels": levels, + "mode": normalized_mode, + }) + .execute() + ) + logger.debug( + "save_to_history_inserted", + user_id_hash=user_id_hash, + topic_hash=topic_hash, + mode=normalized_mode, + ) except Exception as exc: logger.error( - "save_to_history_task_error", + "save_to_history_write_failed", error=str(exc), - user_id_hash=anonymize_user_id(str(getattr(user, "id", "") or "") or None), - topic_hash=anonymize_text(topic), + error_type=type(exc).__name__, + user_id_hash=user_id_hash, + topic_hash=topic_hash, + mode=normalized_mode, sampled=False, ) diff --git a/api/tests/test_query_router.py b/api/tests/test_query_router.py index 0a81f42..4e6efd7 100644 --- a/api/tests/test_query_router.py +++ b/api/tests/test_query_router.py @@ -77,6 +77,61 @@ async def fake_auth(): assert elapsed >= 0.05 +@pytest.mark.asyncio +async def test_save_to_history_logs_error_and_returns_when_supabase_unavailable( + monkeypatch, +): + """save_to_history must not raise — it logs and returns on any failure.""" + user = SimpleNamespace(id="user-hist", email="h@example.com", user_metadata={}) + errors_logged = [] + + async def fake_ensure_user_exists(_user): + pass # succeed + + def fake_get_supabase_admin(): + return None # simulate unavailable + + def fake_log_error(event, **kwargs): + errors_logged.append(event) + + monkeypatch.setattr(query_module, "ensure_user_exists", fake_ensure_user_exists) + monkeypatch.setattr(query_module, "get_supabase_admin", fake_get_supabase_admin) + monkeypatch.setattr(query_module.logger, "error", fake_log_error) + + # Must not raise + await query_module.save_to_history(user, "topic", ["eli5"], "learning") + + assert any("no_supabase_admin" in e for e in errors_logged) + + +@pytest.mark.asyncio +async def test_save_to_history_logs_error_on_fetch_failure(monkeypatch): + """save_to_history must log fetch errors and not propagate them.""" + user = SimpleNamespace(id="user-hist2", email="h2@example.com", user_metadata={}) + errors_logged = [] + + async def fake_ensure_user_exists(_user): + pass + + class BrokenSupabase: + def table(self, _name): + raise RuntimeError("connection refused") + + def fake_get_supabase_admin(): + return BrokenSupabase() + + def fake_log_error(event, **kwargs): + errors_logged.append(event) + + monkeypatch.setattr(query_module, "ensure_user_exists", fake_ensure_user_exists) + monkeypatch.setattr(query_module, "get_supabase_admin", fake_get_supabase_admin) + monkeypatch.setattr(query_module.logger, "error", fake_log_error) + + await query_module.save_to_history(user, "topic", ["eli5"], "learning") + + assert any("fetch_failed" in e for e in errors_logged) + + @pytest.mark.asyncio async def test_query_invalid_topic(app_client): resp = await app_client.post( diff --git a/src/lib/chatStoreUtils.ts b/src/lib/chatStoreUtils.ts index 000b9d3..8f94a3b 100644 --- a/src/lib/chatStoreUtils.ts +++ b/src/lib/chatStoreUtils.ts @@ -181,7 +181,12 @@ export const notifyError = (message: string) => { } }; -export const resolveMessageKey = (message: Message) => { +/** + * Returns the canonical store key for a message. + * Preference order: clientGeneratedId > assistant_client_id > client_id > serverMessageId > id + * This key is used as the dict key in messagesById and as entries in messageIds. + */ +export const resolveMessageKey = (message: Message): string => { return ( message.clientGeneratedId || message.metadata?.assistant_client_id || @@ -191,69 +196,57 @@ export const resolveMessageKey = (message: Message) => { ); }; -export const messagesMatch = (existing: Message, incoming: Message) => { - if (existing.id === incoming.id) return true; - if ( - existing.clientGeneratedId && - incoming.clientGeneratedId && - existing.clientGeneratedId === incoming.clientGeneratedId - ) { +/** + * Returns true if two message objects refer to the same logical message. + * + * Resolution order: + * 1. Canonical clientId match (set at message creation, most reliable) + * 2. Server-assigned id match (for messages loaded from the database) + * 3. Legacy fallback for messages created before this change + * + * Do not add new branches. If a new identity field is introduced, + * add it to the canonical clientId derivation in resolveMessageKey() + * instead. + */ +export const messagesMatch = (existing: Message, incoming: Message): boolean => { + // 1. Canonical client ID — most reliable, set at creation time + const existingClientId = + existing.clientGeneratedId || + existing.metadata?.assistant_client_id || + existing.metadata?.client_id; + + const incomingClientId = + incoming.clientGeneratedId || + incoming.metadata?.assistant_client_id || + incoming.metadata?.client_id; + + if (existingClientId && incomingClientId && existingClientId === incomingClientId) { return true; } - if ( - incoming.metadata?.assistant_client_id && - existing.clientGeneratedId === incoming.metadata.assistant_client_id - ) { - return true; - } - if ( - existing.metadata?.assistant_client_id && - incoming.clientGeneratedId && - existing.metadata.assistant_client_id === incoming.clientGeneratedId - ) { + + // 2. Server-assigned UUID — for messages loaded from Supabase that never + // had a client ID (e.g. messages from a previous session) + if (existing.id && incoming.id && existing.id === incoming.id) { return true; } + + // 3. Cross-reference: server id on one side, serverMessageId on the other + // Handles the transition window when a local message gets its server id if ( existing.serverMessageId && incoming.id && existing.serverMessageId === incoming.id - ) + ) { return true; + } if ( incoming.serverMessageId && existing.id && incoming.serverMessageId === existing.id - ) - return true; - if ( - incoming.metadata?.client_id && - existing.id === incoming.metadata.client_id - ) - return true; - if ( - existing.metadata?.client_id && - existing.metadata.client_id === incoming.id - ) - return true; - if ( - incoming.metadata?.client_id && - existing.clientGeneratedId === incoming.metadata.client_id - ) - return true; - if ( - existing.metadata?.client_id && - incoming.clientGeneratedId && - existing.metadata.client_id === incoming.clientGeneratedId - ) { - return true; - } - if ( - incoming.metadata?.client_id && - existing.metadata?.client_id && - existing.metadata.client_id === incoming.metadata.client_id ) { return true; } + return false; }; diff --git a/src/stores/useChatStore.test.ts b/src/stores/useChatStore.test.ts index 8a79eca..c3c1007 100644 --- a/src/stores/useChatStore.test.ts +++ b/src/stores/useChatStore.test.ts @@ -1,6 +1,7 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { useConversationStore } from "../stores/useConversationStore"; import { useMessageStore } from "../stores/useMessageStore"; +import type { Message } from "../types/chat"; type MockBuilder = { select: () => MockBuilder; @@ -321,6 +322,78 @@ describe("useChatStore", () => { }); }); +describe("messagesMatch", () => { + beforeEach(() => { + resetStore(); + }); + + it("matches on shared clientGeneratedId", () => { + const base: Message = { + id: "server-1", + role: "assistant", + content: "hello", + created_at: "2026-01-01T00:00:00Z", + clientGeneratedId: "client-abc", + }; + const incoming: Message = { + id: "server-1", + role: "assistant", + content: "hello updated", + created_at: "2026-01-01T00:00:00Z", + clientGeneratedId: "client-abc", + }; + useMessageStore.getState().clearMessages(); + useMessageStore.getState().addMessage(base); + useMessageStore.getState().addMessage(incoming); + expect(useMessageStore.getState().messageIds).toHaveLength(1); + expect( + useMessageStore.getState().messagesById[ + useMessageStore.getState().messageIds[0] + ].content, + ).toBe("hello updated"); + }); + + it("matches on server id after client id is absent", () => { + const base: Message = { + id: "server-2", + role: "assistant", + content: "original", + created_at: "2026-01-01T00:00:00Z", + }; + const incoming: Message = { + id: "server-2", + role: "assistant", + content: "updated", + created_at: "2026-01-01T00:00:00Z", + }; + useMessageStore.getState().clearMessages(); + useMessageStore.getState().addMessage(base); + useMessageStore.getState().addMessage(incoming); + expect(useMessageStore.getState().messageIds).toHaveLength(1); + }); + + it("does NOT match messages with different client ids", () => { + const base: Message = { + id: "server-3", + role: "assistant", + content: "first", + created_at: "2026-01-01T00:00:00Z", + clientGeneratedId: "client-x", + }; + const incoming: Message = { + id: "server-4", + role: "assistant", + content: "second", + created_at: "2026-01-01T00:00:00Z", + clientGeneratedId: "client-y", + }; + useMessageStore.getState().clearMessages(); + useMessageStore.getState().addMessage(base); + useMessageStore.getState().addMessage(incoming); + expect(useMessageStore.getState().messageIds).toHaveLength(2); + }); +}); + const seedConversation = () => { const now = new Date().toISOString(); useChatStore.setState({ @@ -352,6 +425,10 @@ describe("useChatStore streaming", () => { vi.useRealTimers(); }); + afterEach(() => { + vi.unstubAllGlobals(); + }); + it("aborts the underlying fetch when a stream read timeout occurs", async () => { let abortFired = false;