diff --git a/docs/en/concepts/05-storage.md b/docs/en/concepts/05-storage.md index 0c14986b..74ada850 100644 --- a/docs/en/concepts/05-storage.md +++ b/docs/en/concepts/05-storage.md @@ -137,6 +137,7 @@ index_meta = { | `local` | Local persistence | | `http` | HTTP remote service | | `volcengine` | Volcengine VikingDB | +| `oceanbase` | OceanBase vector database | ## Vector Synchronization diff --git a/docs/en/guides/01-configuration.md b/docs/en/guides/01-configuration.md index 52a55a91..46ddabec 100644 --- a/docs/en/guides/01-configuration.md +++ b/docs/en/guides/01-configuration.md @@ -546,7 +546,7 @@ Vector database storage configuration | Parameter | Type | Description | Default | |-----------|------|-------------|---------| -| `backend` | str | VectorDB backend type: 'local' (file-based), 'http' (remote service), 'volcengine' (cloud VikingDB), or 'vikingdb' (private deployment) | "local" | +| `backend` | str | VectorDB backend: 'local', 'http', 'volcengine', 'vikingdb', or 'oceanbase' (OceanBase) | "local" | | `name` | str | VectorDB collection name | "context" | | `url` | str | Remote service URL for 'http' type (e.g., 'http://localhost:5000') | null | | `project_name` | str | Project name (alias project) | "default" | @@ -555,6 +555,7 @@ Vector database storage configuration | `sparse_weight` | float | Sparse weight for hybrid vector search, only effective when using hybrid index | 0.0 | | `volcengine` | object | 'volcengine' type VikingDB configuration | - | | `vikingdb` | object | 'vikingdb' type private deployment configuration | - | +| `oceanbase` | object | OceanBase config when backend is 'oceanbase' | - | Default local mode ``` @@ -588,6 +589,29 @@ Supports cloud-deployed VikingDB on Volcengine ``` +
+OceanBase (oceanbase) +Use [OceanBase](https://www.oceanbase.com/) as the vector backend. See [OceanBase integration guide](./06-oceanbase-integration.md) for installation and configuration. + +```json +{ + "storage": { + "vectordb": { + "name": "context", + "backend": "oceanbase", + "distance_metric": "cosine", + "oceanbase": { + "uri": "127.0.0.1:2881", + "user": "root@test", + "password": "your-password", + "db_name": "test" + } + } + } +} +``` +
+ ## Config Files diff --git a/docs/en/guides/06-oceanbase-integration.md b/docs/en/guides/06-oceanbase-integration.md new file mode 100644 index 00000000..5c181575 --- /dev/null +++ b/docs/en/guides/06-oceanbase-integration.md @@ -0,0 +1,274 @@ +# OpenViking and OceanBase Integration + +This guide describes how to use [OceanBase](https://www.oceanbase.com/) as the vector store backend for OpenViking, with **official step-by-step tutorials** you can run from scratch. + +--- + +## Overview + +- Connection is via [pyobvector](https://github.com/oceanbase/pyobvector) and OceanBase’s vector tables and HNSW index. +- Set `storage.vectordb.backend` to `"oceanbase"` and fill in the `oceanbase` connection block in config. +- Suitable when you want context indexes in a relational/HTAP database or to reuse an existing OceanBase deployment. + +**Prerequisites**: OceanBase 4.3.3.0+ (with vector type and index support); install `pyobvector` or `openviking[oceanbase]`. + +--- + +## Installation and configuration + +### Install + +```bash +# Option 1: pyobvector only (if openviking is already installed) +pip install pyobvector + +# Option 2: openviking with oceanbase extra (recommended) +pip install openviking[oceanbase] +``` + +### Configuration example (ov.conf) + +In `~/.openviking/ov.conf`, set `storage.vectordb` and the `oceanbase` block: + +```json +{ + "storage": { + "vectordb": { + "name": "context", + "backend": "oceanbase", + "distance_metric": "l2", + "oceanbase": { + "uri": "127.0.0.1:2881", + "user": "root@test", + "password": "", + "db_name": "openviking" + } + } + } +} +``` + +> **Note**: When using Docker OceanBase (slim mode), set `distance_metric` to `"l2"`. For OceanBase versions that support cosine, you can use `"cosine"`. See [Configuration](./01-configuration.md#vectordb) for full options. + +OpenViking creates the collection and index on first write or server start; no manual table creation is required. 
+ +--- + +## Tutorial 1: Run OpenViking + OceanBase in 5 minutes + +This tutorial mirrors the [Quick Start](../getting-started/02-quickstart.md): from starting OceanBase to performing one semantic search, with runnable steps and expected output. + +### Step 1: Start OceanBase (Docker) + +If you don’t have OceanBase installed, start a single-node instance with Docker (first run may pull the image; boot can take a few minutes): + +```bash +# Start container (port 2881) +docker run -d -p 2881:2881 --name oceanbase-ce -e MODE=slim oceanbase/oceanbase-ce + +# Wait until boot completes (look for "boot success!" in logs) +docker logs oceanbase-ce 2>&1 | tail -5 + +# Create database (root@test tenant) +docker exec -it oceanbase-ce mysql -h127.0.0.1 -P2881 -uroot@test -e "CREATE DATABASE IF NOT EXISTS openviking;" +``` + +If you use an existing OceanBase instance, ensure the database in `oceanbase.db_name` exists and that the host can connect with the given user and password. + +### Step 2: Prepare configuration + +Ensure `~/.openviking/ov.conf` includes **embedding** (same as [Quick Start](../getting-started/02-quickstart.md)) and **storage.vectordb (OceanBase)**. Minimal example (replace embedding api_key, model, etc. with your values): + +```json +{ + "embedding": { + "dense": { + "api_base": "", + "api_key": "", + "provider": "", + "dimension": 1024, + "model": "" + } + }, + "storage": { + "vectordb": { + "name": "context", + "backend": "oceanbase", + "distance_metric": "l2", + "oceanbase": { + "uri": "127.0.0.1:2881", + "user": "root@test", + "password": "", + "db_name": "openviking" + } + } + } +} +``` + +Full examples per provider are in [Configuration guide - Examples](./01-configuration.md#configuration-examples). 
+ +### Step 3: Create the example script + +Create `example_oceanbase.py` (same flow as Quick Start, with vector store set to OceanBase): + +```python +import openviking as ov + +# Uses default config ~/.openviking/ov.conf (vectordb backend = oceanbase) +client = ov.OpenViking(path="./data") + +try: + client.initialize() + + # Add resource (URL, file, or directory) + add_result = client.add_resource( + path="https://raw.githubusercontent.com/volcengine/OpenViking/refs/heads/main/README.md" + ) + root_uri = add_result["root_uri"] + + # List resource structure + ls_result = client.ls(root_uri) + print(f"Directory structure:\n{ls_result}\n") + + # Wait for semantic processing (vectors written to OceanBase) + print("Waiting for semantic processing...") + client.wait_processed() + + # Get abstract and overview + abstract = client.abstract(root_uri) + overview = client.overview(root_uri) + print(f"Abstract:\n{abstract}\n\nOverview:\n{overview}\n") + + # Semantic search (vector search is performed against OceanBase) + results = client.find("what is openviking", target_uri=root_uri) + print("Search results:") + for r in results.resources: + print(f" {r.uri} (score: {r.score:.4f})") + + client.close() + +except Exception as e: + print(f"Error: {e}") +``` + +### Step 4: Run + +```bash +python example_oceanbase.py +``` + +### Step 5: Expected output + +``` +Directory structure: +... + +Waiting for semantic processing... +Abstract: +... + +Overview: +... + +Search results: + viking://resources/... (score: 0.xxxx) + ... +``` + +You have now run OpenViking with OceanBase as the vector store. Content is still stored in local AGFS (`path="./data"`); vectors and metadata are stored in OceanBase. + +--- + +## Tutorial 2: Enterprise knowledge base (batch import + scoped search) + +This tutorial shows: importing multiple resources, then searching by natural language within a URI prefix—suitable for internal docs, Wiki, or knowledge bases. 
+ +### Step 1: Prepare content and config + +- OceanBase is running and the database exists (same as Tutorial 1). +- `ov.conf` has `storage.vectordb.backend` set to `oceanbase` and valid embedding and oceanbase connection settings. + +### Step 2: Batch import and search + +Create `example_knowledge_base.py`: + +```python +import openviking as ov + +client = ov.OpenViking(path="./data") +client.initialize() + +# Batch add resources (local directory or URL) +client.add_resource("/path/to/your/wiki") # local directory +client.add_resource("https://example.com/doc.md") # or URL +client.wait_processed() + +# Semantic search within a URI prefix (useful for multi-tenant or per-project scope) +results = client.find( + "user login and authentication flow", + target_uri="viking://resources/", + limit=5 +) + +print(f"Total: {results.total} result(s)") +for ctx in results.resources: + print(f" {ctx.uri}") + print(f" score={ctx.score:.3f} abstract={ctx.abstract[:80]}...") + print() + +client.close() +``` + +Replace `/path/to/your/wiki` with your doc root or remove that line to use only the URL. Then run: + +```bash +python example_knowledge_base.py +``` + +Using `target_uri="viking://resources/"` limits search to the resource tree; different tenants can use different prefixes (e.g. `viking://resources/tenant-a/`) for logical isolation. + +--- + +## Docker quick reference + +| Step | Command | +|------|---------| +| Start OceanBase | `docker run -d -p 2881:2881 --name oceanbase-ce -e MODE=slim oceanbase/oceanbase-ce` | +| Wait for ready | `docker logs oceanbase-ce 2>&1 \| tail -1` until you see `boot success!` | +| Create DB | `docker exec -it oceanbase-ce mysql -h127.0.0.1 -P2881 -uroot@test -e "CREATE DATABASE IF NOT EXISTS openviking;"` | + +Set `oceanbase.uri: "127.0.0.1:2881"` and `oceanbase.db_name: "openviking"` in config. 
+ +--- + +## Distance metrics and versions + +| distance_metric | Description | +|----------------|-------------| +| `cosine` | Mapped to neg_ip where supported by your OceanBase version | +| `l2` / `ip` | Map directly to OceanBase L2 / IP distance | + +If you see "this type of vector index distance algorithm is not supported", set `distance_metric` to `"l2"` (recommended for Docker slim mode). + +--- + +## Running integration tests + +OceanBase tests in this repo start OceanBase via Docker by default; no local OceanBase installation is required: + +```bash +# Requires Docker; will pull and start oceanbase/oceanbase-ce +pytest tests/vectordb/test_oceanbase_live.py -v -s +# or +python -m unittest tests.vectordb.test_oceanbase_live -v +``` + +--- + +## See also + +- [Configuration](./01-configuration.md) — `storage.vectordb` and common options for all backends +- [Storage architecture](../concepts/05-storage.md) — role of the vector store in OpenViking +- [Quick Start](../getting-started/02-quickstart.md) — get started with OpenViking (default local vector store) +- [OceanBase integration (Chinese)](../../zh/guides/06-oceanbase-integration.md) — 中文版本文档 diff --git a/docs/zh/concepts/05-storage.md b/docs/zh/concepts/05-storage.md index 495bdc38..3fa4256d 100644 --- a/docs/zh/concepts/05-storage.md +++ b/docs/zh/concepts/05-storage.md @@ -135,6 +135,7 @@ index_meta = { | `local` | 本地持久化 | | `http` | HTTP 远程服务 | | `volcengine` | 火山引擎 VikingDB | +| `oceanbase` | OceanBase 向量库 | ## 向量同步 diff --git a/docs/zh/guides/01-configuration.md b/docs/zh/guides/01-configuration.md index 1c83eb85..190e52f0 100644 --- a/docs/zh/guides/01-configuration.md +++ b/docs/zh/guides/01-configuration.md @@ -520,7 +520,7 @@ AST 提取支持:Python、JavaScript/TypeScript、Rust、Go、Java、C/C++。 | 参数 | 类型 | 说明 | 默认值 | |------|------|------|--------| -| `backend` | str | VectorDB 后端类型: 'local'(基于文件), 'http'(远程服务), 'volcengine'(云上VikingDB)或 'vikingdb'(私有部署) | "local" | +| `backend` | str | VectorDB 后端类型: 
'local'(基于文件), 'http'(远程服务), 'volcengine'(云上VikingDB), 'vikingdb'(私有部署), 'oceanbase'(OceanBase) | "local" | | `name` | str | VectorDB 的集合名称 | "context" | | `url` | str | 'http' 类型的远程服务 URL(例如 'http://localhost:5000') | null | | `project_name` | str | 项目名称(别名 project) | "default" | @@ -529,6 +529,7 @@ AST 提取支持:Python、JavaScript/TypeScript、Rust、Go、Java、C/C++。 | `sparse_weight` | float | 混合向量搜索的稀疏权重,仅在使用混合索引时生效 | 0.0 | | `volcengine` | object | 'volcengine' 类型的 VikingDB 配置 | - | | `vikingdb` | object | 'vikingdb' 类型的私有部署配置 | - | +| `oceanbase` | object | 'oceanbase' 类型的 OceanBase 配置 | - | 默认使用本地模式 ``` @@ -562,7 +563,28 @@ AST 提取支持:Python、JavaScript/TypeScript、Rust、Go、Java、C/C++。 ``` +
+OceanBase (oceanbase) +使用 [OceanBase](https://www.oceanbase.com/) 作为向量库后端。安装与配置详见 [OceanBase 集成指南](./06-oceanbase-integration.md)。 +```json +{ + "storage": { + "vectordb": { + "name": "context", + "backend": "oceanbase", + "distance_metric": "cosine", + "oceanbase": { + "uri": "127.0.0.1:2881", + "user": "root@test", + "password": "your-password", + "db_name": "test" + } + } + } +} +``` +
## 配置文件 diff --git a/docs/zh/guides/06-oceanbase-integration.md b/docs/zh/guides/06-oceanbase-integration.md new file mode 100644 index 00000000..4e47fc66 --- /dev/null +++ b/docs/zh/guides/06-oceanbase-integration.md @@ -0,0 +1,274 @@ +# OpenViking 与 OceanBase 集成指南 + +本文介绍如何将 [OceanBase](https://www.oceanbase.com/) 作为 OpenViking 的向量库后端,并**通过官方实战示例**从零跑通语义检索。 + +--- + +## 一、集成概述 + +- 通过 [pyobvector](https://github.com/oceanbase/pyobvector) 连接 OceanBase,使用其向量表与 HNSW 索引。 +- 在配置中设置 `storage.vectordb.backend = "oceanbase"` 并填写 `oceanbase` 连接块即可切换后端。 +- 适用于希望将上下文索引落在关系型/HTAP 数据库或复用现有 OceanBase 的场景。 + +**前置条件**:OceanBase 4.3.3.0+(支持向量类型与向量索引);需安装 `pyobvector` 或 `openviking[oceanbase]`。 + +--- + +## 二、安装与配置 + +### 2.1 安装 + +```bash +# 方式一:仅安装 pyobvector(若已安装 openviking) +pip install pyobvector + +# 方式二:安装 openviking 时一并安装 OceanBase 依赖(推荐) +pip install openviking[oceanbase] +``` + +### 2.2 配置示例(ov.conf) + +在 `~/.openviking/ov.conf` 中增加或修改 `storage.vectordb`,并填写 `oceanbase` 连接信息: + +```json +{ + "storage": { + "vectordb": { + "name": "context", + "backend": "oceanbase", + "distance_metric": "l2", + "oceanbase": { + "uri": "127.0.0.1:2881", + "user": "root@test", + "password": "", + "db_name": "openviking" + } + } + } +} +``` + +> **说明**:若使用 Docker 启动的 OceanBase(slim 模式),建议将 `distance_metric` 设为 `"l2"`;若使用支持 cosine 的 OceanBase 版本,可设为 `"cosine"`。完整配置项见 [配置说明](./01-configuration.md#vectordb)。 + +首次写入或启动服务时,OpenViking 会按 Context Schema 自动建表与索引,无需手动建表。 + +--- + +## 三、实战示例一:5 分钟跑通 OpenViking + OceanBase + +本示例与 [快速开始](../getting-started/02-quickstart.md) 风格一致:从启动 OceanBase 到完成一次语义检索,全程可复现。 + +### 步骤 1:启动 OceanBase(Docker) + +若本机尚未安装 OceanBase,可使用 Docker 快速启动单机实例(首次会拉取镜像,启动约需数分钟): + +```bash +# 启动容器(端口 2881) +docker run -d -p 2881:2881 --name oceanbase-ce -e MODE=slim oceanbase/oceanbase-ce + +# 等待启动完成(日志出现 "boot success!") +docker logs oceanbase-ce 2>&1 | tail -5 + +# 创建数据库(root@test 租户) +docker exec -it oceanbase-ce mysql -h127.0.0.1 -P2881 -uroot@test -e "CREATE DATABASE IF NOT 
EXISTS openviking;" +``` + +若使用已有 OceanBase,请确保已创建 `ov.conf` 中 `oceanbase.db_name` 对应的数据库,并保证网络与账号权限可用。 + +### 步骤 2:准备配置文件 + +确保 `~/.openviking/ov.conf` 中已配置 **embedding**(与 [快速开始](../getting-started/02-quickstart.md) 相同)和 **storage.vectordb(OceanBase)**。以下为最小示例(请将 embedding 的 api_key、model 等替换为实际值): + +```json +{ + "embedding": { + "dense": { + "api_base": "", + "api_key": "", + "provider": "", + "dimension": 1024, + "model": "" + } + }, + "storage": { + "vectordb": { + "name": "context", + "backend": "oceanbase", + "distance_metric": "l2", + "oceanbase": { + "uri": "127.0.0.1:2881", + "user": "root@test", + "password": "", + "db_name": "openviking" + } + } + } +} +``` + +各模型服务的完整配置见 [配置指南 - 配置示例](./01-configuration.md#配置示例)。 + +### 步骤 3:创建示例脚本 + +创建 `example_oceanbase.py`,内容如下(与快速开始示例一致,仅向量库改为 OceanBase): + +```python +import openviking as ov + +# 使用默认配置 ~/.openviking/ov.conf(其中 vectordb 已设为 oceanbase) +client = ov.OpenViking(path="./data") + +try: + client.initialize() + + # 添加资源(支持 URL、本地文件或目录) + add_result = client.add_resource( + path="https://raw.githubusercontent.com/volcengine/OpenViking/refs/heads/main/README.md" + ) + root_uri = add_result["root_uri"] + + # 查看资源结构 + ls_result = client.ls(root_uri) + print(f"目录结构:\n{ls_result}\n") + + # 等待语义处理完成(向量将写入 OceanBase) + print("等待语义处理...") + client.wait_processed() + + # 获取摘要与概览 + abstract = client.abstract(root_uri) + overview = client.overview(root_uri) + print(f"摘要:\n{abstract}\n\n概览:\n{overview}\n") + + # 语义检索(底层从 OceanBase 做向量搜索) + results = client.find("what is openviking", target_uri=root_uri) + print("检索结果:") + for r in results.resources: + print(f" {r.uri} (score: {r.score:.4f})") + + client.close() + +except Exception as e: + print(f"错误: {e}") +``` + +### 步骤 4:运行 + +```bash +python example_oceanbase.py +``` + +### 步骤 5:预期输出 + +``` +目录结构: +... + +等待语义处理... +摘要: +... + +概览: +... + +检索结果: + viking://resources/... (score: 0.xxxx) + ... 
+``` + +至此,你已用 OceanBase 作为向量库完成 OpenViking 的首次语义检索。内容仍存储在本地 AGFS(`path="./data"`),向量与元数据存储在 OceanBase 中。 + +--- + +## 四、实战示例二:企业知识库(批量导入 + 按范围检索) + +本示例演示:将多个资源导入后,通过自然语言在指定 URI 范围内检索,适合企业知识库、Wiki 等场景。 + +### 步骤 1:准备内容与配置 + +- 确保 OceanBase 已启动并已创建数据库(同实战示例一)。 +- 确保 `ov.conf` 中 `storage.vectordb.backend` 为 `oceanbase`,且 embedding、oceanbase 连接已配置。 + +### 步骤 2:批量导入并检索 + +创建 `example_knowledge_base.py`: + +```python +import openviking as ov + +client = ov.OpenViking(path="./data") +client.initialize() + +# 批量添加资源(本地目录或 URL) +client.add_resource("/path/to/your/wiki") # 本地目录 +client.add_resource("https://example.com/doc.md") # 或 URL +client.wait_processed() + +# 在指定 URI 前缀下做语义检索(多租户/多项目时可限定范围) +results = client.find( + "用户登录与鉴权流程", + target_uri="viking://resources/", + limit=5 +) + +print(f"共 {results.total} 条相关结果") +for ctx in results.resources: + print(f" {ctx.uri}") + print(f" score={ctx.score:.3f} abstract={ctx.abstract[:80]}...") + print() + +client.close() +``` + +将 `/path/to/your/wiki` 替换为实际文档目录或删除该行仅用 URL。运行: + +```bash +python example_knowledge_base.py +``` + +通过 `target_uri="viking://resources/"` 可限定只在资源树下检索;不同业务可约定不同 URI 前缀(如 `viking://resources/tenant-a/`)实现逻辑隔离。 + +--- + +## 五、Docker 快速启动参考 + +| 步骤 | 命令 | +|------|------| +| 启动 OceanBase | `docker run -d -p 2881:2881 --name oceanbase-ce -e MODE=slim oceanbase/oceanbase-ce` | +| 等待就绪 | `docker logs oceanbase-ce 2>&1 \| tail -1` 出现 `boot success!` | +| 建库 | `docker exec -it oceanbase-ce mysql -h127.0.0.1 -P2881 -uroot@test -e "CREATE DATABASE IF NOT EXISTS openviking;"` | + +配置中填写 `oceanbase.uri: "127.0.0.1:2881"`、`oceanbase.db_name: "openviking"` 即可。 + +--- + +## 六、距离度量与版本 + +| distance_metric | 说明 | +|-----------------|------| +| `cosine` | 在支持的 OceanBase 版本中会映射为 neg_ip | +| `l2` / `ip` | 直接对应 OceanBase 的 L2 / IP 距离 | + +若报错「this type of vector index distance algorithm is not supported」,请将 `distance_metric` 改为 `"l2"` 后重试(Docker slim 模式建议使用 `l2`)。 + +--- + +## 七、运行集成测试 + +本仓库内 OceanBase 相关测试**默认通过 Docker 启动 
OceanBase**,无需本机预先安装: + +```bash +# 需已安装 Docker;会自动拉取并启动 oceanbase/oceanbase-ce +pytest tests/vectordb/test_oceanbase_live.py -v -s +# 或 +python -m unittest tests.vectordb.test_oceanbase_live -v +``` + +--- + +## 八、相关文档 + +- [配置说明](./01-configuration.md) — `storage.vectordb` 与各后端通用参数 +- [存储架构](../concepts/05-storage.md) — 向量库在 OpenViking 中的角色 +- [快速开始](../getting-started/02-quickstart.md) — 5 分钟上手 OpenViking(默认本地向量库) +- [OceanBase 集成(英文)](../../en/guides/06-oceanbase-integration.md) — 英文版本文档 diff --git a/openviking/storage/vectordb_adapters/__init__.py b/openviking/storage/vectordb_adapters/__init__.py index 8446b64a..04de30e2 100644 --- a/openviking/storage/vectordb_adapters/__init__.py +++ b/openviking/storage/vectordb_adapters/__init__.py @@ -6,6 +6,7 @@ from .factory import create_collection_adapter from .http_adapter import HttpCollectionAdapter from .local_adapter import LocalCollectionAdapter +from .oceanbase_adapter import OceanBaseCollectionAdapter from .vikingdb_private_adapter import VikingDBPrivateCollectionAdapter from .volcengine_adapter import VolcengineCollectionAdapter @@ -15,5 +16,6 @@ "HttpCollectionAdapter", "VolcengineCollectionAdapter", "VikingDBPrivateCollectionAdapter", + "OceanBaseCollectionAdapter", "create_collection_adapter", ] diff --git a/openviking/storage/vectordb_adapters/factory.py b/openviking/storage/vectordb_adapters/factory.py index 6c11954c..fbff1363 100644 --- a/openviking/storage/vectordb_adapters/factory.py +++ b/openviking/storage/vectordb_adapters/factory.py @@ -7,6 +7,7 @@ from .base import CollectionAdapter from .http_adapter import HttpCollectionAdapter from .local_adapter import LocalCollectionAdapter +from .oceanbase_adapter import OceanBaseCollectionAdapter from .vikingdb_private_adapter import VikingDBPrivateCollectionAdapter from .volcengine_adapter import VolcengineCollectionAdapter @@ -15,6 +16,7 @@ "http": HttpCollectionAdapter, "volcengine": VolcengineCollectionAdapter, "vikingdb": 
VikingDBPrivateCollectionAdapter, + "oceanbase": OceanBaseCollectionAdapter, } diff --git a/openviking/storage/vectordb_adapters/oceanbase_adapter.py b/openviking/storage/vectordb_adapters/oceanbase_adapter.py new file mode 100644 index 00000000..160b358e --- /dev/null +++ b/openviking/storage/vectordb_adapters/oceanbase_adapter.py @@ -0,0 +1,556 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""OceanBase vector database adapter (via pyobvector).""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +from openviking.storage.vectordb_adapters.base import CollectionAdapter +from openviking.storage.vectordb.collection.collection import Collection, ICollection +from openviking.storage.vectordb.collection.result import ( + AggregateResult, + DataItem, + FetchDataInCollectionResult, + SearchItemResult, + SearchResult, +) +from openviking_cli.utils import get_logger + +logger = get_logger(__name__) + +# Optional import: pyobvector is required only when backend is "oceanbase" +try: + from pyobvector.client.milvus_like_client import MilvusLikeClient + from pyobvector.client.collection_schema import CollectionSchema, FieldSchema + from pyobvector.client.schema_type import DataType + from pyobvector.client.index_param import IndexParam, IndexParams, VecIndexType + + _PYOBVECTOR_AVAILABLE = True +except ImportError: + _PYOBVECTOR_AVAILABLE = False + MilvusLikeClient = None # type: ignore + CollectionSchema = None # type: ignore + FieldSchema = None # type: ignore + DataType = None # type: ignore + IndexParam = None # type: ignore + IndexParams = None # type: ignore + VecIndexType = None # type: ignore + + +def _openviking_field_to_field_schema(field: Dict[str, Any], vector_dim: int) -> FieldSchema: + """Convert OpenViking Fields entry to pyobvector FieldSchema.""" + name = field.get("FieldName", "") + typ = (field.get("FieldType") or "").lower() + is_primary = 
bool(field.get("IsPrimaryKey", False)) + + if typ == "string" or typ == "path": + return FieldSchema(name, DataType.VARCHAR, is_primary=is_primary, max_length=4096) + if typ == "int64": + return FieldSchema(name, DataType.INT64, is_primary=is_primary) + if typ == "vector": + dim = field.get("Dim") or vector_dim + return FieldSchema(name, DataType.FLOAT_VECTOR, is_primary=False, dim=dim) + if typ == "sparse_vector": + return FieldSchema(name, DataType.SPARSE_FLOAT_VECTOR, is_primary=False) + if typ == "date_time": + return FieldSchema(name, DataType.INT64, is_primary=False) + # default + return FieldSchema(name, DataType.VARCHAR, is_primary=is_primary, max_length=4096) + + +def _build_oceanbase_schema(meta: Dict[str, Any], vector_dim: int) -> CollectionSchema: + """Build pyobvector CollectionSchema from OpenViking collection meta.""" + if not _PYOBVECTOR_AVAILABLE or CollectionSchema is None or FieldSchema is None: + raise RuntimeError("pyobvector is required for OceanBase backend. Install with: pip install pyobvector") + + fields_meta = meta.get("Fields", []) + vector_dim = meta.get("Dimension") or vector_dim + fields = [] + for f in fields_meta: + if f.get("FieldName") == "AUTO_ID": + continue + fs = _openviking_field_to_field_schema(f, vector_dim) + fields.append(fs) + return CollectionSchema(fields=fields) + + +def _distance_to_metric(distance: str) -> str: + """Map OpenViking distance_metric to pyobvector metric_type.""" + d = (distance or "cosine").lower() + if d in ("l2", "ip", "cosine", "neg_ip"): + return "neg_ip" if d == "cosine" else d # OceanBase cosine often as neg_ip + return "l2" + + +class OceanBaseCollection(ICollection): + """ICollection implementation backed by OceanBase via pyobvector MilvusLikeClient.""" + + def __init__( + self, + client: Any, + collection_name: str, + meta_data: Dict[str, Any], + distance_metric: str = "cosine", + ): + if not _PYOBVECTOR_AVAILABLE: + raise RuntimeError("pyobvector is required for OceanBase backend. 
Install with: pip install pyobvector") + self._client = client + self._collection_name = collection_name + self._meta_data = dict(meta_data) + self._distance_metric = _distance_to_metric(distance_metric) + self._vector_dim = self._meta_data.get("Dimension", 0) + + def _table(self): + return self._client.load_table(self._collection_name) + + def _filter_to_where(self, filters: Optional[Dict[str, Any]]): + """Convert OpenViking filter dict to SQLAlchemy where clause list for pyobvector.""" + if not filters: + return None + from sqlalchemy import and_, or_ + + table = self._table() + + def walk(expr): + if not isinstance(expr, dict): + return None + op = expr.get("op") + if op == "must": + field = expr.get("field") + conds = expr.get("conds", []) + if field and conds is not None and field in table.c: + return table.c[field].in_(conds) + elif op == "range": + field = expr.get("field") + if field not in table.c: + return None + col = table.c[field] + parts = [] + if "gte" in expr: + parts.append(col >= expr["gte"]) + if "gt" in expr: + parts.append(col > expr["gt"]) + if "lte" in expr: + parts.append(col <= expr["lte"]) + if "lt" in expr: + parts.append(col < expr["lt"]) + return and_(*parts) if parts else None + elif op == "and": + sub = [walk(c) for c in expr.get("conds", [])] + sub = [s for s in sub if s is not None] + return and_(*sub) if sub else None + elif op == "or": + sub = [walk(c) for c in expr.get("conds", [])] + sub = [s for s in sub if s is not None] + return or_(*sub) if sub else None + return None + + clause = walk(filters) + return [clause] if clause is not None else None + + def update(self, fields: Optional[Dict[str, Any]] = None, description: Optional[str] = None): + if fields: + self._meta_data.update(fields) + + def get_meta_data(self) -> Dict[str, Any]: + return dict(self._meta_data) + + def close(self): + try: + if hasattr(self._client, "engine") and self._client.engine: + self._client.engine.dispose() + except Exception as e: + 
logger.warning("OceanBaseCollection close: %s", e) + + def drop(self): + self._client.drop_collection(self._collection_name) + + # "default" is reserved in OceanBase; use a safe index name when creating index + _OB_INDEX_NAME_FOR_DEFAULT = "ov_vector_idx" + + def create_index(self, index_name: str, meta_data: Dict[str, Any]) -> Any: + vec_meta = (meta_data.get("VectorIndex") or {}) + distance = vec_meta.get("Distance", "l2") + metric = _distance_to_metric(distance) + field_name = "vector" + ob_index_name = self._OB_INDEX_NAME_FOR_DEFAULT if index_name == "default" else index_name + index_params = IndexParams() + index_params.add_index( + field_name=field_name, + index_type=VecIndexType.HNSW, + index_name=ob_index_name, + metric_type=metric, + ) + self._client.create_index(self._collection_name, index_params) + return None + + def has_index(self, index_name: str) -> bool: + try: + table = self._table() + from sqlalchemy import inspect + insp = inspect(self._client.engine) + idxs = insp.get_indexes(self._collection_name) + ob_name = self._OB_INDEX_NAME_FOR_DEFAULT if index_name == "default" else index_name + return any(idx.get("name") == ob_name for idx in idxs) + except Exception: + return False + + def get_index(self, index_name: str) -> Optional[Any]: + return None if not self.has_index(index_name) else object() + + def search_by_vector( + self, + index_name: str, + dense_vector: Optional[List[float]] = None, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + sparse_vector: Optional[Dict[str, float]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + flter = self._filter_to_where(filters) + data = dense_vector if dense_vector is not None else sparse_vector + if data is None: + return SearchResult(data=[]) + fetch_limit = limit + offset if offset else limit + search_params = {"metric_type": self._distance_metric} + try: + rows = self._client.search( + self._collection_name, + data=data, + 
anns_field="vector", + with_dist=True, + flter=flter, + limit=fetch_limit, + output_fields=output_fields, + search_params=search_params, + ) + except Exception as e: + logger.warning("OceanBase search failed: %s", e) + return SearchResult(data=[]) + items = [] + for i, row in enumerate(rows): + if offset and i < offset: + continue + if len(items) >= limit: + break + score = ( + row.get("score") + or row.get("l2_distance") + or row.get("inner_product") + or row.get("cosine_distance") + ) + if score is None and isinstance(row, dict) and len(row) > 0: + # pyobvector may put distance as last column + last_val = list(row.values())[-1] + if isinstance(last_val, (int, float)): + score = last_val + score = float(score) if score is not None else 0.0 + pk = self._meta_data.get("PrimaryKey", "id") + row_id = row.get(pk) or row.get("id") + fields = { + k: v + for k, v in row.items() + if k not in (pk, "id", "score", "l2_distance", "inner_product", "cosine_distance") + and not (isinstance(k, str) and "distance" in k.lower()) + } + items.append(SearchItemResult(id=row_id, fields=fields, score=score)) + return SearchResult(data=items) + + def search_by_keywords( + self, + index_name: str, + keywords: Optional[List[str]] = None, + query: Optional[str] = None, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + # Not supported without vectorizer; delegate to random with filter + return self.search_by_random(index_name, limit, offset, filters, output_fields) + + def search_by_id( + self, + index_name: str, + id: Any, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + ids = [id] if not isinstance(id, (list, tuple)) else list(id) + rows = self._client.get(self._collection_name, ids=ids, output_fields=output_fields) + if not rows: + return SearchResult(data=[]) + row = rows[0] + vec = 
row.get("vector") + if vec is not None: + return self.search_by_vector( + index_name, dense_vector=vec, limit=limit, offset=offset, + filters=filters, output_fields=output_fields, + ) + return SearchResult(data=[]) + + def search_by_multimodal( + self, + index_name: str, + text: Optional[str] = None, + image: Optional[Any] = None, + video: Optional[Any] = None, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + return self.search_by_random(index_name, limit, offset, filters, output_fields) + + def search_by_random( + self, + index_name: str, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + flter = self._filter_to_where(filters) + try: + rows = self._client.query( + self._collection_name, + flter=flter, + output_fields=output_fields, + ) + except Exception as e: + logger.warning("OceanBase query failed: %s", e) + return SearchResult(data=[]) + pk = self._meta_data.get("PrimaryKey", "id") + items = [] + for i, row in enumerate(rows): + if i < offset: + continue + if len(items) >= limit: + break + row_id = row.get(pk) or row.get("id") + fields = {k: v for k, v in row.items() if k != pk} + items.append(SearchItemResult(id=row_id, fields=fields, score=None)) + return SearchResult(data=items) + + def search_by_scalar( + self, + index_name: str, + field: str, + order: Optional[str] = "desc", + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + from sqlalchemy import select + + table = self._table() + flter = self._filter_to_where(filters) + if field not in table.c: + return SearchResult(data=[]) + order_col = table.c[field] + # LIMIT is the page size itself; OFFSET already skips the leading rows + stmt = select(table).order_by( + order_col.desc() if (order or "desc").lower() == "desc" else order_col.asc() + ).limit(limit
).offset(offset) + if flter: + stmt = stmt.where(*flter) + try: + with self._client.engine.connect() as conn: + res = conn.execute(stmt) + rows = [dict(zip(res.keys(), row)) for row in res.fetchall()] + except Exception as e: + logger.warning("OceanBase search_by_scalar failed: %s", e) + return SearchResult(data=[]) + pk = self._meta_data.get("PrimaryKey", "id") + items = [] + for row in rows: + row_id = row.get(pk) or row.get("id") + fields = {k: v for k, v in row.items() if k != pk} + score = row.get(field) + items.append(SearchItemResult(id=row_id, fields=fields, score=score)) + return SearchResult(data=items) + + def update_index( + self, + index_name: str, + scalar_index: Optional[Dict[str, Any]] = None, + description: Optional[str] = None, + ): + pass + + def get_index_meta_data(self, index_name: str) -> Dict[str, Any]: + return {"IndexName": index_name} + + def list_indexes(self) -> List[str]: + try: + from sqlalchemy import inspect + insp = inspect(self._client.engine) + idxs = insp.get_indexes(self._collection_name) + names = [idx.get("name") for idx in idxs if idx.get("name")] + # Expose "default" to callers to match OpenViking convention + if self._OB_INDEX_NAME_FOR_DEFAULT in names and "default" not in names: + names = ["default"] + [n for n in names if n != self._OB_INDEX_NAME_FOR_DEFAULT] + return names or ["default"] + except Exception: + return ["default"] + + def drop_index(self, index_name: str): + ob_name = self._OB_INDEX_NAME_FOR_DEFAULT if index_name == "default" else index_name + self._client.drop_index(self._collection_name, ob_name) + + def _default_for_field_type(self, field: Dict[str, Any]) -> Any: + """Fill default values for required schema fields missing in row (OceanBase has no column default).""" + typ = (field.get("FieldType") or "").lower() + if typ == "sparse_vector": + return {} + if typ in ("int64", "date_time"): + return 0 + if typ in ("string", "path"): + return "" + return None + + def upsert_data(self, data_list: 
List[Dict[str, Any]], ttl: int = 0): + if not data_list: + return + fields_meta = self._meta_data.get("Fields", []) + for row in data_list: + if "id" in row and row["id"] is not None: + row["id"] = str(row["id"]) + # Fill schema fields missing in row to avoid "Field doesn't have a default value" + for f in fields_meta: + name = f.get("FieldName") + if name and name not in row: + default = self._default_for_field_type(f) + if default is not None: + row[name] = default + self._client.upsert(self._collection_name, data_list) + + def fetch_data(self, primary_keys: List[Any]) -> FetchDataInCollectionResult: + ids = [str(k) for k in primary_keys] + try: + rows = self._client.get(self._collection_name, ids=ids) + except Exception as e: + logger.warning("OceanBase get failed: %s", e) + return FetchDataInCollectionResult(items=[], ids_not_exist=ids) + pk = self._meta_data.get("PrimaryKey", "id") + items = [] + for row in rows: + row_id = row.get(pk) or row.get("id") + fields = {k: v for k, v in row.items() if k != pk} + items.append(DataItem(id=row_id, fields=fields)) + found = {item.id for item in items} + ids_not_exist = [k for k in ids if k not in found] + return FetchDataInCollectionResult(items=items, ids_not_exist=ids_not_exist) + + def delete_data(self, primary_keys: List[Any]): + if not primary_keys: + return + ids = [str(k) for k in primary_keys] + self._client.delete(self._collection_name, ids=ids) + + def delete_all_data(self): + self._client.delete(self._collection_name, ids=None, flter=None) + + def aggregate_data( + self, + index_name: str, + op: str = "count", + field: Optional[str] = None, + filters: Optional[Dict[str, Any]] = None, + cond: Optional[Dict[str, Any]] = None, + ) -> AggregateResult: + from sqlalchemy import select, func + + table = self._table() + flter = self._filter_to_where(filters) + stmt = select(func.count()).select_from(table) + if flter: + stmt = stmt.where(*flter) + try: + with self._client.engine.connect() as conn: + res = 
conn.execute(stmt) + total = res.scalar() or 0 + except Exception as e: + logger.warning("OceanBase aggregate failed: %s", e) + return AggregateResult(agg={}, op=op, field=field) + return AggregateResult(agg={"_total": total}, op=op, field=field) + + +class OceanBaseCollectionAdapter(CollectionAdapter): + """Adapter for OceanBase vector database (pyobvector).""" + + def __init__( + self, + collection_name: str, + client: Any, + distance_metric: str = "cosine", + ): + super().__init__(collection_name=collection_name) + self.mode = "oceanbase" + self._client = client + self._distance_metric = distance_metric + + @classmethod + def from_config(cls, config: Any) -> "OceanBaseCollectionAdapter": + if not _PYOBVECTOR_AVAILABLE: + raise RuntimeError( + "OceanBase backend requires pyobvector. Install with: pip install pyobvector" + ) + ob = config.oceanbase + if not ob: + raise ValueError("VectorDB oceanbase backend requires 'oceanbase' config") + client = MilvusLikeClient( + uri=ob.uri, + user=ob.user, + password=ob.password, + db_name=ob.db_name, + ) + return cls( + collection_name=config.name or "context", + client=client, + distance_metric=getattr(config, "distance_metric", "cosine") or "cosine", + ) + + def _load_existing_collection_if_needed(self) -> None: + if self._collection is not None: + return + try: + if self._client.has_collection(self._collection_name): + meta = self._build_meta_from_existing() + self._collection = Collection( + OceanBaseCollection( + client=self._client, + collection_name=self._collection_name, + meta_data=meta, + distance_metric=self._distance_metric, + ) + ) + except Exception as e: + logger.debug("OceanBase load collection %s: %s", self._collection_name, e) + + def _build_meta_from_existing(self) -> Dict[str, Any]: + """Build minimal meta from existing table (for read-only bind).""" + from openviking.storage.collection_schemas import CollectionSchemas + + dim = getattr(self._client, "_vector_dim", None) or 1024 + return 
CollectionSchemas.context_collection(self._collection_name, dim) + + def _create_backend_collection(self, meta: Dict[str, Any]) -> Collection: + vector_dim = meta.get("Dimension", 0) + schema = _build_oceanbase_schema(meta, vector_dim) + self._client.create_collection( + self._collection_name, + schema=schema, + ) + self._client._vector_dim = vector_dim + icoll = OceanBaseCollection( + client=self._client, + collection_name=self._collection_name, + meta_data=meta, + distance_metric=self._distance_metric, + ) + return Collection(icoll) diff --git a/openviking_cli/utils/config/vectordb_config.py b/openviking_cli/utils/config/vectordb_config.py index 6b77747a..aa9a0256 100644 --- a/openviking_cli/utils/config/vectordb_config.py +++ b/openviking_cli/utils/config/vectordb_config.py @@ -41,6 +41,23 @@ class VikingDBConfig(BaseModel): model_config = {"extra": "forbid"} +class OceanBaseConfig(BaseModel): + """Configuration for OceanBase vector database (via pyobvector).""" + + uri: str = Field( + default="127.0.0.1:2881", + description="OceanBase connection URI (host:port)", + ) + user: str = Field( + default="root@test", + description="Database user (tenant@user for OceanBase)", + ) + password: str = Field(default="", description="Database password") + db_name: str = Field(default="test", description="Database name") + + model_config = {"extra": "forbid"} + + class VectorDBBackendConfig(BaseModel): """ Configuration for VectorDB backend. 
@@ -99,6 +116,12 @@ class VectorDBBackendConfig(BaseModel): description="VikingDB private deployment configuration for 'vikingdb' type", ) + # OceanBase vector database (pyobvector) + oceanbase: Optional[OceanBaseConfig] = Field( + default_factory=lambda: OceanBaseConfig(), + description="OceanBase configuration for 'oceanbase' type", + ) + custom_params: Dict[str, Any] = Field( default_factory=dict, description="Custom parameters for custom backend adapters", @@ -109,7 +132,7 @@ class VectorDBBackendConfig(BaseModel): @model_validator(mode="after") def validate_config(self): """Validate configuration completeness and consistency""" - standard_backends = ["local", "http", "volcengine", "vikingdb"] + standard_backends = ["local", "http", "volcengine", "vikingdb", "oceanbase"] # Allow custom backend classes (containing dot) without standard validation if "." in self.backend: @@ -145,4 +168,10 @@ def validate_config(self): if not self.vikingdb or not self.vikingdb.host: raise ValueError("VectorDB vikingdb backend requires 'host' to be set") + elif self.backend == "oceanbase": + if not self.oceanbase: + raise ValueError("VectorDB oceanbase backend requires 'oceanbase' config") + if not self.oceanbase.uri: + raise ValueError("VectorDB oceanbase backend requires 'oceanbase.uri' to be set") + return self diff --git a/pyproject.toml b/pyproject.toml index c26aa7cf..2a2918f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,6 +99,9 @@ eval = [ "datasets>=2.0.0", "pandas>=2.0.0", ] +oceanbase = [ + "pyobvector>=0.2.20", +] build = [ "setuptools>=61.0", "setuptools-scm>=8.0", diff --git a/tests/vectordb/test_oceanbase_live.py b/tests/vectordb/test_oceanbase_live.py new file mode 100644 index 00000000..f1f3f54f --- /dev/null +++ b/tests/vectordb/test_oceanbase_live.py @@ -0,0 +1,542 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +""" +OceanBase adapter live tests. 
OceanBase is started via Docker for the test run. + + - TestOceanBaseLive: basic create/upsert/query/delete flow. + - TestOceanBaseLiveRealWorld: real-world scenarios (multi-tenant context, filter by + context_type/account_id/uri, vector search + filter, delete by uri). + +Prerequisites: + - Docker running, pip install pyobvector or openviking[oceanbase] + +Run: + pytest tests/vectordb/test_oceanbase_live.py -v -s + python -m unittest tests.vectordb.test_oceanbase_live -v +""" + +import hashlib +import subprocess +import time +import unittest +import uuid + +# ----------------------------------------------------------------------------- +# Docker OceanBase helpers (inlined for self-contained tests) +# ----------------------------------------------------------------------------- +OB_IMAGE = "oceanbase/oceanbase-ce" +OB_PORT = "2881" +OB_DB_NAME = "openviking" +BOOT_TIMEOUT_SEC = 300 +BOOT_POLL_INTERVAL_SEC = 3 + + +def _run(cmd: list, capture: bool = True, timeout: int | None = 60) -> subprocess.CompletedProcess: + return subprocess.run( + cmd, + capture_output=capture, + text=True, + timeout=timeout, + ) + + +def _container_name() -> str: + return f"openviking-ob-test-{uuid.uuid4().hex[:8]}" + + +def _start_oceanbase_docker( + container_name: str | None = None, + port: str | None = None, + db_name: str = OB_DB_NAME, + mode: str = "slim", + boot_timeout_sec: int = BOOT_TIMEOUT_SEC, +) -> dict: + """Start OceanBase in Docker, wait until boot success, create database, return connection info.""" + name = container_name or _container_name() + _run(["docker", "rm", "-f", name], capture=True, timeout=10) + host_port_spec = f"0:{OB_PORT}" if port is None else f"{port}:{OB_PORT}" + r = _run(["docker", "run", "-d", "-p", host_port_spec, "--name", name, "-e", "MODE=" + mode, OB_IMAGE], timeout=30) + if r.returncode != 0: + raise RuntimeError(f"docker run failed: {r.stderr or r.stdout}") + + if port is None: + rp = _run(["docker", "port", name, OB_PORT], timeout=5) + if 
rp.returncode != 0 or not rp.stdout: + _stop_oceanbase_docker(name) + raise RuntimeError("Could not get container host port") + port = rp.stdout.strip().split(":")[-1] + else: + port = str(port) + + deadline = time.monotonic() + boot_timeout_sec + while time.monotonic() < deadline: + # --tail 50: boot success may not be the very last line; bounded read to avoid timeout + r = _run(["docker", "logs", "--tail", "50", name], timeout=20) + log_snippet = (r.stdout or "") + (r.stderr or "") + if "boot success!" in log_snippet: + break + time.sleep(BOOT_POLL_INTERVAL_SEC) + else: + _stop_oceanbase_docker(name) + raise RuntimeError("OceanBase Docker container did not report boot success within timeout") + + # Try every user/client combination; fail only if none can create the database + created = False + for user in ("root@test", "root"): + for client in ("mysql", "obclient"): + r = _run([ + "docker", "exec", name, + client, "-h127.0.0.1", f"-P{OB_PORT}", f"-u{user}", "-e", + f"CREATE DATABASE IF NOT EXISTS `{db_name}`;", + ], timeout=15) + if r.returncode == 0: + created = True + break + if created: + break + if not created: + _stop_oceanbase_docker(name) + raise RuntimeError("Could not create database with any of users root@test/root via mysql/obclient") + + def stop_callback() -> None: + _stop_oceanbase_docker(name) + + return { + "host": "127.0.0.1", + "port": port, + "user": "root", + "password": "", + "db_name": db_name, + "container_name": name, + "stop_callback": stop_callback, + } + + + def _stop_oceanbase_docker(container_name: str) -> None: + _run(["docker", "stop", "-t", "10", container_name], timeout=30) + _run(["docker", "rm", "-f", container_name], timeout=15) + + + def _is_docker_available() -> bool: + r = _run(["docker", "info"], timeout=15) + return r.returncode == 0 + + + # ----------------------------------------------------------------------------- + # Connection args for live DB; filled by Docker startup + # ----------------------------------------------------------------------------- + CONNECTION_ARGS = { + "host": "127.0.0.1", + "port": "2881", + "user": "root", + "password": "", + 
"db_name": "openviking", +} + +# Set by Docker startup; cleared in last class tearDownClass +_docker_stop_callback = None + + +def _get_oceanbase_config( + collection_name: str = "openviking_test_context", + distance_metric: str = "cosine", +): + """Build VectorDBBackendConfig (OceanBase) from CONNECTION_ARGS.""" + from openviking_cli.utils.config.vectordb_config import OceanBaseConfig, VectorDBBackendConfig + + uri = f"{CONNECTION_ARGS['host']}:{CONNECTION_ARGS['port']}" + return VectorDBBackendConfig( + backend="oceanbase", + name=collection_name, + distance_metric=distance_metric, + dimension=8, # short vectors for live tests + oceanbase=OceanBaseConfig( + uri=uri, + user=CONNECTION_ARGS["user"], + password=CONNECTION_ARGS["password"], + db_name=CONNECTION_ARGS["db_name"], + ), + ) + + +def _get_context_schema(name: str, dimension: int = 8): + """Return OpenViking context collection schema with Dimension.""" + from openviking.storage.collection_schemas import CollectionSchemas + + schema = CollectionSchemas.context_collection(name, dimension) + schema["Dimension"] = dimension + return schema + + +class TestOceanBaseLive(unittest.TestCase): + """Live tests against real OceanBase (127.0.0.1:2881, db_name=openviking).""" + + @classmethod + def setUpClass(cls): + global _docker_stop_callback + if not _is_docker_available(): + raise unittest.SkipTest("Docker not available; OceanBase tests require Docker") + if _docker_stop_callback is None: + info = _start_oceanbase_docker(mode="slim") + CONNECTION_ARGS["host"] = info["host"] + CONNECTION_ARGS["port"] = info["port"] + CONNECTION_ARGS["user"] = info["user"] + CONNECTION_ARGS["password"] = info["password"] + CONNECTION_ARGS["db_name"] = info["db_name"] + _docker_stop_callback = info["stop_callback"] + # Docker OceanBase (slim) does not support cosine/neg_ip; use L2 + cls._distance = "l2" + try: + import pyobvector # noqa: F401 + except ImportError: + raise unittest.SkipTest("pyobvector not installed (pip install 
pyobvector or openviking[oceanbase])") + cls.config = _get_oceanbase_config(distance_metric=cls._distance) + cls.schema = _get_context_schema(cls.config.name or "openviking_test_context", dimension=cls.config.dimension or 8) + cls.adapter = None + cls.collection_name = cls.config.name or "openviking_test_context" + + def test_01_create_adapter_and_collection(self): + """Create adapter and collection (skip if already exists).""" + from openviking.storage.vectordb_adapters.factory import create_collection_adapter + + self.adapter = create_collection_adapter(self.config) + self.assertIsNotNone(self.adapter) + self.assertEqual(self.adapter.mode, "oceanbase") + + if not self.adapter.collection_exists(): + created = self.adapter.create_collection( + self.collection_name, + self.schema, + distance=self._distance, + sparse_weight=0.0, + index_name="default", + ) + self.assertTrue(created, "create_collection should return True when collection was created") + else: + # Collection exists, skip create + pass + + def test_02_upsert_and_fetch(self): + """Upsert records and fetch by id.""" + from openviking.storage.vectordb_adapters.factory import create_collection_adapter + + if self.adapter is None: + self.adapter = create_collection_adapter(self.config) + if not self.adapter.collection_exists(): + self.test_01_create_adapter_and_collection() + + # 8-dim vectors (match schema); adapter fills required fields without defaults + records = [ + { + "id": "live-test-id-1", + "uri": "viking://resources/test/doc1.md", + "type": "file", + "context_type": "resource", + "vector": [0.1] * 8, + "sparse_vector": {}, + "abstract": "first doc", + "created_at": 0, + "updated_at": 0, + }, + { + "id": "live-test-id-2", + "uri": "viking://resources/test/doc2.md", + "type": "file", + "context_type": "resource", + "vector": [0.2] * 8, + "sparse_vector": {}, + "abstract": "second doc", + "created_at": 0, + "updated_at": 0, + }, + ] + ids = self.adapter.upsert(records) + self.assertEqual(len(ids), 2) 
+ self.assertIn("live-test-id-1", ids) + self.assertIn("live-test-id-2", ids) + + fetched = self.adapter.get(ids=["live-test-id-1", "live-test-id-2"]) + self.assertGreaterEqual(len(fetched), 2) + + def test_03_search_by_vector(self): + """Vector search.""" + from openviking.storage.vectordb_adapters.factory import create_collection_adapter + + if self.adapter is None: + self.adapter = create_collection_adapter(self.config) + if not self.adapter.collection_exists(): + self.test_01_create_adapter_and_collection() + + query_vector = [0.15] * 8 + results = self.adapter.query( + query_vector=query_vector, + limit=5, + ) + self.assertIsInstance(results, list) + # At least 2 rows from test_02 + self.assertGreaterEqual(len(results), 2) + + def test_04_count_and_cleanup(self): + """Count and delete test data.""" + from openviking.storage.vectordb_adapters.factory import create_collection_adapter + + if self.adapter is None: + self.adapter = create_collection_adapter(self.config) + if not self.adapter.collection_exists(): + return + + n = self.adapter.count() + self.assertGreaterEqual(n, 0) + + # Delete test ids inserted by this script + deleted = self.adapter.delete(ids=["live-test-id-1", "live-test-id-2"]) + self.assertGreaterEqual(deleted, 0) + + +# ----------------------------------------------------------------------------- +# Real-world: multi-tenant, filter, vector search, delete by uri +# ----------------------------------------------------------------------------- + +REALWORLD_COLLECTION = "openviking_live_context" + + +def _realworld_record( + uri: str, + context_type: str, + account_id: str, + owner_space: str, + abstract: str, + vector: list, + level: int = 2, + name: str = "", +): + """Build one context record matching production shape (id from account_id:uri, same as TextEmbeddingHandler).""" + id_seed = f"{account_id}:{uri}" + record_id = hashlib.md5(id_seed.encode("utf-8")).hexdigest() + return { + "id": record_id, + "uri": uri, + "type": "file", + 
"context_type": context_type, + "vector": vector, + "abstract": abstract, + "account_id": account_id, + "owner_space": owner_space, + "level": level, + "name": name or abstract[:50], + } + + +class TestOceanBaseLiveRealWorld(unittest.TestCase): + """ + OceanBase live tests for real-world scenarios: + - Multi-tenant context upsert (resource / memory) + - Filter by context_type, account_id, uri + - Vector search + filter + - Delete by uri, delete by filter, full cleanup + """ + + @classmethod + def setUpClass(cls): + try: + import pyobvector # noqa: F401 + except ImportError: + raise unittest.SkipTest("pyobvector not installed") + # Use L2: many OceanBase versions do not support neg_ip (cosine); real-world collection uses L2 + cls.config = _get_oceanbase_config(REALWORLD_COLLECTION, distance_metric="l2") + cls.schema = _get_context_schema(REALWORLD_COLLECTION, dimension=cls.config.dimension or 8) + cls.adapter = None + + def _adapter(self): + from openviking.storage.vectordb_adapters.factory import create_collection_adapter + + if self.adapter is None: + self.adapter = create_collection_adapter(self.config) + return self.adapter + + def _ensure_collection(self): + a = self._adapter() + if not a.collection_exists(): + a.create_collection( + REALWORLD_COLLECTION, + self.schema, + distance="l2", # Match config; OceanBase may not support neg_ip + sparse_weight=0.0, + index_name="default", + ) + + def test_realworld_01_upsert_multi_tenant_context(self): + """Real-world: upsert multi-tenant resource + memory context data.""" + self._ensure_collection() + dim = 8 + records = [ + _realworld_record( + uri="viking://resources/acc_1/proj_a/readme.md", + context_type="resource", + account_id="acc_1", + owner_space="default", + abstract="Project A readme", + vector=[0.1] * dim, + name="readme", + ), + _realworld_record( + uri="viking://resources/acc_1/proj_a/src/main.py", + context_type="resource", + account_id="acc_1", + owner_space="default", + abstract="Main entry", + 
vector=[0.12, 0.11, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1], + name="main", + ), + _realworld_record( + uri="viking://user/acc_1/default/memories/mem_001.md", + context_type="memory", + account_id="acc_1", + owner_space="default", + abstract="User preference: prefers Python", + vector=[0.2] * dim, + name="memory", + ), + _realworld_record( + uri="viking://resources/acc_2/proj_b/doc.md", + context_type="resource", + account_id="acc_2", + owner_space="default", + abstract="Project B doc", + vector=[0.15] * dim, + name="doc", + ), + _realworld_record( + uri="viking://user/acc_2/default/memories/mem_002.md", + context_type="memory", + account_id="acc_2", + owner_space="default", + abstract="User preference: prefers Markdown", + vector=[0.18, 0.18, 0.18, 0.18, 0.18, 0.18, 0.18, 0.18], + name="memory", + ), + ] + ids = self._adapter().upsert(records) + self.assertEqual(len(ids), 5) + self.assertEqual(len(set(ids)), 5) + + def test_realworld_02_filter_by_context_type(self): + """Real-world: filter by context_type.""" + self._ensure_collection() + from openviking.storage.expr import Eq + + resource_records = self._adapter().query( + filter=Eq("context_type", "resource"), + limit=20, + ) + memory_records = self._adapter().query( + filter=Eq("context_type", "memory"), + limit=20, + ) + self.assertGreaterEqual(len(resource_records), 3, "at least 3 resource (2 acc_1 + 1 acc_2)") + self.assertGreaterEqual(len(memory_records), 2, "at least 2 memory") + for r in resource_records: + self.assertEqual(r.get("context_type"), "resource") + for r in memory_records: + self.assertEqual(r.get("context_type"), "memory") + + def test_realworld_03_filter_by_account_id(self): + """Real-world: filter by account_id (tenant isolation).""" + self._ensure_collection() + from openviking.storage.expr import Eq + + acc1 = self._adapter().query(filter=Eq("account_id", "acc_1"), limit=20) + acc2 = self._adapter().query(filter=Eq("account_id", "acc_2"), limit=20) + self.assertGreaterEqual(len(acc1), 3, "acc_1 at 
least 3") + self.assertGreaterEqual(len(acc2), 2, "acc_2 at least 2") + for r in acc1: + self.assertEqual(r.get("account_id"), "acc_1") + for r in acc2: + self.assertEqual(r.get("account_id"), "acc_2") + + def test_realworld_04_vector_search_with_filter(self): + """Real-world: vector search + context_type filter.""" + self._ensure_collection() + from openviking.storage.expr import Eq + + query_vector = [0.12] * 8 + results = self._adapter().query( + query_vector=query_vector, + filter=Eq("context_type", "resource"), + limit=5, + ) + self.assertIsInstance(results, list) + self.assertGreaterEqual(len(results), 1) + for r in results: + self.assertEqual(r.get("context_type"), "resource") + self.assertIn("abstract", r) + + def test_realworld_05_fetch_by_uri(self): + """Real-world: fetch by uri (simulate fetch_by_uri).""" + self._ensure_collection() + target_uri = "viking://resources/acc_1/proj_a/readme.md" + records = self._adapter().query( + filter={"op": "must", "field": "uri", "conds": [target_uri]}, + limit=2, + ) + self.assertGreaterEqual(len(records), 1) + self.assertEqual(records[0].get("uri"), target_uri) + self.assertEqual(records[0].get("account_id"), "acc_1") + + def test_realworld_06_get_by_ids(self): + """Real-world: get by ids (same ids as returned by upsert).""" + self._ensure_collection() + uri = "viking://resources/acc_1/proj_a/readme.md" + record_id = hashlib.md5(f"acc_1:{uri}".encode("utf-8")).hexdigest() + fetched = self._adapter().get(ids=[record_id]) + self.assertGreaterEqual(len(fetched), 1) + self.assertEqual(fetched[0].get("id"), record_id) + self.assertEqual(fetched[0].get("uri"), uri) + + def test_realworld_07_count_with_filter(self): + """Real-world: count with filter.""" + self._ensure_collection() + from openviking.storage.expr import Eq + + total = self._adapter().count() + self.assertGreaterEqual(total, 5) + resource_count = self._adapter().count(filter=Eq("context_type", "resource")) + self.assertGreaterEqual(resource_count, 3) + 
acc1_count = self._adapter().count(filter=Eq("account_id", "acc_1")) + self.assertGreaterEqual(acc1_count, 3) + + def test_realworld_08_delete_by_uri_then_cleanup(self): + """Real-world: delete by uri, then cleanup by account_id.""" + self._ensure_collection() + # Delete one by uri + target_uri = "viking://resources/acc_2/proj_b/doc.md" + records = self._adapter().query( + filter={"op": "must", "field": "uri", "conds": [target_uri]}, + limit=2, + ) + if records: + rid = records[0].get("id") + if rid: + self._adapter().delete(ids=[rid]) + # Delete all for acc_1 and acc_2 (tenant cleanup) + from openviking.storage.expr import Eq + + for acc in ("acc_1", "acc_2"): + deleted = self._adapter().delete(filter=Eq("account_id", acc), limit=10000) + self.assertGreaterEqual(deleted, 0) + # Collection may be empty if only this class uses it + final_count = self._adapter().count() + self.assertGreaterEqual(final_count, 0) + # This test only verifies delete by filter runs; for strict cleanup, drop collection + self.assertIsInstance(final_count, int) + self.adapter = None # Allow later tests to recreate adapter + + @classmethod + def tearDownClass(cls): + global _docker_stop_callback + if _docker_stop_callback is not None: + try: + _docker_stop_callback() + finally: + _docker_stop_callback = None + + +if __name__ == "__main__": + unittest.main()
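Reviewer note: the live tests derive each record's primary key deterministically as the md5 hex digest of `"account_id:uri"` (see `_realworld_record` and `test_realworld_06_get_by_ids`), which is what makes upserts idempotent per tenant/URI pair. A standalone restatement of that derivation, with an illustrative helper name that is not part of the patch:

```python
import hashlib


def context_record_id(account_id: str, uri: str) -> str:
    # md5 hex digest over "account_id:uri" -- the same seed the tests build,
    # so upserting the same (account_id, uri) twice overwrites one row
    # instead of inserting a duplicate.
    return hashlib.md5(f"{account_id}:{uri}".encode("utf-8")).hexdigest()


rid = context_record_id("acc_1", "viking://resources/acc_1/proj_a/readme.md")
# rid is a stable 32-character lowercase hex string
```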