Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion openviking/session/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ async def extract_long_term_memories(
user: Optional["UserIdentifier"] = None,
session_id: Optional[str] = None,
ctx: Optional[RequestContext] = None,
strict_extract_errors: bool = False,
) -> List[Context]:
"""Extract long-term memories from messages."""
if not messages:
Expand All @@ -146,7 +147,12 @@ async def extract_long_term_memories(
if not ctx:
return []

candidates = await self.extractor.extract(context, user, session_id)
if strict_extract_errors:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] strict_extract_errors=True 时异常会直接从 extract_long_term_memories 抛出,调用方 commit_async()(session.py:338)没有 try/catch,而是依赖外层 task tracker 来捕获。

这个假设本身是合理的,但建议至少在此处或 commit_async 中加一行注释说明 "extraction errors intentionally propagate to caller (task tracker)",避免后续维护者误以为遗漏了异常处理而加上 try/catch 吞掉错误。

# Intentionally let extraction errors bubble up so caller (task tracker)
# can mark background commit tasks as failed with an explicit error.
candidates = await self.extractor.extract_strict(context, user, session_id)
else:
candidates = await self.extractor.extract(context, user, session_id)

if not candidates:
return []
Expand Down
19 changes: 18 additions & 1 deletion openviking/session/memory_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,14 @@ async def extract(
context: dict,
user: UserIdentifier,
session_id: str,
*,
strict: bool = False,
) -> List[CandidateMemory]:
"""Extract memory candidates from messages."""
"""Extract memory candidates from messages.

When ``strict`` is True, extraction failures are re-raised as
``RuntimeError`` so async task tracking can mark tasks as failed.
"""
user = user
vlm = get_openviking_config().vlm
if not vlm or not vlm.is_available():
Expand Down Expand Up @@ -342,8 +348,19 @@ async def extract(

except Exception as e:
logger.error(f"Memory extraction failed: {e}")
if strict:
raise RuntimeError(f"memory_extraction_failed: {e}") from e
return []

async def extract_strict(
self,
context: dict,
user: UserIdentifier,
session_id: str,
) -> List[CandidateMemory]:
"""Compatibility wrapper: strict mode delegates to ``extract``."""
return await self.extract(context, user, session_id, strict=True)

async def create_memory(
self,
candidate: CandidateMemory,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Bug] extract_strict()extract() 的完整复制粘贴(~115 行),仅在 except 块中有行为差异(raise RuntimeError vs return [])。

这意味着后续对 extract() 的任何 bug 修复或功能变更都必须同步到 extract_strict(),否则两者会产生行为分歧(silent divergence)。

建议通过在 extract() 中添加一个 strict 参数来消除重复:

async def extract(
    self, context, user, session_id, *, strict: bool = False
) -> List[CandidateMemory]:
    # ... existing logic ...
    except Exception as e:
        logger.error(f"Memory extraction failed: {e}")
        if strict:
            raise RuntimeError(f"memory_extraction_failed: {e}") from e
        return []

然后 extract_strict 可以只做一层委托,或直接在 compressor 中传 strict=True

Expand Down
1 change: 1 addition & 0 deletions openviking/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ async def commit_async(self) -> Dict[str, Any]:
user=self.user,
session_id=self.session_id,
ctx=self.ctx,
strict_extract_errors=True,
)
logger.info(f"Extracted {len(memories)} memories")
result["memories_extracted"] = len(memories)
Expand Down
28 changes: 28 additions & 0 deletions tests/test_session_task_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,34 @@ async def failing_commit(_sid, _ctx):
assert "LLM provider timeout" in result["error"]


async def test_task_failed_when_memory_extraction_raises(api_client):
"""Extractor failures should propagate to task error instead of silent completed+0."""
client, service = api_client
session_id = await _new_session_with_message(client)

async def failing_extract(_context, _user, _session_id):
raise RuntimeError("memory_extraction_failed: synthetic extractor error")

service.sessions._session_compressor.extractor.extract_strict = failing_extract

resp = await client.post(f"/api/v1/sessions/{session_id}/commit", params={"wait": False})
task_id = resp.json()["result"]["task_id"]

result = None
for _ in range(120):
await asyncio.sleep(0.1)
task_resp = await client.get(f"/api/v1/tasks/{task_id}")
assert task_resp.status_code == 200
result = task_resp.json()["result"]
if result["status"] in {"completed", "failed"}:
break

assert result is not None
assert result["status"] in {"completed", "failed"}
assert result["status"] == "failed"
assert "memory_extraction_failed" in result["error"]


# ── Duplicate commit rejection ──


Expand Down
Loading