diff --git a/openviking/service/core.py b/openviking/service/core.py index e559698b..d025d663 100644 --- a/openviking/service/core.py +++ b/openviking/service/core.py @@ -245,14 +245,14 @@ async def close(self) -> None: self._transaction_manager.stop() self._transaction_manager = None - if self._agfs_manager: - self._agfs_manager.stop() - self._agfs_manager = None - if self._vikingdb_manager: await self._vikingdb_manager.close() self._vikingdb_manager = None + if self._agfs_manager: + self._agfs_manager.stop() + self._agfs_manager = None + self._viking_fs = None self._resource_processor = None self._skill_processor = None diff --git a/openviking/storage/collection_schemas.py b/openviking/storage/collection_schemas.py index 5cdb442b..1226c851 100644 --- a/openviking/storage/collection_schemas.py +++ b/openviking/storage/collection_schemas.py @@ -13,7 +13,7 @@ from openviking.models.embedder.base import EmbedResult from openviking.storage.queuefs.embedding_msg import EmbeddingMsg from openviking.storage.queuefs.named_queue import DequeueHandlerBase -from openviking.storage.vikingdb_interface import VikingDBInterface +from openviking.storage.vikingdb_interface import CollectionNotFoundError, VikingDBInterface from openviking.utils import get_logger from openviking.utils.config.open_viking_config import OpenVikingConfig @@ -165,6 +165,15 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, logger.debug( f"Successfully wrote embedding to database: {record_id} abstract {inserted_data['abstract']} vector {inserted_data['vector'][:5]}" ) + except CollectionNotFoundError as db_err: + # During shutdown, queue workers may finish one dequeued item. + if getattr(self._vikingdb, "is_closing", False): + logger.debug(f"Skip embedding write during shutdown: {db_err}") + self.report_success() + return None + logger.error(f"Failed to write to vector database: {db_err}") + self.report_error(str(db_err), data) + return None except Exception as db_err: logger.error(f"Failed to write to vector database: {db_err}") import traceback diff --git a/openviking/storage/queuefs/queue_manager.py b/openviking/storage/queuefs/queue_manager.py index 7585a1b7..35974b86 100644 --- a/openviking/storage/queuefs/queue_manager.py +++ b/openviking/storage/queuefs/queue_manager.py @@ -138,6 +138,7 @@ def _queue_worker_loop(self, queue: NamedQueue, stop_event: threading.Event) -> def stop(self) -> None: """Stop QueueManager and release resources.""" + global _instance if not self._started: return @@ -152,6 +153,10 @@ def stop(self) -> None: self._agfs = None self._queues.clear() self._started = False + + if _instance is self: + _instance = None + logger.info("[QueueManager] Stopped") def is_running(self) -> bool: diff --git a/openviking/storage/vikingdb_manager.py b/openviking/storage/vikingdb_manager.py index 59f9b8f3..3c9e8fa1 100644 --- a/openviking/storage/vikingdb_manager.py +++ b/openviking/storage/vikingdb_manager.py @@ -58,6 +58,7 @@ def __init__( self._queue_manager = None self._embedding_handler = None self._semantic_processor = None + self._closing = False # Initialize queue manager if AGFS URL is provided self._init_queue_manager() @@ -114,8 +115,9 @@ def _init_semantic_queue(self): async def close(self) -> None: """Close storage connection and release resources, including queue manager.""" + self._closing = True try: - # First stop the queue manager to prevent new operations + # Close should stop queue processing immediately. if self._queue_manager: self._queue_manager.stop() self._queue_manager = None @@ -127,6 +129,11 @@ async def close(self) -> None: except Exception as e: logger.error(f"Error closing VikingDB manager: {e}") + @property + def is_closing(self) -> bool: + """Whether the manager is in shutdown flow.""" + return self._closing + # ========================================================================= # Queue Management Properties # =========================================================================