diff --git a/openviking/session/compressor.py b/openviking/session/compressor.py index 927d54c6..b8d65de9 100644 --- a/openviking/session/compressor.py +++ b/openviking/session/compressor.py @@ -137,6 +137,7 @@ async def extract_long_term_memories( user: Optional["UserIdentifier"] = None, session_id: Optional[str] = None, ctx: Optional[RequestContext] = None, + strict_extract_errors: bool = False, ) -> List[Context]: """Extract long-term memories from messages.""" if not messages: @@ -146,7 +147,12 @@ async def extract_long_term_memories( if not ctx: return [] - candidates = await self.extractor.extract(context, user, session_id) + if strict_extract_errors: + # Intentionally let extraction errors bubble up so caller (task tracker) + # can mark background commit tasks as failed with an explicit error. + candidates = await self.extractor.extract_strict(context, user, session_id) + else: + candidates = await self.extractor.extract(context, user, session_id) if not candidates: return [] diff --git a/openviking/session/memory_extractor.py b/openviking/session/memory_extractor.py index 4d411b6c..c3754e36 100644 --- a/openviking/session/memory_extractor.py +++ b/openviking/session/memory_extractor.py @@ -231,8 +231,14 @@ async def extract( context: dict, user: UserIdentifier, session_id: str, + *, + strict: bool = False, ) -> List[CandidateMemory]: - """Extract memory candidates from messages.""" + """Extract memory candidates from messages. + + When ``strict`` is True, extraction failures are re-raised as + ``RuntimeError`` so async task tracking can mark tasks as failed. + """ user = user vlm = get_openviking_config().vlm if not vlm or not vlm.is_available(): @@ -342,8 +348,19 @@ async def extract( except Exception as e: logger.error(f"Memory extraction failed: {e}") + if strict: + raise RuntimeError(f"memory_extraction_failed: {e}") from e return [] + async def extract_strict( + self, + context: dict, + user: UserIdentifier, + session_id: str, + ) -> List[CandidateMemory]: + """Compatibility wrapper: strict mode delegates to ``extract``.""" + return await self.extract(context, user, session_id, strict=True) + async def create_memory( self, candidate: CandidateMemory, diff --git a/openviking/session/session.py b/openviking/session/session.py index 243069a1..9f2d7766 100644 --- a/openviking/session/session.py +++ b/openviking/session/session.py @@ -340,6 +340,7 @@ async def commit_async(self) -> Dict[str, Any]: user=self.user, session_id=self.session_id, ctx=self.ctx, + strict_extract_errors=True, ) logger.info(f"Extracted {len(memories)} memories") result["memories_extracted"] = len(memories) diff --git a/tests/test_session_task_tracking.py b/tests/test_session_task_tracking.py index ab779fb2..abeb513d 100644 --- a/tests/test_session_task_tracking.py +++ b/tests/test_session_task_tracking.py @@ -139,6 +139,34 @@ async def failing_commit(_sid, _ctx): assert "LLM provider timeout" in result["error"] +async def test_task_failed_when_memory_extraction_raises(api_client): + """Extractor failures should propagate to task error instead of silent completed+0.""" + client, service = api_client + session_id = await _new_session_with_message(client) + + async def failing_extract(_context, _user, _session_id): + raise RuntimeError("memory_extraction_failed: synthetic extractor error") + + service.sessions._session_compressor.extractor.extract_strict = failing_extract + + resp = await client.post(f"/api/v1/sessions/{session_id}/commit", params={"wait": False}) + task_id = resp.json()["result"]["task_id"] + + result = None + for _ in range(120): + await asyncio.sleep(0.1) + task_resp = await client.get(f"/api/v1/tasks/{task_id}") + assert task_resp.status_code == 200 + result = task_resp.json()["result"] + if result["status"] in {"completed", "failed"}: + break + + assert result is not None + assert result["status"] in {"completed", "failed"} + assert result["status"] == "failed" + assert "memory_extraction_failed" in result["error"] + + # ── Duplicate commit rejection ──