diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py
index cf164177..16ec2291 100644
--- a/openviking/storage/queuefs/semantic_processor.py
+++ b/openviking/storage/queuefs/semantic_processor.py
@@ -463,6 +463,13 @@ async def sync_dir(root_dir: str, target_dir: str) -> None:
     root_files, root_dirs = await list_children(root_dir)
     target_files, target_dirs = await list_children(target_dir)
 
+    # Best-effort: re-point the directory's own L0/L1 index entries before
+    # diffing children; a failure here must not abort the whole sync.
+    try:
+        await viking_fs._mv_vector_store_l0_l1(root_dir, target_dir, ctx=ctx)
+    except Exception as e:
+        logger.error(
+            f"[SyncDiff] Failed to move L0/L1 index: {root_dir} -> {target_dir}, error={e}"
+        )
+
     file_names = set(root_files.keys()) | set(target_files.keys())
     for name in sorted(file_names):
         root_file = root_files.get(name)
diff --git a/openviking/storage/viking_fs.py b/openviking/storage/viking_fs.py
index 42e5e0f2..c25c251b 100644
--- a/openviking/storage/viking_fs.py
+++ b/openviking/storage/viking_fs.py
@@ -1210,6 +1210,7 @@ async def _update_vector_store_uris(
         old_base: str,
         new_base: str,
         ctx: Optional[RequestContext] = None,
+        levels: Optional[List[int]] = None,
     ) -> None:
         """Update URIs in vector store (when moving files).
 
@@ -1232,11 +1233,81 @@ async def _update_vector_store_uris(
                     uri=uri,
                     new_uri=new_uri,
                     new_parent_uri=new_parent_uri,
+                    levels=levels,
                 )
                 logger.debug(f"[VikingFS] Updated URI: {uri} -> {new_uri}")
             except Exception as e:
                 logger.warning(f"[VikingFS] Failed to update {uri} in vector store: {e}")
 
+    async def _mv_vector_store_l0_l1(
+        self,
+        old_uri: str,
+        new_uri: str,
+        ctx: Optional[RequestContext] = None,
+    ) -> None:
+        """Move the directory-level L0/L1 index records from old_uri to new_uri.
+
+        Only the abstract (level 0) and overview (level 1) vector-store
+        entries of the directory itself are re-pointed; child records are
+        left untouched.  Raises FileNotFoundError when either directory is
+        missing, ValueError when a URI is not a directory, and
+        ResourceBusyError when the move lock cannot be acquired.
+        """
+        from openviking.storage.errors import LockAcquisitionError, ResourceBusyError
+        from openviking.storage.transaction import LockContext, get_lock_manager
+
+        self._ensure_access(old_uri, ctx)
+        self._ensure_access(new_uri, ctx)
+
+        real_ctx = self._ctx_or_default(ctx)
+        old_dir = VikingURI.normalize(old_uri).rstrip("/")
+        new_dir = VikingURI.normalize(new_uri).rstrip("/")
+        if old_dir == new_dir:
+            return
+
+        for uri in (old_dir, new_dir):
+            if uri.endswith(("/.abstract.md", "/.overview.md")):
+                raise ValueError(f"mv_vector_store expects directory URIs, got: {uri}")
+
+        try:
+            old_stat = await self.stat(old_dir, ctx=real_ctx)
+        except Exception as e:
+            raise FileNotFoundError(f"mv_vector_store old_uri not found: {old_dir}") from e
+        try:
+            new_stat = await self.stat(new_dir, ctx=real_ctx)
+        except Exception as e:
+            raise FileNotFoundError(f"mv_vector_store new_uri not found: {new_dir}") from e
+
+        if not (isinstance(old_stat, dict) and old_stat.get("isDir", False)):
+            raise ValueError(f"mv_vector_store expects old_uri to be a directory: {old_dir}")
+        if not (isinstance(new_stat, dict) and new_stat.get("isDir", False)):
+            raise ValueError(f"mv_vector_store expects new_uri to be a directory: {new_dir}")
+
+        old_path = self._uri_to_path(old_dir, ctx=real_ctx)
+        new_path = self._uri_to_path(new_dir, ctx=real_ctx)
+        dst_parent = new_path.rsplit("/", 1)[0] if "/" in new_path else new_path
+
+        try:
+            async with LockContext(
+                get_lock_manager(),
+                [old_path],
+                lock_mode="mv",
+                mv_dst_parent_path=dst_parent,
+                src_is_dir=True,
+            ):
+                await self._update_vector_store_uris(
+                    uris=[old_dir],
+                    old_base=old_dir,
+                    new_base=new_dir,
+                    ctx=real_ctx,
+                    levels=[0, 1],
+                )
+
+        except LockAcquisitionError as e:
+            # Preserve the lock failure as __cause__ for callers/log readers.
+            raise ResourceBusyError(f"Resource is being processed: {old_dir}") from e
+
     def _get_vector_store(self) -> Optional["VikingVectorIndexBackend"]:
         """Get vector store instance."""
         return self.vector_store
diff --git a/openviking/storage/viking_vector_index_backend.py b/openviking/storage/viking_vector_index_backend.py
index 29026d49..5a8b06e8 100644
--- a/openviking/storage/viking_vector_index_backend.py
+++ b/openviking/storage/viking_vector_index_backend.py
@@ -829,14 +829,22 @@ async def update_uri_mapping(
         uri: str,
         new_uri: str,
         new_parent_uri: str,
+        levels: Optional[List[int]] = None,
     ) -> bool:
         import hashlib
 
-        records = await self.filter(
-            filter=And([Eq("uri", uri), Eq("account_id", ctx.account_id)]),
-            limit=100,
-            ctx=ctx,
-        )
+        conds: List[FilterExpr] = [Eq("uri", uri), Eq("account_id", ctx.account_id)]
+        if levels:
+            conds.append(In("level", levels))
+        if ctx.role == Role.USER and uri.startswith(("viking://user/", "viking://agent/")):
+            owner_space = (
+                ctx.user.user_space_name()
+                if uri.startswith("viking://user/")
+                else ctx.user.agent_space_name()
+            )
+            conds.append(Eq("owner_space", owner_space))
+
+        records = await self.filter(filter=And(conds), limit=100, ctx=ctx)
 
         if not records:
             return False
diff --git a/tests/storage/test_semantic_processor_mv_vector_store.py b/tests/storage/test_semantic_processor_mv_vector_store.py
new file mode 100644
index 00000000..44e1c069
--- /dev/null
+++ b/tests/storage/test_semantic_processor_mv_vector_store.py
@@ -0,0 +1,217 @@
+from __future__ import annotations
+
+import hashlib
+from typing import Any, Dict, List, Optional
+
+import pytest
+
+from openviking.server.identity import RequestContext, Role
+from openviking.storage.expr import And, Eq, In
+from openviking_cli.session.user_id import UserIdentifier
+
+
+class _FakeVectorStore:
+    def __init__(self, records: List[Dict[str, Any]]):
+        self.records = list(records)
+        self.deleted_ids: List[str] = []
+
+    async def update_uri_mapping(
+        self,
+        *,
+        ctx: RequestContext,
+        uri: str,
+        new_uri: str,
+        new_parent_uri: str,
+        levels: Optional[List[int]] = None,
+    ) -> bool:
+        def seed_uri_for_id(target_uri: str, level: int) -> str:
+            if level == 0:
+                return (
+                    target_uri
+                    if target_uri.endswith("/.abstract.md")
+                    else f"{target_uri}/.abstract.md"
+                )
+            if level == 1:
+                return (
+                    target_uri
+                    if target_uri.endswith("/.overview.md")
+                    else f"{target_uri}/.overview.md"
+                )
+            return target_uri
+
+        touched = False
+        ids_to_delete: List[str] = []
+        for record in list(self.records):
+            if record.get("account_id") != ctx.account_id:
+                continue
+            if record.get("uri") != uri:
+                continue
+            try:
+                level = int(record.get("level", 2))
+            except (TypeError, ValueError):
+                level = 2
+            if levels is not None and level not in set(levels):
+                continue
+
+            seed_uri = seed_uri_for_id(new_uri, level)
+            new_id = hashlib.md5(f"{ctx.account_id}:{seed_uri}".encode("utf-8")).hexdigest()
+            new_record = dict(record)
+            new_record["id"] = new_id
+            new_record["uri"] = new_uri
+            new_record["parent_uri"] = new_parent_uri
+            self.records.append(new_record)
+            touched = True
+
+            old_id = record.get("id")
+            if old_id and old_id != new_id:
+                ids_to_delete.append(old_id)
+
+        if ids_to_delete:
+            await self.delete(list(set(ids_to_delete)), ctx=ctx)
+
+        return touched
+
+    async def filter(self, *, filter=None, limit: int = 100, ctx: RequestContext):
+        conds = []
+        if filter is not None:
+            if isinstance(filter, And):
+                conds = list(filter.conds)
+            else:
+                conds = [filter]
+
+        uri: Optional[str] = None
+        account_id: Optional[str] = None
+        owner_space: Optional[str] = None
+        levels: Optional[List[int]] = None
+
+        for cond in conds:
+            if isinstance(cond, Eq) and cond.field == "uri":
+                uri = cond.value
+            elif isinstance(cond, Eq) and cond.field == "account_id":
+                account_id = cond.value
+            elif isinstance(cond, Eq) and cond.field == "owner_space":
+                owner_space = cond.value
+            elif isinstance(cond, In) and cond.field == "level":
+                levels = [int(v) for v in cond.values]
+
+        matched = [
+            r
+            for r in self.records
+            if (uri is None or r.get("uri") == uri)
+            and (account_id is None or r.get("account_id") == account_id)
+            and (owner_space is None or r.get("owner_space") == owner_space)
+            and (levels is None or int(r.get("level", 2)) in levels)
+        ]
+        return matched[:limit]
+
+    async def delete(self, ids: List[str], *, ctx: RequestContext) -> int:
+        id_set = set(ids)
+        self.deleted_ids.extend(ids)
+        self.records = [r for r in self.records if r.get("id") not in id_set]
+        return len(ids)
+
+
+class _NoopLockContext:
+    def __init__(self, *_args, **_kwargs):
+        return None
+
+    async def __aenter__(self):
+        return None
+
+    async def __aexit__(self, exc_type, exc, tb):
+        return False
+
+
+@pytest.mark.asyncio
+async def test_mv_vector_store_moves_records(monkeypatch):
+    from openviking.storage.viking_fs import VikingFS
+
+    ctx = RequestContext(user=UserIdentifier("acc", "user", "agent"), role=Role.ROOT)
+    old_uri = "viking://resources/a"
+    new_uri = "viking://resources/b"
+
+    store = _FakeVectorStore(
+        [
+            {"id": "l0", "uri": old_uri, "level": 0, "account_id": ctx.account_id, "owner_space": ""},
+            {"id": "l1", "uri": old_uri, "level": 1, "account_id": ctx.account_id, "owner_space": ""},
+            {"id": "l2", "uri": old_uri, "level": 2, "account_id": ctx.account_id, "owner_space": ""},
+            {
+                "id": "child-l0",
+                "uri": f"{old_uri}/x",
+                "level": 0,
+                "account_id": ctx.account_id,
+                "owner_space": "",
+            },
+        ]
+    )
+
+    class _FakeAGFS:
+        def rm(self, _path, recursive: bool = False):
+            return None
+
+    class _FakeVikingFS(VikingFS):
+        def __init__(self):
+            super().__init__(agfs=_FakeAGFS(), vector_store=store)
+
+        def _uri_to_path(self, uri, ctx=None):
+            return f"/mock/{uri.replace('viking://', '')}"
+
+        async def stat(self, uri, ctx=None):
+            return {"isDir": True}
+
+        def _ensure_access(self, uri, ctx):
+            return None
+
+    monkeypatch.setattr(
+        "openviking.storage.viking_fs.get_viking_fs",
+        lambda: _FakeVikingFS(),
+    )
+    monkeypatch.setattr("openviking.storage.transaction.get_lock_manager", lambda: None)
+    monkeypatch.setattr("openviking.storage.transaction.LockContext", _NoopLockContext)
+
+    fs = _FakeVikingFS()
+    await fs._mv_vector_store_l0_l1(old_uri, new_uri, ctx=ctx)
+
+    assert {r["id"] for r in store.records if r.get("uri") == old_uri} == {"l2"}
+    assert {r["id"] for r in store.records if r.get("uri") == f"{old_uri}/x"} == {"child-l0"}
+    assert {int(r["level"]) for r in store.records if r.get("uri") == new_uri} == {0, 1}
+    assert set(store.deleted_ids) == {"l0", "l1"}
+
+
+@pytest.mark.asyncio
+async def test_mv_vector_store_requires_directories(monkeypatch):
+    from openviking.storage.viking_fs import VikingFS
+
+    ctx = RequestContext(user=UserIdentifier("acc", "user", "agent"), role=Role.ROOT)
+    old_uri = "viking://resources/a"
+    new_uri = "viking://resources/b"
+
+    store = _FakeVectorStore([])
+
+    class _FakeAGFS:
+        def rm(self, _path, recursive: bool = False):
+            return None
+
+    class _FakeVikingFS(VikingFS):
+        def __init__(self):
+            super().__init__(agfs=_FakeAGFS(), vector_store=store)
+
+        def _uri_to_path(self, uri, ctx=None):
+            return f"/mock/{uri.replace('viking://', '')}"
+
+        async def stat(self, uri, ctx=None):
+            return {"isDir": uri == old_uri}
+
+        def _ensure_access(self, uri, ctx):
+            return None
+
+    monkeypatch.setattr(
+        "openviking.storage.viking_fs.get_viking_fs",
+        lambda: _FakeVikingFS(),
+    )
+    monkeypatch.setattr("openviking.storage.transaction.get_lock_manager", lambda: None)
+    monkeypatch.setattr("openviking.storage.transaction.LockContext", _NoopLockContext)
+
+    fs = _FakeVikingFS()
+    with pytest.raises(ValueError):
+        await fs._mv_vector_store_l0_l1(old_uri, new_uri, ctx=ctx)