Vectorize incremental-update results under the target URI instead of the temp URI.

Review notes:
  * Line structure restored (one diff line per line). Leading whitespace on
    context lines is inferred from Python nesting; if it does not match the
    tree exactly, apply with `git apply --ignore-whitespace`.
  * The hunk that only stripped the "# 2 dirs + 3 files" comment from
    test_semantic_dag_stats_collects_nodes was dropped as unrelated.
  * The new test compares directory URIs order-insensitively, matching the
    way it already compares file URIs, and gains a docstring.
  * `index` blob ids are carried over from the original patch and were not
    recomputed for the edited test file.

diff --git a/openviking/storage/queuefs/semantic_dag.py b/openviking/storage/queuefs/semantic_dag.py
index 79213f71..3279e22c 100644
--- a/openviking/storage/queuefs/semantic_dag.py
+++ b/openviking/storage/queuefs/semantic_dag.py
@@ -432,15 +432,20 @@ async def _file_summary_task(self, parent_uri: str, file_path: str) -> None:
 
         try:
             if need_vectorize:
+                vectorize_parent = parent_uri
+                vectorize_file_path = file_path
+                if self._incremental_update:
+                    vectorize_parent = self._get_target_file_path(parent_uri) or parent_uri
+                    vectorize_file_path = self._get_target_file_path(file_path) or file_path
                 task = VectorizeTask(
                     task_type="file",
-                    uri=file_path,
+                    uri=vectorize_file_path,
                     context_type=self._context_type,
                     ctx=self._ctx,
                     semantic_msg_id=self._semantic_msg_id,
-                    file_path=file_path,
+                    file_path=vectorize_file_path,
                     summary_dict=summary_dict,
-                    parent_uri=parent_uri,
+                    parent_uri=vectorize_parent,
                 )
                 await self._add_vectorize_task(task)
         except Exception as e:
@@ -551,9 +556,12 @@ async def _overview_task(self, dir_uri: str) -> None:
 
         try:
             if need_vectorize:
+                vectorize_uri = dir_uri
+                if self._incremental_update:
+                    vectorize_uri = self._get_target_file_path(dir_uri) or dir_uri
                 task = VectorizeTask(
                     task_type="directory",
-                    uri=dir_uri,
+                    uri=vectorize_uri,
                     context_type=self._context_type,
                     ctx=self._ctx,
                     semantic_msg_id=self._semantic_msg_id,
diff --git a/tests/storage/test_semantic_dag_stats.py b/tests/storage/test_semantic_dag_stats.py
index 94f9441f..93b1d7d4 100644
--- a/tests/storage/test_semantic_dag_stats.py
+++ b/tests/storage/test_semantic_dag_stats.py
@@ -62,6 +62,23 @@ async def register(self, **_kwargs):
         return None
 
 
+def _mock_transaction_layer(monkeypatch):
+    """Mock lock layer: LockContext as no-op passthrough."""
+    mock_handle = MagicMock()
+    monkeypatch.setattr(
+        "openviking.storage.transaction.lock_context.LockContext.__aenter__",
+        AsyncMock(return_value=mock_handle),
+    )
+    monkeypatch.setattr(
+        "openviking.storage.transaction.lock_context.LockContext.__aexit__",
+        AsyncMock(return_value=False),
+    )
+    monkeypatch.setattr(
+        "openviking.storage.transaction.get_lock_manager",
+        lambda: MagicMock(),
+    )
+
+
 @pytest.mark.asyncio
 async def test_semantic_dag_stats_collects_nodes(monkeypatch):
     root_uri = "viking://resources/root"
@@ -81,21 +98,7 @@ async def test_semantic_dag_stats_collects_nodes(monkeypatch):
         "openviking.storage.queuefs.embedding_tracker.EmbeddingTaskTracker.get_instance",
         lambda: _DummyTracker(),
     )
-
-    # Mock lock layer: LockContext as no-op passthrough
-    mock_handle = MagicMock()
-    monkeypatch.setattr(
-        "openviking.storage.transaction.lock_context.LockContext.__aenter__",
-        AsyncMock(return_value=mock_handle),
-    )
-    monkeypatch.setattr(
-        "openviking.storage.transaction.lock_context.LockContext.__aexit__",
-        AsyncMock(return_value=False),
-    )
-    monkeypatch.setattr(
-        "openviking.storage.transaction.get_lock_manager",
-        lambda: MagicMock(),
-    )
+    _mock_transaction_layer(monkeypatch)
 
     processor = _FakeProcessor()
     ctx = RequestContext(user=UserIdentifier("acc1", "user1", "agent1"), role=Role.USER)
@@ -120,5 +123,49 @@ async def test_semantic_dag_stats_collects_nodes(monkeypatch):
     )
 
 
+@pytest.mark.asyncio
+async def test_incremental_update_vectorizes_with_target_uris(monkeypatch):
+    """Incremental runs over a temp tree must vectorize under the target root."""
+    temp_root_uri = "viking://temp/tmp123/demo"
+    target_root_uri = "viking://resources/demo"
+    tree = {
+        temp_root_uri: [
+            {"name": "doc.md", "isDir": False},
+            {"name": "subdir", "isDir": True},
+        ],
+        f"{temp_root_uri}/subdir": [
+            {"name": "nested.txt", "isDir": False},
+        ],
+    }
+    fake_fs = _FakeVikingFS(tree)
+    monkeypatch.setattr("openviking.storage.queuefs.semantic_dag.get_viking_fs", lambda: fake_fs)
+    monkeypatch.setattr(
+        "openviking.storage.queuefs.embedding_tracker.EmbeddingTaskTracker.get_instance",
+        lambda: _DummyTracker(),
+    )
+    _mock_transaction_layer(monkeypatch)
+
+    processor = _FakeProcessor()
+    ctx = RequestContext(user=UserIdentifier("acc1", "user1", "agent1"), role=Role.USER)
+    executor = SemanticDagExecutor(
+        processor=processor,
+        context_type="resource",
+        max_concurrent_llm=2,
+        ctx=ctx,
+        incremental_update=True,
+        target_uri=target_root_uri,
+    )
+    await executor.run(temp_root_uri)
+    await asyncio.sleep(0)
+
+    # Directories should be vectorized at target URIs (order-insensitive, like files)
+    assert sorted(processor.vectorized_dirs) == [target_root_uri, f"{target_root_uri}/subdir"]
+    # Files should be vectorized at target URIs, not temp
+    assert sorted(processor.vectorized_files) == sorted([
+        f"{target_root_uri}/doc.md",
+        f"{target_root_uri}/subdir/nested.txt",
+    ])
+
+
 if __name__ == "__main__":
     pytest.main([__file__])