diff --git a/llama_index/indices/vector_store/base.py b/llama_index/indices/vector_store/base.py
index f64c1c5cad..57c3e355c1 100644
--- a/llama_index/indices/vector_store/base.py
+++ b/llama_index/indices/vector_store/base.py
@@ -145,7 +145,7 @@ async def _async_add_nodes_to_index(
             return
 
         nodes = await self._aget_node_with_embedding(nodes, show_progress)
-        new_ids = self._vector_store.add(nodes)
+        new_ids = await self._vector_store.async_add(nodes)
 
         # if the vector store doesn't store text, we need to add the nodes to the
         # index struct and document store
diff --git a/llama_index/vector_stores/pinecone.py b/llama_index/vector_stores/pinecone.py
index c624b0c6b8..7918bc3abd 100644
--- a/llama_index/vector_stores/pinecone.py
+++ b/llama_index/vector_stores/pinecone.py
@@ -4,6 +4,7 @@
 
 """
 
+import asyncio
 import logging
 from collections import Counter
 from functools import partial
@@ -11,6 +12,7 @@
 
 from llama_index.bridge.pydantic import PrivateAttr
 from llama_index.schema import BaseNode, MetadataMode, TextNode
+from llama_index.utils import iter_batch
 from llama_index.vector_stores.types import (
     BasePydanticVectorStore,
     MetadataFilters,
@@ -30,7 +32,9 @@
 SPARSE_VECTOR_KEY = "sparse_values"
 METADATA_KEY = "metadata"
 
-DEFAULT_BATCH_SIZE = 100
+DEFAULT_BATCH_SIZE = 200
+
+SEM_MAX_CONCURRENT = 10
 
 _logger = logging.getLogger(__name__)
 
@@ -97,6 +101,33 @@ def _to_pinecone_filter(standard_filters: MetadataFilters) -> dict:
     return filters
 
 
+async def async_upload(
+    index: Any,
+    vectors: List[Dict],
+    batch_size: int,
+    semaphore: asyncio.Semaphore,
+    **upsert_kwargs: Any,
+) -> None:
+    """Upsert ``vectors`` in batches of ``batch_size``, concurrently.
+
+    At most as many batches are in flight as ``semaphore`` admits. Extra
+    keyword arguments are forwarded to every ``index.upsert`` call.
+    """
+    loop = asyncio.get_running_loop()
+
+    async def send_batch(batch: List[Dict]) -> Any:
+        async with semaphore:
+            # Plain blocking upsert on a worker thread (no ``async_req``), so
+            # awaiting it waits for the request and surfaces its errors.
+            return await loop.run_in_executor(
+                None, partial(index.upsert, batch, **upsert_kwargs)
+            )
+
+    await asyncio.gather(
+        *[send_batch(chunk) for chunk in iter_batch(vectors, size=batch_size)]
+    )
+
+
 import_err_msg = (
     "`pinecone` package not found, please run `pip install pinecone-client`"
 )
@@ -170,7 +201,7 @@ def __init__(
         if tokenizer is None and add_sparse_vector:
             tokenizer = get_default_tokenizer()
 
-        self._tokenizer = tokenizer
+        self._tokenizer = tokenizer  # type: ignore
 
         super().__init__(
             index_name=index_name,
@@ -223,46 +254,79 @@ def from_params(
     def class_name(cls) -> str:
         return "PinconeVectorStore"
 
-    def add(
-        self,
-        nodes: List[BaseNode],
-    ) -> List[str]:
-        """Add nodes to index.
-
-        Args:
-            nodes: List[BaseNode]: list of nodes with embeddings
-
-        """
-        ids = []
+    def _prepare_entries_for_upsert(self, nodes: List[BaseNode]) -> List[Dict]:
+        """Build one Pinecone upsert entry (id, values, metadata) per node."""
         entries = []
         for node in nodes:
-            node_id = node.node_id
-
             metadata = node_to_metadata_dict(
                 node, remove_text=False, flat_metadata=self.flat_metadata
             )
 
             entry = {
-                ID_KEY: node_id,
+                ID_KEY: node.node_id,
                 VECTOR_KEY: node.get_embedding(),
                 METADATA_KEY: metadata,
             }
+
             if self.add_sparse_vector and self._tokenizer is not None:
                 sparse_vector = generate_sparse_vectors(
                     [node.get_content(metadata_mode=MetadataMode.EMBED)],
                     self._tokenizer,
                 )[0]
                 entry[SPARSE_VECTOR_KEY] = sparse_vector
 
-            ids.append(node_id)
             entries.append(entry)
-        self._pinecone_index.upsert(
-            entries,
-            namespace=self.namespace,
-            batch_size=self.batch_size,
-            **self.insert_kwargs,
-        )
-        return ids
+
+        return entries
+
+    def add(
+        self,
+        nodes: List[BaseNode],
+    ) -> List[str]:
+        """Add nodes to index.
+
+        Args:
+            nodes: List[BaseNode]: list of nodes with embeddings
+
+        """
+        entries = self._prepare_entries_for_upsert(nodes)
+
+        # Blocking upsert (no ``async_req``): failures surface here, and the
+        # configured namespace and insert kwargs are applied to every batch.
+        self._pinecone_index.upsert(
+            entries,
+            namespace=self.namespace,
+            batch_size=self.batch_size,
+            **self.insert_kwargs,
+        )
+
+        return [entry[ID_KEY] for entry in entries]
+
+    async def async_add(
+        self,
+        nodes: List[BaseNode],
+    ) -> List[str]:
+        """Asynchronously add a list of embedding results to the collection.
+
+        Args:
+            nodes (List[BaseNode]): Embedding results to add.
+
+        Returns:
+            List[str]: List of IDs of the added documents.
+        """
+        entries = self._prepare_entries_for_upsert(nodes)
+
+        semaphore = asyncio.Semaphore(SEM_MAX_CONCURRENT)
+        await async_upload(
+            self._pinecone_index,
+            entries,
+            self.batch_size,
+            semaphore,
+            namespace=self.namespace,
+            **self.insert_kwargs,
+        )
+
+        return [entry[ID_KEY] for entry in entries]
 
     def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
         """
diff --git a/tests/indices/vector_store/utils.py b/tests/indices/vector_store/utils.py
index a8c5044d02..d15de7779d 100644
--- a/tests/indices/vector_store/utils.py
+++ b/tests/indices/vector_store/utils.py
@@ -12,19 +12,19 @@
 class MockPineconeIndex:
     def __init__(self) -> None:
         """Mock pinecone index."""
-        self._tuples: List[Dict[str, Any]] = []
+        self._vectors: List[Dict[str, Any]] = []
 
-    def upsert(self, tuples: List[Dict[str, Any]], **kwargs: Any) -> None:
+    def upsert(self, vectors: List[Dict[str, Any]], **kwargs: Any) -> None:
         """Mock upsert."""
-        self._tuples.extend(tuples)
+        self._vectors.extend(vectors)
 
     def delete(self, ids: List[str]) -> None:
         """Mock delete."""
-        new_tuples = []
-        for tup in self._tuples:
-            if tup["id"] not in ids:
-                new_tuples.append(tup)
-        self._tuples = new_tuples
+        new_vectors = []
+        for vec in self._vectors:
+            if vec["id"] not in ids:
+                new_vectors.append(vec)
+        self._vectors = new_vectors
 
     def query(
         self,
@@ -38,7 +38,7 @@ def query(
     ) -> Any:
         """Mock query."""
         # index_mat is n x k
-        index_mat = np.array([tup["values"] for tup in self._tuples])
+        index_mat = np.array([vec["values"] for vec in self._vectors])
         query_vec = np.array(vector)[np.newaxis, :]
 
         # compute distances
@@ -49,10 +49,10 @@ def query(
 
         matches = []
         for index in indices:
-            tup = self._tuples[index]
+            vec = self._vectors[index]
             match = MagicMock()
-            match.metadata = tup["metadata"]
-            match.id = tup["id"]
+            match.metadata = vec["metadata"]
+            match.id = vec["id"]
             matches.append(match)
 
         response = MagicMock()