From 278ee80cecf88cfca38b29701d364c6f42207fc1 Mon Sep 17 00:00:00 2001
From: filipecosta90
Date: Mon, 27 May 2024 17:19:04 +0100
Subject: [PATCH 1/6] Initial support for MongoDB Vector Search

---
Review notes (ignored by git-am; all changes below are line-count neutral):
 * config.py: MONGO_WRITE_CONCERN now reads the MONGO_WRITE_CONCERN
   environment variable. It previously read MONGO_READ_PREFERENCE, so a
   read-preference override leaked into the "w=" URI option.
 * search.py: numCandidates is read with dict.get() instead of dict.pop().
   pop() removed the key from the class-level search_params, so only the
   first query used the configured value and the rest fell back to 100.
 * search.py: the (2 * score - 1) de-normalisation is applied for COSINE and
   DOT (as the inline comment states), not for L2, whose Atlas score is
   1 / (1 + distance).
 * configure.py: str(e) instead of e.__str__().

 engine/clients/client_factory.py              |   9 ++
 engine/clients/mongodb/__init__.py            |   3 +
 engine/clients/mongodb/config.py              |  32 +++++
 engine/clients/mongodb/configure.py           | 115 ++++++++++++++++++
 engine/clients/mongodb/search.py              |  59 +++++++++
 engine/clients/mongodb/upload.py              |  78 ++++++++++++
 .../configurations/mongodb-single-node.json   |  15 +++
 7 files changed, 311 insertions(+)
 create mode 100644 engine/clients/mongodb/__init__.py
 create mode 100644 engine/clients/mongodb/config.py
 create mode 100644 engine/clients/mongodb/configure.py
 create mode 100644 engine/clients/mongodb/search.py
 create mode 100644 engine/clients/mongodb/upload.py
 create mode 100644 experiments/configurations/mongodb-single-node.json

diff --git a/engine/clients/client_factory.py b/engine/clients/client_factory.py
index a74df2ab..5762d84c 100644
--- a/engine/clients/client_factory.py
+++ b/engine/clients/client_factory.py
@@ -31,6 +31,12 @@
     WeaviateUploader,
 )
 
+from engine.clients.mongodb import (
+    MongoConfigurator,
+    MongoSearcher,
+    MongoUploader,
+)
+
 ENGINE_CONFIGURATORS = {
     "qdrant": QdrantConfigurator,
     "weaviate": WeaviateConfigurator,
@@ -39,6 +45,7 @@
     "opensearch": OpenSearchConfigurator,
     "redis": RedisConfigurator,
     "pgvector": PgVectorConfigurator,
+    "mongodb": MongoConfigurator,
 }
 
 ENGINE_UPLOADERS = {
@@ -49,6 +56,7 @@
     "opensearch": OpenSearchUploader,
     "redis": RedisUploader,
     "pgvector": PgVectorUploader,
+    "mongodb": MongoUploader,
 }
 
 ENGINE_SEARCHERS = {
@@ -59,6 +67,7 @@
     "opensearch": OpenSearchSearcher,
     "redis": RedisSearcher,
     "pgvector": PgVectorSearcher,
+    "mongodb": MongoSearcher,
 }
 
 
diff --git a/engine/clients/mongodb/__init__.py b/engine/clients/mongodb/__init__.py
new file mode 100644
index 00000000..fb90af48
--- /dev/null
+++ b/engine/clients/mongodb/__init__.py
@@ -0,0 +1,3 @@
+from engine.clients.mongodb.configure import MongoConfigurator
+from engine.clients.mongodb.upload import MongoUploader
+from engine.clients.mongodb.search import MongoSearcher
diff --git a/engine/clients/mongodb/config.py b/engine/clients/mongodb/config.py
new file mode 100644
index 00000000..88b97ad4
--- /dev/null
+++ b/engine/clients/mongodb/config.py
@@ -0,0 +1,32 @@
+import os
+from pymongo.mongo_client import MongoClient
+
+MONGO_PORT = int(os.getenv("MONGO_PORT", 27017))
+MONGO_AUTH = os.getenv("MONGO_AUTH", "performance")
+MONGO_USER = os.getenv("MONGO_USER", "performance")
+MONGO_READ_PREFERENCE = os.getenv("MONGO_READ_PREFERENCE", "primary")
+MONGO_WRITE_CONCERN = os.getenv("MONGO_WRITE_CONCERN", "1")
+EMBEDDING_FIELD_NAME = os.getenv("EMBEDDING_FIELD_NAME", "embedding")
+EMBEDDING_DISTANCE = os.getenv("EMBEDDING_DISTANCE", None)
+ATLAS_DB_NAME = os.getenv("ATLAS_DB_NAME", "vector-db")
+ATLAS_COLLECTION_NAME = os.getenv("ATLAS_COLLECTION_NAME", "vector-collection")
+ATLAS_VECTOR_SEARCH_INDEX_NAME = os.getenv(
+    "ATLAS_VECTOR_SEARCH_INDEX_NAME", "vector-index"
+)
+
+# 90 seconds timeout
+MONGO_QUERY_TIMEOUT = int(os.getenv("MONGO_QUERY_TIMEOUT", 90 * 1000))
+
+
+def get_mongo_client(host, connection_params):
+    user = MONGO_USER
+    auth = MONGO_AUTH
+    uri = f"mongodb+srv://{user}:{auth}@{host}/?retryWrites=true&w={MONGO_WRITE_CONCERN}&appName=vector-db-benchmark&readPreference={MONGO_READ_PREFERENCE}"
+    # Create a new client and connect to the server
+    client = MongoClient(uri)
+    # Send a ping to confirm a successful connection
+    try:
+        client.admin.command("ping")
+    except Exception as e:
+        print(f"Failed pinging the deployment... error {e}")
+    return client
diff --git a/engine/clients/mongodb/configure.py b/engine/clients/mongodb/configure.py
new file mode 100644
index 00000000..bdd021d6
--- /dev/null
+++ b/engine/clients/mongodb/configure.py
@@ -0,0 +1,115 @@
+from benchmark.dataset import Dataset
+from engine.base_client import IncompatibilityError
+from engine.base_client.configure import BaseConfigurator
+from engine.base_client.distances import Distance
+from engine.clients.mongodb.config import (
+    get_mongo_client,
+    EMBEDDING_FIELD_NAME,
+    ATLAS_COLLECTION_NAME,
+    ATLAS_VECTOR_SEARCH_INDEX_NAME,
+    ATLAS_DB_NAME,
+)
+import time
+
+
+class MongoConfigurator(BaseConfigurator):
+    DISTANCE_MAPPING = {
+        Distance.L2: "euclidean",
+        Distance.COSINE: "cosine",
+        Distance.DOT: "dotProduct",
+    }
+
+    def __init__(self, host, collection_params: dict, connection_params: dict):
+        super().__init__(host, collection_params, connection_params)
+        self.client = get_mongo_client(host, connection_params)
+        self.db = self.client[ATLAS_DB_NAME]
+        self.collection = self.db[ATLAS_COLLECTION_NAME]
+
+    def clean(self):
+        index_exists = True
+        try_count = 1
+
+        while index_exists is True:
+            index_exists = False
+            print(
+                f"Ensuring the search index named {ATLAS_VECTOR_SEARCH_INDEX_NAME} does not exist..."
+            )
+            try:
+                self.collection.drop_search_index(ATLAS_VECTOR_SEARCH_INDEX_NAME)
+            except Exception as e:
+                if "IndexNotFound" in str(e):
+                    pass
+                else:
+                    print(e)
+
+            stats = self.db.command("collstats", self.collection.name)
+            # Print the index details
+            index_details = stats.get("indexDetails", {})
+            index_exists = False
+            for index_name, details in index_details.items():
+                if ATLAS_VECTOR_SEARCH_INDEX_NAME in index_name:
+                    print(f"Still detected index. Stats: {details}")
+                    index_exists = True
+                    try_count = try_count + 1
+                    # sleep for 10 seconds to avoid invalid state
+                    time.sleep(10)
+
+        print(
+            f"Finished ensuring the search index does not exist... after {try_count} tries"
+        )
+
+        print("Ensuring the collection does not exist...")
+
+        collection_exists = True
+        while collection_exists is True:
+            try_count = try_count + 1
+            try:
+                self.db.drop_collection(ATLAS_COLLECTION_NAME)
+            except Exception as e:
+                if "not exist" in str(e):
+                    pass
+                else:
+                    print(e)
+            collection_exists = False
+            collection_names = self.db.list_collection_names()
+            for collection_name in collection_names:
+                if ATLAS_COLLECTION_NAME in collection_name:
+                    print(
+                        f"Still detected collection named {ATLAS_COLLECTION_NAME}. Trying again..."
+                    )
+                    collection_exists = True
+                    # sleep for 10 seconds to avoid invalid state
+                    time.sleep(10)
+        print(
+            f"Finished ensuring the collection does not exist... after {try_count} tries"
+        )
+
+    def recreate(self, dataset: Dataset, collection_params):
+        # Explicitly create a collection in a MongoDB database.
+        print(f"Explicitly creating a collection named {ATLAS_COLLECTION_NAME}...")
+        self.db.create_collection(ATLAS_COLLECTION_NAME)
+        self.collection = self.db[ATLAS_COLLECTION_NAME]
+        print(
+            f"Creating the search index with vector mapping named {ATLAS_VECTOR_SEARCH_INDEX_NAME}..."
+        )
+
+        self.collection.create_search_index(
+            {
+                "definition": {
+                    "mappings": {
+                        "dynamic": True,
+                        "fields": {
+                            EMBEDDING_FIELD_NAME: {
+                                "dimensions": dataset.config.vector_size,
+                                "similarity": self.DISTANCE_MAPPING[
+                                    dataset.config.distance
+                                ],
+                                "type": "knnVector",
+                            }
+                        },
+                    }
+                },
+                "name": ATLAS_VECTOR_SEARCH_INDEX_NAME,
+            }
+        )
+        pass
diff --git a/engine/clients/mongodb/search.py b/engine/clients/mongodb/search.py
new file mode 100644
index 00000000..c03232da
--- /dev/null
+++ b/engine/clients/mongodb/search.py
@@ -0,0 +1,59 @@
+from engine.base_client.distances import Distance
+from engine.base_client.search import BaseSearcher
+from engine.clients.mongodb.config import (
+    ATLAS_VECTOR_SEARCH_INDEX_NAME,
+    get_mongo_client,
+    EMBEDDING_FIELD_NAME,
+    ATLAS_DB_NAME,
+    ATLAS_COLLECTION_NAME,
+)
+import copy
+from typing import List, Tuple
+
+
+class MongoSearcher(BaseSearcher):
+    search_params = {}
+    client = None
+
+    @classmethod
+    def init_client(cls, host, distance, connection_params: dict, search_params: dict):
+        cls.distance = distance
+        cls.client = get_mongo_client(host, connection_params)
+        cls.search_params = copy.deepcopy(search_params)
+
+    @classmethod
+    def search_one(cls, vector, meta_conditions, top) -> List[Tuple[int, float]]:
+        numCandidates = cls.search_params.get("numCandidates", 100)
+        # define pipeline
+
+        pipeline = [
+            {
+                "$vectorSearch": {
+                    "index": ATLAS_VECTOR_SEARCH_INDEX_NAME,
+                    "path": EMBEDDING_FIELD_NAME,
+                    "queryVector": vector,
+                    "numCandidates": numCandidates,
+                    "limit": top,
+                }
+            },
+            {
+                "$project": {
+                    "score": {"$meta": "vectorSearchScore"},
+                }
+            },
+        ]
+
+        # run pipeline
+        results = cls.client[ATLAS_DB_NAME][ATLAS_COLLECTION_NAME].aggregate(pipeline)
+        search_result = []
+        for result in results:
+            reverted_normalization_score = float(result["score"])
+            # In MongoDB Atlas, for cosine and dotProduct similarities,
+            # the normalization of the score is done using the following formula:
+            # score = (1 + cosine/dot_product(v1,v2)) / 2
+            # to revert it we simply do:
+            if cls.distance == Distance.COSINE or cls.distance == Distance.DOT:
+                reverted_normalization_score = (2.0 * reverted_normalization_score) - 1
+            search_result.append((int(result["_id"]), reverted_normalization_score))
+
+        return search_result
diff --git a/engine/clients/mongodb/upload.py b/engine/clients/mongodb/upload.py
new file mode 100644
index 00000000..8bb090bc
--- /dev/null
+++ b/engine/clients/mongodb/upload.py
@@ -0,0 +1,78 @@
+from typing import List, Optional
+import numpy as np
+from pymongo import InsertOne
+
+from engine.base_client.upload import BaseUploader
+from engine.clients.mongodb.config import (
+    get_mongo_client,
+    EMBEDDING_FIELD_NAME,
+    ATLAS_COLLECTION_NAME,
+    ATLAS_DB_NAME,
+    ATLAS_VECTOR_SEARCH_INDEX_NAME,
+)
+import time
+
+
+class MongoUploader(BaseUploader):
+    client = None
+    upload_params = {}
+
+    @classmethod
+    def init_client(cls, host, distance, connection_params, upload_params):
+        cls.client = get_mongo_client(host, connection_params)
+        cls.upload_params = upload_params
+        # Getting the database instance
+        cls.db = cls.client[ATLAS_DB_NAME]
+        # Creating a collection
+        cls.collection = cls.db[ATLAS_COLLECTION_NAME]
+
+    @classmethod
+    def upload_batch(
+        cls, ids: List[int], vectors: List[list], metadata: Optional[List[dict]]
+    ):
+        # Update the collection with the embeddings
+        requests = []
+
+        for i in range(len(ids)):
+            doc_id = ids[i]
+            embedding = vectors[i]
+            doc = {}
+            doc["_id"] = doc_id
+            doc[EMBEDDING_FIELD_NAME] = embedding
+            requests.append(InsertOne(doc))
+
+        cls.collection.bulk_write(requests)
+
+    @classmethod
+    def post_upload(cls, _distance):
+        print("waiting for search index status to be Active")
+
+        queryable = False
+        status = "n/a"
+        try_count = 1
+        while status != "ACTIVE" and queryable is False:
+            print(f"checking search indices. try: {try_count}...")
+            search_indexes = cls.collection.list_search_indexes()
+            for search_index in search_indexes:
+                index_name = search_index["name"]
+                if index_name == ATLAS_VECTOR_SEARCH_INDEX_NAME:
+                    print(
+                        f"detected search index named {ATLAS_VECTOR_SEARCH_INDEX_NAME}. checking status..."
+                    )
+                    print(search_index)
+                    queryable = search_index["queryable"]
+                    status = search_index["status"]
+            try_count = try_count + 1
+        print(
+            f"Finished waiting for search index status={status} and queryable={queryable}."
+        )
+        return {}
+
+    @classmethod
+    def get_memory_usage(cls):
+        # sleeping for 60 seconds given on Mongodb Atlas the index takes longer to be fully indexed
+        print(
+            "sleeping for 60 seconds given on Mongodb Atlas the index takes longer to be fully indexed"
+        )
+        time.sleep(60)
+        return {}
diff --git a/experiments/configurations/mongodb-single-node.json b/experiments/configurations/mongodb-single-node.json
new file mode 100644
index 00000000..a4770196
--- /dev/null
+++ b/experiments/configurations/mongodb-single-node.json
@@ -0,0 +1,15 @@
+[
+  {
+    "name": "mongodb-default",
+    "engine": "mongodb",
+    "connection_params": {
+      "request_timeout": 10000
+    },
+    "collection_params": { "index_options": { } },
+    "search_params": [
+      { "parallel": 1, "numCandidates": 128 }, { "parallel": 1, "numCandidates": 256 }, { "parallel": 1, "numCandidates": 512 }, { "parallel": 1, "numCandidates": 1024 }, { "parallel": 1, "numCandidates": 1536 }, { "parallel": 1, "numCandidates": 2048 },
+      { "parallel": 100, "numCandidates": 128 }, { "parallel": 100, "numCandidates": 256 }, { "parallel": 100, "numCandidates": 512 }, { "parallel": 1, "numCandidates": 1536 }, { "parallel": 1, "numCandidates": 2048 }
+    ],
+    "upload_params": { "parallel": 16 }
+  }
+]

From 0d9cc0dc710840702e8f849a85ebc6530f355f8b Mon Sep 17 00:00:00 2001
From: filipecosta90
Date: Mon, 27 May 2024 17:33:56 +0100
Subject: [PATCH 2/6] Fixes per PR review: isort

---
 engine/clients/mongodb/__init__.py  |  2 +-
 engine/clients/mongodb/config.py    |  1 +
 engine/clients/mongodb/configure.py | 10 +++++-----
 engine/clients/mongodb/search.py    | 11 ++++++-----
 engine/clients/mongodb/upload.py    | 13 ++++---------
 5 files changed, 17 insertions(+), 20 deletions(-)

diff --git a/engine/clients/mongodb/__init__.py b/engine/clients/mongodb/__init__.py
index fb90af48..3e78844b 100644
--- a/engine/clients/mongodb/__init__.py
+++ b/engine/clients/mongodb/__init__.py
@@ -1,3 +1,3 @@
 from engine.clients.mongodb.configure import MongoConfigurator
-from engine.clients.mongodb.upload import MongoUploader
 from engine.clients.mongodb.search import MongoSearcher
+from engine.clients.mongodb.upload import MongoUploader
diff --git a/engine/clients/mongodb/config.py b/engine/clients/mongodb/config.py
index 88b97ad4..18fbb789 100644
--- a/engine/clients/mongodb/config.py
+++ b/engine/clients/mongodb/config.py
@@ -1,4 +1,5 @@
 import os
+
 from pymongo.mongo_client import MongoClient
 
 MONGO_PORT = int(os.getenv("MONGO_PORT", 27017))
diff --git a/engine/clients/mongodb/configure.py b/engine/clients/mongodb/configure.py
index bdd021d6..29b97627 100644
--- a/engine/clients/mongodb/configure.py
+++ b/engine/clients/mongodb/configure.py
@@ -1,15 +1,15 @@
+import time
+
 from benchmark.dataset import Dataset
-from engine.base_client import IncompatibilityError
 from engine.base_client.configure import BaseConfigurator
 from engine.base_client.distances import Distance
 from engine.clients.mongodb.config import (
-    get_mongo_client,
-    EMBEDDING_FIELD_NAME,
     ATLAS_COLLECTION_NAME,
-    ATLAS_VECTOR_SEARCH_INDEX_NAME,
     ATLAS_DB_NAME,
+    ATLAS_VECTOR_SEARCH_INDEX_NAME,
+    EMBEDDING_FIELD_NAME,
+    get_mongo_client,
 )
-import time
 
 
 class MongoConfigurator(BaseConfigurator):
diff --git a/engine/clients/mongodb/search.py b/engine/clients/mongodb/search.py
index c03232da..cebba543 100644
--- a/engine/clients/mongodb/search.py
+++ b/engine/clients/mongodb/search.py
@@ -1,14 +1,15 @@
+import copy
+from typing import List, Tuple
+
 from engine.base_client.distances import Distance
 from engine.base_client.search import BaseSearcher
 from engine.clients.mongodb.config import (
+    ATLAS_COLLECTION_NAME,
+    ATLAS_DB_NAME,
     ATLAS_VECTOR_SEARCH_INDEX_NAME,
-    get_mongo_client,
     EMBEDDING_FIELD_NAME,
-    ATLAS_DB_NAME,
-    ATLAS_COLLECTION_NAME,
+    get_mongo_client,
 )
-import copy
-from typing import List, Tuple
 
 
 class MongoSearcher(BaseSearcher):
diff --git a/engine/clients/mongodb/upload.py b/engine/clients/mongodb/upload.py
index 8bb090bc..051df6a7 100644
--- a/engine/clients/mongodb/upload.py
+++ b/engine/clients/mongodb/upload.py
@@ -1,16 +1,16 @@
+import time
 from typing import List, Optional
-import numpy as np
+
 from pymongo import InsertOne
 
 from engine.base_client.upload import BaseUploader
 from engine.clients.mongodb.config import (
-    get_mongo_client,
-    EMBEDDING_FIELD_NAME,
     ATLAS_COLLECTION_NAME,
     ATLAS_DB_NAME,
     ATLAS_VECTOR_SEARCH_INDEX_NAME,
+    EMBEDDING_FIELD_NAME,
+    get_mongo_client,
 )
-import time
 
 
 class MongoUploader(BaseUploader):
@@ -70,9 +70,4 @@ def post_upload(cls, _distance):
 
     @classmethod
     def get_memory_usage(cls):
-        # sleeping for 60 seconds given on Mongodb Atlas the index takes longer to be fully indexed
-        print(
-            "sleeping for 60 seconds given on Mongodb Atlas the index takes longer to be fully indexed"
-        )
-        time.sleep(60)
         return {}

From 4585d0b548ad3c4342b652d6086eb7c3073d2aac Mon Sep 17 00:00:00 2001
From: filipecosta90
Date: Mon, 27 May 2024 17:35:30 +0100
Subject: [PATCH 3/6] Fixes per PR review: isort

---
 engine/clients/mongodb/__init__.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/engine/clients/mongodb/__init__.py b/engine/clients/mongodb/__init__.py
index 3e78844b..dd3cf2db 100644
--- a/engine/clients/mongodb/__init__.py
+++ b/engine/clients/mongodb/__init__.py
@@ -1,3 +1,4 @@
+# noqa: F401
 from engine.clients.mongodb.configure import MongoConfigurator
 from engine.clients.mongodb.search import MongoSearcher
 from engine.clients.mongodb.upload import MongoUploader

From 970f4f9c23e0649ea6b3448048bfe4a4d83d4322 Mon Sep 17 00:00:00 2001
From: filipecosta90
Date: Mon, 27 May 2024 17:37:02 +0100
Subject: [PATCH 4/6] Fixes per PR review: isort

---
 engine/clients/mongodb/__init__.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/engine/clients/mongodb/__init__.py b/engine/clients/mongodb/__init__.py
index dd3cf2db..ad79f3ee 100644
--- a/engine/clients/mongodb/__init__.py
+++ b/engine/clients/mongodb/__init__.py
@@ -1,4 +1,5 @@
-# noqa: F401
 from engine.clients.mongodb.configure import MongoConfigurator
 from engine.clients.mongodb.search import MongoSearcher
 from engine.clients.mongodb.upload import MongoUploader
+
+__all__ = ["MongoConfigurator", "MongoSearcher", "MongoUploader"]

From 7dfef8bb32820263242e586b23536b3159d0cd49 Mon Sep 17 00:00:00 2001
From: filipecosta90
Date: Mon, 27 May 2024 17:38:10 +0100
Subject: [PATCH 5/6] Fixes per PR review: isort

---
 engine/clients/client_factory.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/engine/clients/client_factory.py b/engine/clients/client_factory.py
index 5762d84c..c9c2fe67 100644
--- a/engine/clients/client_factory.py
+++ b/engine/clients/client_factory.py
@@ -13,6 +13,7 @@
     ElasticUploader,
 )
 from engine.clients.milvus import MilvusConfigurator, MilvusSearcher, MilvusUploader
+from engine.clients.mongodb import MongoConfigurator, MongoSearcher, MongoUploader
 from engine.clients.opensearch import (
     OpenSearchConfigurator,
     OpenSearchSearcher,
@@ -31,12 +32,6 @@
     WeaviateUploader,
 )
 
-from engine.clients.mongodb import (
-    MongoConfigurator,
-    MongoSearcher,
-    MongoUploader,
-)
-
 ENGINE_CONFIGURATORS = {
     "qdrant": QdrantConfigurator,
     "weaviate": WeaviateConfigurator,

From a9459d9ccb1cebd97fa0f373581a3d225906bc26 Mon Sep 17 00:00:00 2001
From: filipecosta90
Date: Wed, 26 Jun 2024 16:29:13 +0100
Subject: [PATCH 6/6] Fixes per PR linter: ruff

---
 engine/clients/mongodb/upload.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/engine/clients/mongodb/upload.py b/engine/clients/mongodb/upload.py
index 051df6a7..b56ba6f5 100644
--- a/engine/clients/mongodb/upload.py
+++ b/engine/clients/mongodb/upload.py
@@ -1,4 +1,3 @@
-import time
 from typing import List, Optional
 
 from pymongo import InsertOne