From 55d7f58a6e9260a64f897d10c7608d5670444bb9 Mon Sep 17 00:00:00 2001 From: deepakdevp Date: Thu, 19 Mar 2026 22:27:56 +0900 Subject: [PATCH 1/3] fix(session): add commit lock to prevent concurrent re-commit race commit_async() now acquires an asyncio.Lock during Phase 1 (copy + clear + file write). This prevents concurrent commits from re-committing the same messages. The lock is released before the slow LLM summary and memory extraction, so it doesn't block other operations. The phase order is changed: live messages are cleared BEFORE the archive summary is generated, closing the race window where a second commit could see stale data. If the file-clear fails, messages are rolled back. Fixes #580. Co-Authored-By: Claude Opus 4.6 (1M context) --- openviking/session/session.py | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/openviking/session/session.py b/openviking/session/session.py index bdb6500b..db518cae 100644 --- a/openviking/session/session.py +++ b/openviking/session/session.py @@ -5,6 +5,7 @@ Session as Context: Sessions integrated into L0/L1/L2 system. """ +import asyncio import json import re from dataclasses import dataclass, field @@ -91,6 +92,7 @@ def __init__( self._compression: SessionCompression = SessionCompression() self._stats: SessionStats = SessionStats() self._loaded = False + self._commit_lock = asyncio.Lock() logger.info(f"Session created: {self.session_id} for user {self.user}") @@ -226,7 +228,9 @@ def commit(self) -> Dict[str, Any]: async def commit_async(self) -> Dict[str, Any]: """Async commit session: two-phase approach. - Phase 1 (Archive): Write archive, clear messages. + Phase 1 (Archive prep, lock-protected): Copy messages, clear live + session, increment compression index. The lock ensures concurrent + commits cannot re-commit the same messages. Phase 2 (Memory, redo-log protected): Extract memories, write, enqueue. """ import uuid @@ -241,19 +245,32 @@ async def commit_async(self) -> Dict[str, Any]: "archived": False, "stats": None, } - if not self._messages: - get_current_telemetry().set("memory.extracted", 0) - return result - # ===== Preparation ===== - self._compression.compression_index += 1 - messages_to_archive = self._messages.copy() + # ===== Phase 1: Snapshot + clear (lock-protected) ===== + async with self._commit_lock: + if not self._messages: + get_current_telemetry().set("memory.extracted", 0) + return result + + self._compression.compression_index += 1 + messages_to_archive = self._messages.copy() + self._messages.clear() + try: + await self._write_to_agfs_async(messages=[]) + except Exception: + # Rollback: restore messages so they aren't lost + self._messages.extend(messages_to_archive) + self._compression.compression_index -= 1 + raise + # Lock released — live session is now clean. + # Any add_message() from here appends to the fresh empty list. + + # ===== Phase 1 continued: Archive write (no lock needed) ===== summary = await self._generate_archive_summary_async(messages_to_archive) archive_abstract = self._extract_abstract_from_summary(summary) archive_overview = summary - # ===== Phase 1: Archive (no lock) ===== archive_uri = ( f"{self._session_uri}/history/archive_{self._compression.compression_index:03d}" ) @@ -263,8 +280,6 @@ async def commit_async(self) -> Dict[str, Any]: abstract=archive_abstract, overview=archive_overview, ) - await self._write_to_agfs_async(messages=[]) - self._messages.clear() self._compression.original_count += len(messages_to_archive) result["archived"] = True From c06b094aedc28670c4166c2e720b9216f32c6850 Mon Sep 17 00:00:00 2001 From: deepakdevp Date: Thu, 19 Mar 2026 22:29:39 +0900 Subject: [PATCH 2/3] test(session): add race condition tests for concurrent commit Tests verify that: - Two concurrent commit_async() calls on the same session produce exactly one archive (the other returns early) - Messages added while a commit is running are preserved in the session and not lost or re-committed Part of fix for #580. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/session/test_session_commit_race.py | 67 +++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 tests/session/test_session_commit_race.py diff --git a/tests/session/test_session_commit_race.py b/tests/session/test_session_commit_race.py new file mode 100644 index 00000000..e71a471f --- /dev/null +++ b/tests/session/test_session_commit_race.py @@ -0,0 +1,67 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for session commit race condition fix (#580).""" + +import asyncio + +from openviking import AsyncOpenViking +from openviking.message import TextPart + + +class TestCommitRace: + """Test concurrent commit safety.""" + + async def test_concurrent_commit_no_duplicate(self, client: AsyncOpenViking): + """Two concurrent commits on the same session: only one should archive.""" + session = client.session(session_id="race_test_dedup") + session.add_message("user", [TextPart("Hello")]) + session.add_message("assistant", [TextPart("Hi there")]) + + results = await asyncio.gather( + session.commit_async(), + session.commit_async(), + ) + + archived_count = sum(1 for r in results if r.get("archived") is True) + assert archived_count == 1, f"Expected exactly 1 archived commit, got {archived_count}" + + # Messages should be cleared after commit + assert len(session.messages) == 0 + + # Compression index should have incremented exactly once + assert session._compression.compression_index == 1 + + async def test_message_added_during_commit_not_lost(self, client: AsyncOpenViking): + """Messages added while commit is running should not be lost.""" + session = client.session(session_id="race_test_msg_safety") + session.add_message("user", [TextPart("Original message")]) + + # Use an Event for deterministic synchronization instead of sleeps + phase1_done = asyncio.Event() + original_generate = session._generate_archive_summary_async + + async def slow_generate(messages): + # Signal that Phase 1 is complete (lock released, messages cleared) + phase1_done.set() + # Yield control so add_message can run before archive completes + await asyncio.sleep(0) + return await original_generate(messages) + + session._generate_archive_summary_async = slow_generate + + async def commit_and_add(): + """Start commit, then add a message after Phase 1 completes.""" + commit_task = asyncio.create_task(session.commit_async()) + # Wait until Phase 1 is done (lock released, messages cleared) + await phase1_done.wait() + # Add message while commit is in Phase 2 (after lock released) + session.add_message("user", [TextPart("New message during commit")]) + return await commit_task + + result = await commit_and_add() + + assert result.get("archived") is True + # The new message should still be in the session + assert len(session.messages) == 1 + assert session.messages[0].content == "New message during commit" From 44593c1aad0a1209e4dea28a358c1895123fcfeb Mon Sep 17 00:00:00 2001 From: deepakdevp Date: Fri, 20 Mar 2026 12:37:08 +0900 Subject: [PATCH 3/3] refactor(session): use PathLock instead of asyncio.Lock for commit guard Replaces the in-process asyncio.Lock with the existing PathLock (distributed filesystem lock via LockContext) for Phase 1 of commit_async(). This ensures commit serialization works across multiple HTTP workers and service instances, not just within a single Python process. Addresses review feedback from qin-ctx on PR #783. --- openviking/session/session.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/openviking/session/session.py b/openviking/session/session.py index db518cae..13b3f565 100644 --- a/openviking/session/session.py +++ b/openviking/session/session.py @@ -5,7 +5,6 @@ Session as Context: Sessions integrated into L0/L1/L2 system. """ -import asyncio import json import re from dataclasses import dataclass, field @@ -92,7 +91,6 @@ def __init__( self._compression: SessionCompression = SessionCompression() self._stats: SessionStats = SessionStats() self._loaded = False - self._commit_lock = asyncio.Lock() logger.info(f"Session created: {self.session_id} for user {self.user}") @@ -228,14 +226,14 @@ def commit(self) -> Dict[str, Any]: async def commit_async(self) -> Dict[str, Any]: """Async commit session: two-phase approach. - Phase 1 (Archive prep, lock-protected): Copy messages, clear live - session, increment compression index. The lock ensures concurrent - commits cannot re-commit the same messages. + Phase 1 (Archive prep, PathLock-protected): Copy messages, clear live + session, increment compression index. Uses a distributed filesystem lock + (PathLock) so this works across workers and processes. Phase 2 (Memory, redo-log protected): Extract memories, write, enqueue. """ import uuid - from openviking.storage.transaction import get_lock_manager + from openviking.storage.transaction import LockContext, get_lock_manager result = { "session_id": self.session_id, @@ -246,8 +244,10 @@ async def commit_async(self) -> Dict[str, Any]: "stats": None, } - # ===== Phase 1: Snapshot + clear (lock-protected) ===== - async with self._commit_lock: + # ===== Phase 1: Snapshot + clear (PathLock-protected) ===== + # Use filesystem-based distributed lock so this works across workers/processes. + session_path = self._viking_fs._uri_to_path(self._session_uri, ctx=self.ctx) + async with LockContext(get_lock_manager(), [session_path], lock_mode="point"): if not self._messages: get_current_telemetry().set("memory.extracted", 0) return result