
Revert "feat(resource): implement incremental update with COW pattern" #584

Merged
qin-ctx merged 1 commit into main from revert-535-feature/incremental_update on Mar 13, 2026

Conversation

qin-ctx (Collaborator) commented Mar 13, 2026

Reverts #535

CLAassistant commented Mar 13, 2026

CLA assistant check
All committers have signed the CLA.

qin-ctx marked this pull request as draft March 13, 2026 14:01
github-actions bot commented Mar 13, 2026

PR Reviewer Guide 🔍

(Review updated until commit d6f2e24)

Here are some key observations to aid the review process:

🎫 Ticket compliance analysis 🔶

535 - Partially compliant

Compliant requirements:

(This is a revert PR, so no requirements from the original ticket are fulfilled)

Non-compliant requirements:

  • Add support for incremental updates using copy-on-write pattern
  • Add ResourceLockManager for managing concurrent updates
  • Introduce EmbeddingTaskTracker to track embedding task completion
  • Modify TreeBuilder to support temp URIs and skip conflict resolution
  • Update SemanticDagExecutor to handle incremental updates
  • Extend memory extractor/compressor to work with temp URIs
  • Add exists() method to VikingFS for URI existence checks
  • Update Context to include temp_uri field

Requires further human verification:

  • Verify that all files added in the original PR (e.g., resource_lock.py, embedding_tracker.py) are properly deleted
  • Check that the Context class no longer has the temp_uri field
  • Ensure VikingFS no longer has the exists() method (if it was added in the original PR)
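For reviewers unfamiliar with the reverted feature, the copy-on-write idea behind it can be sketched generically: stage every mutation in a private copy of the tree, then swap the finished copy into place so readers never observe a half-written state. This is a minimal illustrative sketch using the local filesystem; the names (`cow_update`, `mutate`) are hypothetical and are not OpenViking's real APIs.

```python
import os
import shutil
import tempfile
from typing import Callable


def cow_update(dest: str, mutate: Callable[[str], None]) -> None:
    """Stage all writes in a private copy of `dest`, then swap it into place.

    Readers of `dest` see either the old tree or the new one, never a
    partially updated state. Illustrative only, not OpenViking's API.
    """
    # Stage next to the destination so the final rename stays on one filesystem
    parent = os.path.dirname(os.path.abspath(dest))
    staging = tempfile.mkdtemp(prefix=".cow-", dir=parent)
    try:
        # Copy-on-write: clone the current state into the staging copy
        shutil.copytree(dest, staging, dirs_exist_ok=True)
        mutate(staging)  # all mutations hit the staging copy only
        backup = dest + ".bak"
        os.rename(dest, backup)    # move the old tree aside
        os.rename(staging, dest)   # swap the updated tree into place
        shutil.rmtree(backup, ignore_errors=True)
    except Exception:
        shutil.rmtree(staging, ignore_errors=True)
        raise
```

The reverted PR applied the same shape to VikingFS via temp URIs and a final move, which is why the TreeBuilder snippet below deals with `temp_doc_uri` and `_move_temp_to_dest`.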
⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Revert Verification

Verify that the commit_async method is fully reverted to the original implementation without COW pattern and temp URI management.

async def commit_async(self) -> Dict[str, Any]:
    """Async commit session: create archive, extract memories, persist."""
    result = {
        "session_id": self.session_id,
        "status": "committed",
        "memories_extracted": 0,
        "active_count_updated": 0,
        "archived": False,
        "stats": None,
    }
    if not self._messages:
        return result

    # 1. Archive current messages
    self._compression.compression_index += 1
    messages_to_archive = self._messages.copy()

    summary = await self._generate_archive_summary_async(messages_to_archive)
    archive_abstract = self._extract_abstract_from_summary(summary)
    archive_overview = summary

    await self._write_archive_async(
        index=self._compression.compression_index,
        messages=messages_to_archive,
        abstract=archive_abstract,
        overview=archive_overview,
    )

    self._compression.original_count += len(messages_to_archive)
    result["archived"] = True

    self._messages.clear()
    logger.info(
        f"Archived: {len(messages_to_archive)} messages → history/archive_{self._compression.compression_index:03d}/"
    )

    # 2. Extract long-term memories
    if self._session_compressor:
        logger.info(
            f"Starting memory extraction from {len(messages_to_archive)} archived messages"
        )
        memories = await self._session_compressor.extract_long_term_memories(
            messages=messages_to_archive,
        )
        logger.info(f"Extracted {len(memories)} memories")
        result["memories_extracted"] = len(memories)
        self._stats.memories_extracted += len(memories)

    # 3. Write current messages to AGFS
    await self._write_to_agfs_async(self._messages)

    # 4. Create relations
    await self._write_relations_async()

    # 5. Update active_count
    active_count_updated = await self._update_active_counts_async()
    result["active_count_updated"] = active_count_updated

    # 6. Update statistics
    self._stats.compression_count = self._compression.compression_index
    result["stats"] = {
        "total_turns": self._stats.total_turns,
        "contexts_used": self._stats.contexts_used,
        "skills_used": self._stats.skills_used,
        "memories_extracted": self._stats.memories_extracted,
    }

    self._stats.total_tokens = 0
    logger.info(f"Session {self.session_id} committed (async)")
    return result

EmbeddingTaskTracker Removal

Ensure all references to EmbeddingTaskTracker are removed and the original semantic processing flow is restored.

async def _enqueue_semantic_msg(self, msg: SemanticMsg) -> None:
    """Enqueue a SemanticMsg to the semantic queue for processing."""
    from openviking.storage.queuefs import get_queue_manager

    queue_manager = get_queue_manager()
    semantic_queue = queue_manager.get_queue(queue_manager.SEMANTIC)
    # The queue manager returns SemanticQueue but method signature says NamedQueue
    # We need to ignore the type error for the enqueue call
    await semantic_queue.enqueue(msg)  # type: ignore
    logger.debug(f"Enqueued semantic message for processing: {msg.uri}")

async def _collect_directory_info(
    self,
    uri: str,
    result: List[Tuple[str, List[str], List[str]]],
) -> None:
    """Recursively collect directory info, post-order traversal ensures bottom-up order."""
    viking_fs = get_viking_fs()

    try:
        entries = await viking_fs.ls(uri, ctx=self._current_ctx)
    except Exception as e:
        logger.warning(f"Failed to list directory {uri}: {e}")
        return

    children_uris = []
    file_paths = []

    for entry in entries:
        name = entry.get("name", "")
        if not name or name.startswith(".") or name in [".", ".."]:
            continue

        item_uri = VikingURI(uri).join(name).uri

        if entry.get("isDir", False):
            # Child directory
            children_uris.append(item_uri)
            # Recursively collect children
            await self._collect_directory_info(item_uri, result)
        else:
            # File (not starting with .)
            file_paths.append(item_uri)

    # Add current directory info
    result.append((uri, children_uris, file_paths))

async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Process dequeued SemanticMsg, recursively process all subdirectories."""
    try:
        import json

        if not data:
            return None

        if "data" in data and isinstance(data["data"], str):
            data = json.loads(data["data"])

        # data is guaranteed to be not None at this point
        assert data is not None
        msg = SemanticMsg.from_dict(data)
        self._current_msg = msg
        self._current_ctx = self._ctx_from_semantic_msg(msg)
        logger.info(
            f"Processing semantic generation for: {msg.uri} (recursive={msg.recursive})"
        )

        if msg.recursive:
            executor = SemanticDagExecutor(
                processor=self,
                context_type=msg.context_type,
                max_concurrent_llm=self.max_concurrent_llm,
                ctx=self._current_ctx,
            )
            self._dag_executor = executor
            await executor.run(msg.uri)
            logger.info(f"Completed semantic generation for: {msg.uri}")
            self.report_success()
            return None
        else:
            # Non-recursive processing: directly process this directory
            children_uris = []
            file_paths = []

            # Collect immediate children info only (no recursion)
            viking_fs = get_viking_fs()
            try:
                entries = await viking_fs.ls(msg.uri, ctx=self._current_ctx)
                for entry in entries:
                    name = entry.get("name", "")
                    if not name or name.startswith(".") or name in [".", ".."]:
                        continue

                    item_uri = VikingURI(msg.uri).join(name).uri

                    if entry.get("isDir", False):
                        children_uris.append(item_uri)
                    else:
                        file_paths.append(item_uri)
            except Exception as e:
                logger.warning(f"Failed to list directory {msg.uri}: {e}")

            # Process this directory
            await self._process_single_directory(
                uri=msg.uri,
                context_type=msg.context_type,
                children_uris=children_uris,
                file_paths=file_paths,
            )

            logger.info(f"Completed semantic generation for: {msg.uri}")
            self.report_success()
            return None

    except Exception as e:
        logger.error(f"Failed to process semantic message: {e}", exc_info=True)
        self.report_error(str(e), data)
        return None
    finally:
        self._current_msg = None

def get_dag_stats(self) -> Optional["DagStats"]:
    if not self._dag_executor:
        return None
    return self._dag_executor.get_stats()

async def _process_single_directory(
    self,
    uri: str,
    context_type: str,
    children_uris: List[str],
    file_paths: List[str],
) -> None:
    """Process single directory, generate .abstract.md and .overview.md."""
    viking_fs = get_viking_fs()

    # 1. Collect .abstract.md from subdirectories (already processed earlier)
    children_abstracts = await self._collect_children_abstracts(children_uris)

    # 2. Concurrently generate summaries for files in directory
    file_summaries = await self._generate_file_summaries(
        file_paths, context_type=context_type, parent_uri=uri, enqueue_files=True
    )

    # 3. Generate .overview.md (contains brief description)
    overview = await self._generate_overview(uri, file_summaries, children_abstracts)

    # 4. Extract abstract from overview
    abstract = self._extract_abstract_from_overview(overview)

    # 5. Write files
    await viking_fs.write_file(f"{uri}/.overview.md", overview, ctx=self._current_ctx)
    await viking_fs.write_file(f"{uri}/.abstract.md", abstract, ctx=self._current_ctx)

    logger.debug(f"Generated overview and abstract for {uri}")

    # 6. Vectorize directory
    try:
        await self._vectorize_directory_simple(uri, context_type, abstract, overview)
    except Exception as e:
        logger.error(f"Failed to vectorize directory {uri}: {e}", exc_info=True)

Unique URI Resolution

Verify that the _resolve_unique_uri method is restored and TreeBuilder no longer skips conflict resolution.

    base_uri = parent_uri or auto_base_uri
    # 3. Determine candidate_uri
    if to_uri:
        # Exact target URI: must not exist yet
        try:
            await viking_fs.stat(to_uri, ctx=ctx)
            # If we get here, it already exists
            raise FileExistsError(f"Target URI already exists: {to_uri}")
        except FileExistsError:
            raise
        except Exception:
            # It doesn't exist, good to use
            pass
        candidate_uri = to_uri
    else:
        if parent_uri:
            # Parent URI must exist and be a directory
            try:
                stat_result = await viking_fs.stat(parent_uri, ctx=ctx)
            except Exception as e:
                raise FileNotFoundError(f"Parent URI does not exist: {parent_uri}") from e
            if not stat_result.get("isDir"):
                raise ValueError(f"Parent URI is not a directory: {parent_uri}")
        candidate_uri = VikingURI(base_uri).join(final_doc_name).uri

    final_uri = await self._resolve_unique_uri(candidate_uri, ctx=ctx)

    if final_uri != candidate_uri:
        logger.info(f"[TreeBuilder] Resolved name conflict: {candidate_uri} -> {final_uri}")
    else:
        logger.info(f"[TreeBuilder] Finalizing from temp: {final_uri}")

    # 4. Move directory tree from temp to final location in AGFS
    await self._move_temp_to_dest(viking_fs, temp_doc_uri, final_uri, ctx=ctx)
    logger.info(f"[TreeBuilder] Moved temp tree: {temp_doc_uri} -> {final_uri}")

    # 5. Cleanup temporary root directory
    try:
        await viking_fs.delete_temp(temp_uri, ctx=ctx)
        logger.info(f"[TreeBuilder] Cleaned up temp root: {temp_uri}")
    except Exception as e:
        logger.warning(f"[TreeBuilder] Failed to cleanup temp root: {e}")

    # 6. Enqueue to SemanticQueue for async semantic generation
    if trigger_semantic:
        try:
            await self._enqueue_semantic_generation(final_uri, "resource", ctx=ctx)
            logger.info(f"[TreeBuilder] Enqueued semantic generation for: {final_uri}")
        except Exception as e:
            logger.error(
                f"[TreeBuilder] Failed to enqueue semantic generation: {e}", exc_info=True
            )

    # 7. Return simple BuildingTree (no scanning needed)
    tree = BuildingTree(
        source_path=source_path,
        source_format=source_format,
    )
    tree._root_uri = final_uri

    # Create a minimal Context object for the root so that tree.root is not None
    root_context = Context(uri=final_uri)
    tree.add_context(root_context)

    return tree

async def _resolve_unique_uri(
    self, uri: str, max_attempts: int = 100, ctx: Optional[RequestContext] = None
) -> str:
    """Return a URI that does not collide with an existing resource.

    If *uri* is free, return it unchanged.  Otherwise append ``_1``,
    ``_2``, … until a free name is found (like macOS Finder / Windows
    Explorer).
    """
    viking_fs = get_viking_fs()

    async def _exists(u: str) -> bool:
        try:
            await viking_fs.stat(u, ctx=ctx)
            return True
        except Exception:
            return False

    if not await _exists(uri):
        return uri

    for i in range(1, max_attempts + 1):
        candidate = f"{uri}_{i}"
        if not await _exists(candidate):
            return candidate

    raise FileExistsError(f"Cannot resolve unique name for {uri} after {max_attempts} attempts")
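The Finder-style suffixing in `_resolve_unique_uri` can be exercised in isolation with a plain set standing in for the filesystem existence check (a sketch for verification purposes, not the repo's VikingFS-backed helper):

```python
def resolve_unique(name: str, taken: set, max_attempts: int = 100) -> str:
    """Append _1, _2, ... until `name` is free, mirroring the restored helper."""
    if name not in taken:
        return name  # already free, return unchanged
    for i in range(1, max_attempts + 1):
        candidate = f"{name}_{i}"
        if candidate not in taken:
            return candidate
    raise FileExistsError(
        f"Cannot resolve unique name for {name} after {max_attempts} attempts"
    )
```

Reviewers can use this to sanity-check the expected collision behavior: a free name passes through untouched, and each collision bumps the numeric suffix.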

PR Code Suggestions ✨

No code suggestions found for the PR.

qin-ctx marked this pull request as ready for review March 13, 2026 14:03
Persistent review updated to latest commit d6f2e24

Failed to generate code suggestions for PR

qin-ctx merged commit 20b5dab into main Mar 13, 2026 (5 of 7 checks passed)
qin-ctx deleted the revert-535-feature/incremental_update branch March 13, 2026 15:26
mvanhorn added a commit to mvanhorn/OpenViking that referenced this pull request Mar 14, 2026
…ction

commit_async() called extract_long_term_memories(messages=...) without
passing user, session_id, or ctx. Because the compressor returns early
when ctx is None, async commits always produced memories_extracted=0.

The sync commit() path already passes all three parameters correctly.
This aligns the async path to match.

Regression from the COW pattern revert (volcengine#584) which dropped these
arguments from the async call.

Fixes volcengine#602
qin-ctx pushed a commit that referenced this pull request Mar 15, 2026
…ction (#610)

commit_async() called extract_long_term_memories(messages=...) without
passing user, session_id, or ctx. Because the compressor returns early
when ctx is None, async commits always produced memories_extracted=0.

The sync commit() path already passes all three parameters correctly.
This aligns the async path to match.

Regression from the COW pattern revert (#584) which dropped these
arguments from the async call.

Fixes #602

Co-authored-by: Matt Van Horn <455140+mvanhorn@users.noreply.github.com>
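
The regression in the commit message above can be reproduced with a toy compressor whose guard mirrors the described early return: when `ctx` is `None`, extraction silently yields nothing. All names here (`DemoCompressor`, the argument values) are illustrative stand-ins, not OpenViking's real classes; only the parameter names `messages`, `user`, `session_id`, and `ctx` come from the commit message.

```python
import asyncio
from typing import Any, List, Optional


class DemoCompressor:
    """Toy stand-in for the session compressor's early-return guard."""

    async def extract_long_term_memories(
        self,
        messages: List[str],
        user: Optional[str] = None,
        session_id: Optional[str] = None,
        ctx: Optional[Any] = None,
    ) -> List[str]:
        if ctx is None:
            # Mirrors the guard described in the commit message:
            # no request context means no extraction is performed
            return []
        return [f"memory:{m}" for m in messages]


async def demo() -> tuple:
    c = DemoCompressor()
    # Pre-fix call shape: only `messages` is passed, so ctx defaults to None
    broken = await c.extract_long_term_memories(messages=["note"])
    # Post-fix call shape: user, session_id, and ctx are forwarded
    fixed = await c.extract_long_term_memories(
        messages=["note"], user="u1", session_id="s1", ctx=object()
    )
    return broken, fixed


broken, fixed = asyncio.run(demo())
# `broken` is empty (memories_extracted=0), `fixed` contains one memory
```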
CHW0n9 pushed a commit to CHW0n9/OpenViking that referenced this pull request Mar 15, 2026