From 08ce3613f0d4bc7cd9ad07426164fcf3f4b9ec92 Mon Sep 17 00:00:00 2001 From: Casey Clements Date: Thu, 11 Sep 2025 17:24:08 -0400 Subject: [PATCH 1/3] Prototype with new class AutoEmbeddingVectorStore and test_autoembedding.py --- .../langchain_mongodb/index.py | 52 ++++ .../langchain_mongodb/vectorstores.py | 284 ++++++++++++++++++ libs/langchain-mongodb/pyproject.toml | 1 + .../integration_tests/test_autoembedding.py | 90 ++++++ 4 files changed, 427 insertions(+) create mode 100644 libs/langchain-mongodb/tests/integration_tests/test_autoembedding.py diff --git a/libs/langchain-mongodb/langchain_mongodb/index.py b/libs/langchain-mongodb/langchain_mongodb/index.py index 09189009..f756e62a 100644 --- a/libs/langchain-mongodb/langchain_mongodb/index.py +++ b/libs/langchain-mongodb/langchain_mongodb/index.py @@ -242,3 +242,55 @@ def create_fulltext_search_index( timeout=wait_until_complete, ) logger.info(result) + + +def create_autoembedded_vector_search_index( + collection: Collection, + index_name: str, + path: str, + model: str, + filters: Optional[List[str]] = None, + wait_until_complete: Optional[float] = None, + **kwargs: Any, +) -> None: + """Experimental Utility function to create a vector search index with autoembedding. + + Args: + collection (Collection): MongoDB Collection + index_name (str): Name of Index + path (str): field containing strings to produce embedding vectors from. + filters (List[str]): Fields/paths to index to allow filtering in $vectorSearch + wait_until_complete (Optional[float]): If provided, number of seconds to wait + until search index is ready. + kwargs: Keyword arguments supplying any additional options to SearchIndexModel. + """ + logger.info("Creating Search Index %s on %s", index_name, collection.name) + + if collection.name not in collection.database.list_collection_names(): + collection.database.create_collection(collection.name) + + fields = [{ + "type": "text", + "model": model, + "path": path, + }] + if filters: + for field in filters: + fields.append({"type": "filter", "path": field}) + definition = {"fields": fields} + + result = collection.create_search_index( + SearchIndexModel( + definition=definition, + name=index_name, + type="vectorSearch", + ) + ) + + if wait_until_complete: + _wait_for_predicate( + predicate=lambda: _is_index_ready(collection, index_name), + err=f"{index_name=} did not complete in {wait_until_complete}!", + timeout=wait_until_complete, + ) + logger.info(result) \ No newline at end of file diff --git a/libs/langchain-mongodb/langchain_mongodb/vectorstores.py b/libs/langchain-mongodb/langchain_mongodb/vectorstores.py index 888ea75f..b686ed1e 100644 --- a/libs/langchain-mongodb/langchain_mongodb/vectorstores.py +++ b/libs/langchain-mongodb/langchain_mongodb/vectorstores.py @@ -24,10 +24,12 @@ from pymongo import MongoClient, ReplaceOne from pymongo.collection import Collection from pymongo.errors import CollectionInvalid +from langchain_voyageai import VoyageAIEmbeddings from langchain_mongodb.index import ( create_vector_search_index, update_vector_search_index, + create_autoembedded_vector_search_index, ) from langchain_mongodb.pipelines import vector_search_stage from langchain_mongodb.utils import ( @@ -865,3 +867,285 @@ def create_vector_search_index( wait_until_complete=wait_until_complete, **kwargs, ) # type: ignore [operator] + + +class AutoEmbeddingVectorStore(VectorStore): + """Automated embedding in Atlas Vector Search + + + """ + def __init__( + self, + collection: Collection[Dict[str, Any]], + index_name: str = "vector_index", + text_key: Union[str, List[str]] = "content", + model: str = "voyage-3-large", + auto_create_index: bool | None = None, + auto_index_timeout: int = 15, + **kwargs: Any, + ): + + self._collection = collection + self._index_name = index_name + self._text_key = text_key if isinstance(text_key, str) else text_key[0] + self._model = model + + # append_metadata was added in PyMongo 4.14.0, but is a valid database name on earlier versions + _append_client_metadata(self._collection.database.client) + + if auto_create_index is False: + return + if not any([ix["name"] == index_name for ix in collection.list_search_indexes()]): + create_autoembedded_vector_search_index( + collection=collection, + index_name=index_name, + path=text_key, + model=model, + wait_until_complete=auto_index_timeout, + **kwargs + ) + + @classmethod + def from_connection_string( + cls, + connection_string: str, + database_name: str, + collection_name: str, + **kwargs: Any, + ) -> MongoDBAtlasVectorSearch: + """Construct a `MongoDB Atlas Vector Search` vector store + from a MongoDB connection URI. + + Args: + connection_string: A valid MongoDB connection URI. + database_name: Will use this database or create a new one. + collection_name: Will use this collection or create a new one. + + + Returns: + A new MongoDBAtlasVectorSearch instance. + + """ + client: MongoClient = MongoClient( + connection_string, + driver=DRIVER_METADATA, + ) + collection = client[database_name][collection_name] + return cls(collection, **kwargs) + + @property + def collection(self) -> Collection[Dict[str, Any]]: + return self._collection + + def close(self) -> None: + """Close the resources used by the MongoDBAtlasVectorSearch.""" + self._collection.database.client.close() + + + def bulk_embed_and_insert_texts( + self, + texts: Union[List[str], Iterable[str]], + metadatas: Union[List[dict], Generator[dict, Any, Any]], + ids: Optional[List[str]] = None, + ) -> List[str]: + """Bulk insert single batch of texts, metadatas, and optionally ids. + + See add_texts for additional details. + """ + if not texts: + return [] + + if not ids: + ids = [str(ObjectId()) for _ in range(len(list(texts)))] + docs = [ + { + "_id": str_to_oid(i), + self._text_key: t, + **m, + } + for i, t, m, in zip(ids, texts, metadatas) + ] + operations = [ReplaceOne({"_id": doc["_id"]}, doc, upsert=True) for doc in docs] + # insert the documents in MongoDB Atlas + result = self._collection.bulk_write(operations) + assert result.upserted_ids is not None + return [oid_to_str(_id) for _id in result.upserted_ids.values()] + + def add_documents( + self, + documents: List[Document], + ids: Optional[List[str]] = None, + batch_size: int = DEFAULT_INSERT_BATCH_SIZE, + **kwargs: Any, + ) -> List[str]: + """Add documents to the vectorstore. + + Args: + documents: Documents to add to the vectorstore. + ids: Optional list of unique ids that will be used as index in VectorStore. + See note on ids in add_texts. + batch_size: Number of documents to insert at a time. + Tuning this may help with performance and sidestep MongoDB limits. + + Returns: + List of IDs of the added texts. + """ + n_docs = len(documents) + if ids: + assert len(ids) == n_docs, "Number of ids must equal number of documents." + else: + ids = [doc.id or str(ObjectId()) for doc in documents] + result_ids = [] + start = 0 + for end in range(batch_size, n_docs + batch_size, batch_size): + texts, metadatas = zip( + *[(doc.page_content, doc.metadata) for doc in documents[start:end]] + ) + result_ids.extend( + self.bulk_embed_and_insert_texts( + texts=texts, metadatas=metadatas, ids=ids[start:end] + ) + ) + start = end + return result_ids + + def similarity_search_with_score( + self, + query: str, + k: int = 4, + pre_filter: Optional[Dict[str, Any]] = None, + post_filter_pipeline: Optional[List[Dict]] = None, + oversampling_factor: int = 10, + include_embeddings: bool = False, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + + # Atlas Vector Search on auto-embedding index + + search_defn = { + "query": query, + "index": self._index_name, + "path": self._text_key, + "limit": k, + "numCandidates": k * oversampling_factor, + } + if pre_filter: + search_defn["filter"] = pre_filter + + pipeline = [ + {"$vectorSearch": search_defn}, + {"$set": {"score": {"$meta": "vectorSearchScore"}}}, + ] + + # Post-processing + if post_filter_pipeline is not None: + pipeline.extend(post_filter_pipeline) + + # Execution + cursor = self._collection.aggregate(pipeline) # type: ignore[arg-type] + docs = [] + + # Format + for res in cursor: + if self._text_key not in res: + continue + text = res.pop(self._text_key) + score = res.pop("score") + make_serializable(res) + docs.append( + (Document(page_content=text, metadata=res, id=res["_id"]), score) + ) + return docs + + @classmethod + def from_texts( + cls, + texts: List[str], + embedding: Embeddings = VoyageAIEmbeddings(model="voyage-3-large"), + metadatas: Optional[List[Dict]] = None, + collection: Optional[Collection] = None, + ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> MongoDBAtlasVectorSearch: + """Construct a `MongoDB Atlas Vector Search` vector store from raw documents. + + This is a user-friendly interface that: + 1. Embeds documents. + 2. Adds the documents to a provided MongoDB Atlas Vector Search index + (Lucene) + + This is intended to be a quick way to get started. + + See `MongoDBAtlasVectorSearch` for kwargs and further description. + + + Example: + .. code-block:: python + from pymongo import MongoClient + + from langchain_mongodb import MongoDBAtlasVectorSearch + from langchain_openai import OpenAIEmbeddings + + mongo_client = MongoClient("") + collection = mongo_client[""][""] + embeddings = OpenAIEmbeddings() + vectorstore = MongoDBAtlasVectorSearch.from_texts( + texts, + embeddings, + metadatas=metadatas, + collection=collection + ) + """ + if collection is None: + raise ValueError("Must provide 'collection' named parameter.") + vectorstore = cls(collection, embedding.model, **kwargs) + vectorstore.add_texts(texts=texts, metadatas=metadatas, ids=ids, **kwargs) + return vectorstore + + def similarity_search( + self, + query: str, + k: int = 4, + pre_filter: Optional[Dict[str, Any]] = None, + post_filter_pipeline: Optional[List[Dict]] = None, + oversampling_factor: int = 10, + include_scores: bool = False, + include_embeddings: bool = False, + **kwargs: Any, + ) -> List[Document]: # noqa: E501 + """Return MongoDB documents most similar to the given query. + + Atlas Vector Search eliminates the need to run a separate + search system alongside your database. + + Args: + query: Input text of semantic query + k: (Optional) number of documents to return. Defaults to 4. + pre_filter: List of MQL match expressions comparing an indexed field + post_filter_pipeline: (Optional) Pipeline of MongoDB aggregation stages + to filter/process results after $vectorSearch. + oversampling_factor: Multiple of k used when generating number of candidates + at each step in the HNSW Vector Search, + include_scores: If True, the query score of each result + will be included in metadata. + include_embeddings: If True, the embedding vector of each result + will be included in metadata. + kwargs: Additional arguments are specific to the search_type + + Returns: + List of documents most similar to the query and their scores. + """ + docs_and_scores = self.similarity_search_with_score( + query, + k=k, + pre_filter=pre_filter, + post_filter_pipeline=post_filter_pipeline, + oversampling_factor=oversampling_factor, + include_embeddings=include_embeddings, + **kwargs, + ) + + if include_scores: + for doc, score in docs_and_scores: + doc.metadata["score"] = score + return [doc for doc, _ in docs_and_scores] diff --git a/libs/langchain-mongodb/pyproject.toml b/libs/langchain-mongodb/pyproject.toml index 57508319..7e728f9b 100644 --- a/libs/langchain-mongodb/pyproject.toml +++ b/libs/langchain-mongodb/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "langchain-text-splitters>=0.3", "numpy>=1.26", "lark<2.0.0,>=1.1.9", + "langchain-voyageai>=0.1.7", ] [dependency-groups] diff --git a/libs/langchain-mongodb/tests/integration_tests/test_autoembedding.py b/libs/langchain-mongodb/tests/integration_tests/test_autoembedding.py new file mode 100644 index 00000000..c9394fd2 --- /dev/null +++ b/libs/langchain-mongodb/tests/integration_tests/test_autoembedding.py @@ -0,0 +1,90 @@ +"""Test MongoDB Atlas Vector Search on Collections with Auto-embedding indexes.""" + +from __future__ import annotations + +from typing import Any, Dict, List + +import pytest # type: ignore[import-not-found] +from bson import ObjectId +from langchain_core.documents import Document +from langchain_core.embeddings import Embeddings +from pymongo import MongoClient +from pymongo.collection import Collection + +from langchain_mongodb import MongoDBAtlasVectorSearch +from langchain_mongodb.vectorstores import AutoEmbeddingVectorStore +from langchain_tests.integration_tests import VectorStoreIntegrationTests +from langchain_mongodb.utils import oid_to_str + +from ..utils import DB_NAME, ConsistentFakeEmbeddings, PatchedMongoDBAtlasVectorSearch + +INDEX_NAME = "langchain-test-index-autoembedding" +COLLECTION_NAME = "langchain_test_autoembedding" + +@pytest.fixture +def collection(client: MongoClient) -> Collection: + clx = client[DB_NAME][COLLECTION_NAME] + clx.delete_many({}) + return clx + +@pytest.fixture(scope="module") +def texts() -> List[str]: + return [ + "Dogs are tough.", + "Cats have fluff.", + "What is a sandwich?", + "That fence is purple.", + ] + +@pytest.fixture(scope="module") +def metadatas() -> List[Dict[str, Any]]: + return [ + {"a": 1}, + {"b": 1}, + {"c": 1}, + {"d": 1, "e": 2}, + ] + + + +def test_autoembedding(collection: Collection, texts: List[str], metadatas: List[Dict[str, Any]]) -> None: + vectorstore = AutoEmbeddingVectorStore( + collection=collection, + index_name=INDEX_NAME, + text_key="content", + model="voyage-3-large", + auto_create_index=True, + auto_index_timeout=60 + ) + + assert any([ix["name"] == INDEX_NAME for ix in collection.list_search_indexes()]) + + vectorstore.collection.delete_many({}) + + n_docs = len(texts) + documents = [ + Document(page_content=texts[i], metadata=metadatas[i]) for i in range(n_docs) + ] + result_ids = vectorstore.add_documents(documents) + assert len(result_ids) == n_docs + + vectorstore.similarity_search_with_score("Animals") + + +''' +# TODO - Run standard test suite (See test_vectorstore_standard.py) +class TestMongoDBAtlasVectorSearch(VectorStoreIntegrationTests): + @pytest.fixture() + def vectorstore(self, collection) -> VectorStore: # type: ignore + """Get an empty vectorstore for unit tests.""" + store = AutoEmbeddingVectorStore( + collection, index_name=INDEX_NAME, text_key="text", model="voyage-3-large" + ) + # note: store should be EMPTY at this point + # if you need to delete data, you may do so here + return store +''' + + +# TODO +# 1. Test add_texts. We have this Embeddings arg to deal with. \ No newline at end of file From 3f1b56035b5be77e4bdfeccca10b13de9d68fadf Mon Sep 17 00:00:00 2001 From: Casey Clements Date: Fri, 12 Sep 2025 09:15:20 -0400 Subject: [PATCH 2/3] Make text_key consistent and finish test POC --- libs/langchain-mongodb/langchain_mongodb/vectorstores.py | 2 +- .../tests/integration_tests/test_autoembedding.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/libs/langchain-mongodb/langchain_mongodb/vectorstores.py b/libs/langchain-mongodb/langchain_mongodb/vectorstores.py index b686ed1e..cb895433 100644 --- a/libs/langchain-mongodb/langchain_mongodb/vectorstores.py +++ b/libs/langchain-mongodb/langchain_mongodb/vectorstores.py @@ -878,7 +878,7 @@ def __init__( self, collection: Collection[Dict[str, Any]], index_name: str = "vector_index", - text_key: Union[str, List[str]] = "content", + text_key: Union[str, List[str]] = "text", model: str = "voyage-3-large", auto_create_index: bool | None = None, auto_index_timeout: int = 15, diff --git a/libs/langchain-mongodb/tests/integration_tests/test_autoembedding.py b/libs/langchain-mongodb/tests/integration_tests/test_autoembedding.py index c9394fd2..f910596d 100644 --- a/libs/langchain-mongodb/tests/integration_tests/test_autoembedding.py +++ b/libs/langchain-mongodb/tests/integration_tests/test_autoembedding.py @@ -51,7 +51,7 @@ def test_autoembedding(collection: Collection, texts: List[str], metadatas: List vectorstore = AutoEmbeddingVectorStore( collection=collection, index_name=INDEX_NAME, - text_key="content", + text_key="text", model="voyage-3-large", auto_create_index=True, auto_index_timeout=60 @@ -68,7 +68,10 @@ def test_autoembedding(collection: Collection, texts: List[str], metadatas: List result_ids = vectorstore.add_documents(documents) assert len(result_ids) == n_docs - vectorstore.similarity_search_with_score("Animals") + found = vectorstore.similarity_search_with_score("Animals", k=2) + assert len(found) == 2 + assert all(res[0].page_content in ["Dogs are tough.", "Cats have fluff."] for res in found) + ''' From 55a46f1d15e4fc176b659e0bfa11eb5be34c4332 Mon Sep 17 00:00:00 2001 From: Casey Clements Date: Mon, 15 Sep 2025 10:06:58 -0400 Subject: [PATCH 3/3] Test that MongoDBAtlasVectorSearch can take default argument of None and standrd test suite passes. Added #TODO to fold AutoEmbeddingVectorStore into core one. --- libs/langchain-mongodb/langchain_mongodb/vectorstores.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/libs/langchain-mongodb/langchain_mongodb/vectorstores.py b/libs/langchain-mongodb/langchain_mongodb/vectorstores.py index cb895433..03fdcd36 100644 --- a/libs/langchain-mongodb/langchain_mongodb/vectorstores.py +++ b/libs/langchain-mongodb/langchain_mongodb/vectorstores.py @@ -48,6 +48,10 @@ DEFAULT_INSERT_BATCH_SIZE = 100 +# TODO: fold the autoembedding indexes into MongoDBAtlasVectorSearch +# Add flag containing type: e.g. self.embedding_type in ["manual", "auto"], autoembedding (bool) +# - OR add model (str) param and infer: e.g. if embedding is not None: self.embedding_type = "manual" +# - assert not embedding and model class MongoDBAtlasVectorSearch(VectorStore): """MongoDB Atlas vector store integration. @@ -204,7 +208,7 @@ class MongoDBAtlasVectorSearch(VectorStore): def __init__( self, collection: Collection[Dict[str, Any]], - embedding: Embeddings, + embedding: Embeddings = None, index_name: str = "vector_index", text_key: Union[str, List[str]] = "text", embedding_key: str = "embedding", @@ -1061,7 +1065,7 @@ def similarity_search_with_score( def from_texts( cls, texts: List[str], - embedding: Embeddings = VoyageAIEmbeddings(model="voyage-3-large"), + embedding: Embeddings = None, # VoyageAIEmbeddings(model="voyage-3-large"), metadatas: Optional[List[Dict]] = None, collection: Optional[Collection] = None, ids: Optional[List[str]] = None,