From 8aee8e78dd9127e5ea717e16b9410341ae79eee3 Mon Sep 17 00:00:00 2001 From: gilangjavier Date: Wed, 18 Mar 2026 20:14:23 +0700 Subject: [PATCH 1/2] fix(queuefs): vectorize directories and files at target URIs during incremental updates When adding resources with incremental update (e.g., using temp_path), the semantic DAG previously vectorized directory-level L0/L1 under the temporary URI. After SyncDiff moves files to the final target URI, the vectors remained attached to the temp path, causing searches under the actual resource directory to return no results. Adjust SemanticDagExecutor to compute target URIs for vectorization tasks when incremental_update=True. This applies to both file and directory vectorization, ensuring vectors are indexed at the final destination. Also added test to verify target URIs are used in incremental mode. Fixes issue #743 in volcengine/OpenViking. Signed-off-by: Hephaestus --- openviking/storage/queuefs/semantic_dag.py | 16 +++-- tests/storage/test_semantic_dag_stats.py | 77 +++++++++++++++++----- 2 files changed, 73 insertions(+), 20 deletions(-) diff --git a/openviking/storage/queuefs/semantic_dag.py b/openviking/storage/queuefs/semantic_dag.py index 79213f71..3279e22c 100644 --- a/openviking/storage/queuefs/semantic_dag.py +++ b/openviking/storage/queuefs/semantic_dag.py @@ -432,15 +432,20 @@ async def _file_summary_task(self, parent_uri: str, file_path: str) -> None: try: if need_vectorize: + vectorize_parent = parent_uri + vectorize_file_path = file_path + if self._incremental_update: + vectorize_parent = self._get_target_file_path(parent_uri) or parent_uri + vectorize_file_path = self._get_target_file_path(file_path) or file_path task = VectorizeTask( task_type="file", - uri=file_path, + uri=vectorize_file_path, context_type=self._context_type, ctx=self._ctx, semantic_msg_id=self._semantic_msg_id, - file_path=file_path, + file_path=vectorize_file_path, summary_dict=summary_dict, - parent_uri=parent_uri, + parent_uri=vectorize_parent, ) await self._add_vectorize_task(task) except Exception as e: @@ -551,9 +556,12 @@ async def _overview_task(self, dir_uri: str) -> None: try: if need_vectorize: + vectorize_uri = dir_uri + if self._incremental_update: + vectorize_uri = self._get_target_file_path(dir_uri) or dir_uri task = VectorizeTask( task_type="directory", - uri=dir_uri, + uri=vectorize_uri, context_type=self._context_type, ctx=self._ctx, semantic_msg_id=self._semantic_msg_id, diff --git a/tests/storage/test_semantic_dag_stats.py b/tests/storage/test_semantic_dag_stats.py index 94f9441f..e5424c64 100644 --- a/tests/storage/test_semantic_dag_stats.py +++ b/tests/storage/test_semantic_dag_stats.py @@ -62,6 +62,22 @@ async def register(self, **_kwargs): return None +def _mock_transaction_layer(monkeypatch): + mock_handle = MagicMock() + monkeypatch.setattr( + "openviking.storage.transaction.lock_context.LockContext.__aenter__", + AsyncMock(return_value=mock_handle), + ) + monkeypatch.setattr( + "openviking.storage.transaction.lock_context.LockContext.__aexit__", + AsyncMock(return_value=False), + ) + monkeypatch.setattr( + "openviking.storage.transaction.get_lock_manager", + lambda: MagicMock(), + ) + + @pytest.mark.asyncio async def test_semantic_dag_stats_collects_nodes(monkeypatch): root_uri = "viking://resources/root" @@ -81,21 +97,7 @@ async def test_semantic_dag_stats_collects_nodes(monkeypatch): "openviking.storage.queuefs.embedding_tracker.EmbeddingTaskTracker.get_instance", lambda: _DummyTracker(), ) - - # Mock lock layer: LockContext as no-op passthrough - mock_handle = MagicMock() - monkeypatch.setattr( - "openviking.storage.transaction.lock_context.LockContext.__aenter__", - AsyncMock(return_value=mock_handle), - ) - monkeypatch.setattr( - "openviking.storage.transaction.lock_context.LockContext.__aexit__", - AsyncMock(return_value=False), - ) - monkeypatch.setattr( - "openviking.storage.transaction.get_lock_manager", - lambda: MagicMock(), - ) + _mock_transaction_layer(monkeypatch) processor = _FakeProcessor() ctx = RequestContext(user=UserIdentifier("acc1", "user1", "agent1"), role=Role.USER) @@ -110,7 +112,7 @@ async def test_semantic_dag_stats_collects_nodes(monkeypatch): stats = executor.get_stats() assert isinstance(stats, DagStats) - assert stats.total_nodes == 5 # 2 dirs + 3 files + assert stats.total_nodes == 5 assert stats.pending_nodes == 0 assert stats.done_nodes == 5 assert stats.in_progress_nodes == 0 @@ -120,5 +122,48 @@ async def test_semantic_dag_stats_collects_nodes(monkeypatch): ) +@pytest.mark.asyncio +async def test_incremental_update_vectorizes_with_target_uris(monkeypatch): + temp_root_uri = "viking://temp/tmp123/demo" + target_root_uri = "viking://resources/demo" + tree = { + temp_root_uri: [ + {"name": "doc.md", "isDir": False}, + {"name": "subdir", "isDir": True}, + ], + f"{temp_root_uri}/subdir": [ + {"name": "nested.txt", "isDir": False}, + ], + } + fake_fs = _FakeVikingFS(tree) + monkeypatch.setattr("openviking.storage.queuefs.semantic_dag.get_viking_fs", lambda: fake_fs) + monkeypatch.setattr( + "openviking.storage.queuefs.embedding_tracker.EmbeddingTaskTracker.get_instance", + lambda: _DummyTracker(), + ) + _mock_transaction_layer(monkeypatch) + + processor = _FakeProcessor() + ctx = RequestContext(user=UserIdentifier("acc1", "user1", "agent1"), role=Role.USER) + executor = SemanticDagExecutor( + processor=processor, + context_type="resource", + max_concurrent_llm=2, + ctx=ctx, + incremental_update=True, + target_uri=target_root_uri, + ) + await executor.run(temp_root_uri) + await asyncio.sleep(0) + + # Directories should be vectorized at target URIs + assert processor.vectorized_dirs == [target_root_uri, f"{target_root_uri}/subdir"] + # Files should be vectorized at target URIs, not temp + assert sorted(processor.vectorized_files) == sorted([ + f"{target_root_uri}/doc.md", + f"{target_root_uri}/subdir/nested.txt", + ]) + + if __name__ == "__main__": pytest.main([__file__]) From b63244a75b90450c55490c942ad62fcd6fe3c262 Mon Sep 17 00:00:00 2001 From: gilangjavier Date: Thu, 19 Mar 2026 10:16:24 +0700 Subject: [PATCH 2/2] test: add helper docstring for transaction layer mock --- tests/storage/test_semantic_dag_stats.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/storage/test_semantic_dag_stats.py b/tests/storage/test_semantic_dag_stats.py index e5424c64..93b1d7d4 100644 --- a/tests/storage/test_semantic_dag_stats.py +++ b/tests/storage/test_semantic_dag_stats.py @@ -63,6 +63,7 @@ async def register(self, **_kwargs): def _mock_transaction_layer(monkeypatch): + """Mock lock layer: LockContext as no-op passthrough.""" mock_handle = MagicMock() monkeypatch.setattr( "openviking.storage.transaction.lock_context.LockContext.__aenter__",