Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions openviking/service/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,14 @@ async def close(self) -> None:
self._transaction_manager.stop()
self._transaction_manager = None

if self._agfs_manager:
self._agfs_manager.stop()
self._agfs_manager = None

if self._vikingdb_manager:
await self._vikingdb_manager.close()
self._vikingdb_manager = None

if self._agfs_manager:
self._agfs_manager.stop()
self._agfs_manager = None

self._viking_fs = None
self._resource_processor = None
self._skill_processor = None
Expand Down
11 changes: 10 additions & 1 deletion openviking/storage/collection_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from openviking.models.embedder.base import EmbedResult
from openviking.storage.queuefs.embedding_msg import EmbeddingMsg
from openviking.storage.queuefs.named_queue import DequeueHandlerBase
from openviking.storage.vikingdb_interface import VikingDBInterface
from openviking.storage.vikingdb_interface import CollectionNotFoundError, VikingDBInterface
from openviking.utils import get_logger
from openviking.utils.config.open_viking_config import OpenVikingConfig

Expand Down Expand Up @@ -165,6 +165,15 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
logger.debug(
f"Successfully wrote embedding to database: {record_id} abstract {inserted_data['abstract']} vector {inserted_data['vector'][:5]}"
)
except CollectionNotFoundError as db_err:
# During shutdown, queue workers may finish one dequeued item.
if getattr(self._vikingdb, "is_closing", False):
logger.debug(f"Skip embedding write during shutdown: {db_err}")
self.report_success()
return None
logger.error(f"Failed to write to vector database: {db_err}")
self.report_error(str(db_err), data)
return None
except Exception as db_err:
logger.error(f"Failed to write to vector database: {db_err}")
import traceback
Expand Down
5 changes: 5 additions & 0 deletions openviking/storage/queuefs/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def _queue_worker_loop(self, queue: NamedQueue, stop_event: threading.Event) ->

def stop(self) -> None:
"""Stop QueueManager and release resources."""
global _instance
if not self._started:
return

Expand All @@ -152,6 +153,10 @@ def stop(self) -> None:
self._agfs = None
self._queues.clear()
self._started = False

if _instance is self:
_instance = None

logger.info("[QueueManager] Stopped")

def is_running(self) -> bool:
Expand Down
9 changes: 8 additions & 1 deletion openviking/storage/vikingdb_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(
self._queue_manager = None
self._embedding_handler = None
self._semantic_processor = None
self._closing = False

# Initialize queue manager if AGFS URL is provided
self._init_queue_manager()
Expand Down Expand Up @@ -114,8 +115,9 @@ def _init_semantic_queue(self):

async def close(self) -> None:
"""Close storage connection and release resources, including queue manager."""
self._closing = True
try:
# First stop the queue manager to prevent new operations
# Close should stop queue processing immediately.
if self._queue_manager:
self._queue_manager.stop()
self._queue_manager = None
Expand All @@ -127,6 +129,11 @@ async def close(self) -> None:
except Exception as e:
logger.error(f"Error closing VikingDB manager: {e}")

@property
def is_closing(self) -> bool:
"""Whether the manager is in shutdown flow."""
return self._closing

# =========================================================================
# Queue Management Properties
# =========================================================================
Expand Down
Loading