diff --git a/app/celery.py b/app/celery.py
index 2fd1261..083a67b 100644
--- a/app/celery.py
+++ b/app/celery.py
@@ -18,9 +18,36 @@
 )


+@celery.task(name="index_full_file_content")
+def index_full_file_content(content_base):
+    from app.main import main_app
+    print(f"[+ Index Full File Content: {content_base} +]")
+
+    file_downloader = S3FileDownloader(
+        os.environ.get("AWS_STORAGE_ACCESS_KEY"),
+        os.environ.get("AWS_STORAGE_SECRET_KEY")
+    )
+    content_base_indexer = main_app.content_base_indexer
+    text_splitter = TextSplitter(character_text_splitter())
+    manager = IndexerFileManager(
+        file_downloader,
+        content_base_indexer,
+        text_splitter,
+    )
+    index_result: bool = manager.index_full_text(content_base)
+    print(f"[+ Index Full File Result: {index_result} +]")
+    # TODO: retry indexing full text or delete embeddings
+    NexusRESTClient().index_succedded(
+        task_succeded=index_result,
+        nexus_task_uuid=content_base.get("task_uuid"),
+        file_type=content_base.get("extension_file")
+    )
+
+
 @celery.task(name="index_file")
 def index_file_data(content_base: Dict) -> bool:
     from app.main import main_app
+    print(f"[+ Index File Data: {content_base} +]")

     file_downloader = S3FileDownloader(
         os.environ.get("AWS_STORAGE_ACCESS_KEY"),
@@ -33,16 +60,22 @@ def index_file_data(content_base: Dict) -> bool:
         content_base_indexer,
         text_splitter,
     )
-    index_result: bool = manager.index_file_url(content_base)
-    embbed_result: bool = content_base_indexer.check_if_doc_was_embedded_document(
-        file_uuid=content_base.get("file_uuid"),
-        content_base_uuid=str(content_base.get('content_base')),
-    )
+    index_result: bool = manager.index(content_base)

-    index_result = index_result and embbed_result
+    print(f"[+ Index File URL result: {index_result} +]")
+
+    if index_result:
+        embbed_result: bool = content_base_indexer.check_if_doc_was_embedded_document(
+            file_uuid=content_base.get("file_uuid"),
+            content_base_uuid=str(content_base.get('content_base')),
+        )
+        print(f"[+ File was embedded: {embbed_result} +]")
+        if embbed_result:
+            index_full_file_content.delay(content_base)
+            return
+
     NexusRESTClient().index_succedded(
-        task_succeded=index_result,
+        task_succeded=False,
         nexus_task_uuid=content_base.get("task_uuid"),
         file_type=content_base.get("extension_file")
     )
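A note on the `# TODO: retry indexing full text or delete embeddings` left in `index_full_file_content`: Celery's built-in retry machinery is one way to close it. A minimal sketch, assuming the task stays idempotent — `max_retries` and the backoff values are illustrative, not project settings:

```python
# Hypothetical retry variant of the task above, using Celery's standard
# retry API (bind=True exposes the task instance as `self`).
from celery.exceptions import MaxRetriesExceededError

@celery.task(name="index_full_file_content", bind=True, max_retries=3)
def index_full_file_content(self, content_base):
    ...  # downloader/indexer/manager setup as in the task above
    index_result: bool = manager.index_full_text(content_base)
    if not index_result:
        try:
            # back off 2s, 4s, 8s between attempts
            raise self.retry(countdown=2 ** (self.request.retries + 1))
        except MaxRetriesExceededError:
            pass  # give up and report the failure below
    NexusRESTClient().index_succedded(
        task_succeded=index_result,
        nexus_task_uuid=content_base.get("task_uuid"),
        file_type=content_base.get("extension_file")
    )
```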
diff --git a/app/indexer/content_bases.py b/app/indexer/content_bases.py
index f90dc60..1671ad5 100644
--- a/app/indexer/content_bases.py
+++ b/app/indexer/content_bases.py
@@ -1,3 +1,5 @@
+import os
+
 from langchain.docstore.document import Document
 from app.handlers.products import Product
@@ -5,6 +7,7 @@
 from app.store import IStorage
 from typing import List
 from uuid import UUID
+from app.rerank import rerank_chunks


 class ContentBaseIndexer(IDocumentIndexer):
@@ -12,6 +15,9 @@ def __init__(self, storage: IStorage):
         self.storage = storage

     def index_documents(self, docs: List[Document]):
+        DOCUMENTS_BATCH_SIZE: int = int(os.environ.get("DOCUMENTS_BATCH_SIZE", 500))
+        docs_size: int = len(docs)
+
         file_uuid = docs[0].metadata["file_uuid"]
         content_base_uuid = docs[0].metadata["content_base_uuid"]
@@ -24,7 +30,10 @@ def index_documents(self, docs: List[Document]):
             ids = [item["_id"] for item in results]
             self.storage.delete(ids=ids)
-        return self.storage.save(docs)
+        for i in range(0, docs_size, DOCUMENTS_BATCH_SIZE):
+            self.storage.save(docs[i:i + DOCUMENTS_BATCH_SIZE])
+
+        return

     def index(self, texts: List, metadatas: dict):
         results = self._search_docs_by_content_base_uuid(
@@ -53,15 +62,19 @@ def search(self, search, filter=None, threshold=0.1) -> list[Product]:
         for doc in matched_responses:
             full_page = doc.metadata.get("full_page")
+
             if full_page not in seen:
                 seen.add(full_page)
                 return_list.append({
+                    "text": full_page,
                     "full_page": full_page,
                     "filename": doc.metadata.get("filename"),
                     "file_uuid": doc.metadata.get("file_uuid"),
                 })

-        return return_list
+        chunks = rerank_chunks(search, return_list, 0.4, 5)
+
+        return chunks

     def _search_docs_by_content_base_uuid(
         self,
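The batched `storage.save` above keeps each bulk write to the vector store bounded. `DOCUMENTS_BATCH_SIZE` arrives as a string from the environment, hence the `int()` cast before it reaches `range`. A toy illustration of the slicing, with made-up sizes:

```python
# 1,250 docs saved in batches of 500 -> three bulk writes: 500, 500, 250.
docs = list(range(1250))
DOCUMENTS_BATCH_SIZE = int("500")  # env vars are strings until cast

batches = [docs[i:i + DOCUMENTS_BATCH_SIZE]
           for i in range(0, len(docs), DOCUMENTS_BATCH_SIZE)]
assert [len(b) for b in batches] == [500, 500, 250]
```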
diff --git a/app/indexer/indexer_file_manager.py b/app/indexer/indexer_file_manager.py
index 5a2dc71..747ffa8 100644
--- a/app/indexer/indexer_file_manager.py
+++ b/app/indexer/indexer_file_manager.py
@@ -1,7 +1,8 @@
 from app.loaders import (
     load_file_and_get_raw_text,
     load_file_url_and_get_raw_text,
-    load_file_url_and_split_text
+    load_file_url_and_split_text,
+    load_file_and_get_chunks
 )
 from app.text_splitters import get_split_text
 from typing import Dict, List
@@ -14,8 +15,9 @@

 def get_file_metadata(content_base: Dict) -> Dict[str, str]:
     return {
-        'source': content_base.get("filename"),
-        "content_base_uuid": str(content_base.get('content_base'))
+        "content_base_uuid": str(content_base.get('content_base')),
+        "filename": content_base.get("filename"),
+        "file_uuid": content_base.get("file_uuid")
     }
@@ -48,13 +50,36 @@ def __init__(self,
         self.content_base_indexer = content_base_indexer
         self.text_splitter = text_splitter

+    def index(self, content_base, **kwargs) -> bool:
+        print(f"[+ File {content_base.get('file_uuid')} +]")
+        try:
+            load_type = content_base.get("load_type")
+            metadata = get_file_metadata(content_base)
+            docs: List[Document]
+            docs, _ = load_file_and_get_chunks(
+                content_base.get("file"),
+                content_base.get('extension_file'),
+                metadata,
+                load_type=load_type
+            )
+
+            try:
+                self.content_base_indexer.index_documents(docs)
+                return True
+            except Exception as e:
+                logger.exception(e)
+                return False
+        except Exception as e:  # TODO: handle exceptions
+            logger.exception(e)
+            return False
+
     def index_file_url(self, content_base, **kwargs) -> bool:
+        print(f"[+ File {content_base.get('file_uuid')} +]")
+
         load_type = content_base.get("load_type")

         docs: List[Document]
-        full_content: str
-
-        docs, full_content = load_file_url_and_split_text(
+        docs, _ = load_file_url_and_split_text(
             content_base.get("file"),
             content_base.get('extension_file'),
             self.text_splitter,
@@ -63,12 +88,6 @@ def index_file_url(self, content_base, **kwargs) -> bool:
         document_pages: List[Document] = add_file_metadata(docs, content_base)
         try:
             self.content_base_indexer.index_documents(document_pages)
-            self.content_base_indexer.index_doc_content(
-                full_content=full_content,
-                content_base_uuid=str(content_base.get('content_base')),
-                filename=content_base.get("filename"),
-                file_uuid=content_base.get("file_uuid"),
-            )
             return True
         except Exception as e:  # TODO: handle exceptions
             logger.exception(e)
@@ -109,3 +128,27 @@ def index_file(self, content_base):
         except Exception as e:  # TODO: handle exceptions
             logger.exception(e)
             return False
+
+    def index_full_text(self, content_base, **kwargs):
+        full_content: str
+        load_type = content_base.get("load_type")
+
+        try:
+            _, full_content = load_file_url_and_split_text(
+                content_base.get("file"),
+                content_base.get('extension_file'),
+                self.text_splitter,
+                load_type=load_type,
+                return_split_text=False,
+                return_full_content=True,
+            )
+            self.content_base_indexer.index_doc_content(
+                full_content=full_content,
+                content_base_uuid=str(content_base.get('content_base')),
+                filename=content_base.get("filename"),
+                file_uuid=content_base.get("file_uuid"),
+            )
+            return True
+        except Exception as e:
+            logger.exception(e)
+            return False
diff --git a/app/loaders/__init__.py b/app/loaders/__init__.py
index 1eace35..a2e8f66 100644
--- a/app/loaders/__init__.py
+++ b/app/loaders/__init__.py
@@ -16,7 +16,7 @@
     URLsLoader,
 )
 from langchain.schema.document import Document
-from typing import List
+from typing import List, Dict
 from app.text_splitters import ITextSplitter
 from typing import Tuple
@@ -73,9 +73,10 @@ def load_file_url_and_split_text(
     file_url: str,
     file_type: str,
     text_splitter: ITextSplitter,
+    return_split_text: bool = True,
+    return_full_content: bool = False,
     **kwargs
 ) -> Tuple[List[Document], str]:
-
     load_type = kwargs.get("load_type", None)

     loader = supported_loaders_cls.get(file_type)
@@ -84,4 +85,38 @@
         file=file_url,
         load_type=load_type
     )
-    return data_loader.load_and_split_text(text_splitter)
+    return data_loader.load_and_split_text(
+        text_splitter,
+        return_split_text,
+        return_full_content,
+    )
+
+
+def load_file_and_get_chunks(
+    file_url: str,
+    file_type: str,
+    metadata: Dict,
+    return_split_text: bool = True,
+    return_full_content: bool = False,
+    **kwargs
+) -> Tuple[List[Document], str]:
+
+    print("=================================================")
+    print("[+ load_file_and_get_chunks +]")
+    print(f"[+ return_split_text: {return_split_text} +]")
+    print(f"[+ return_full_content: {return_full_content} +]")
+    print("=================================================")
+
+    load_type = kwargs.get("load_type", None)
+    loader = supported_loaders_cls.get(file_type)
+
+    data_loader = DataLoaderCls(
+        loader=loader,
+        file=file_url,
+        load_type=load_type
+    )
+    return data_loader.load_and_get_chunks(
+        metadata,
+        return_split_text,
+        return_full_content
+    )
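The two flags added here split what was a single loader pass into two independent ones: `return_split_text` drives chunking for embedding, while `return_full_content` drives the raw-text pass that `index_full_text` stores via `index_doc_content`. Roughly how the two call sites combine them — a sketch only; `file_url`, `metadata`, and `text_splitter` are placeholders:

```python
# Chunk-only pass (IndexerFileManager.index): skip assembling full content.
docs, _ = load_file_and_get_chunks(
    file_url, "pdf", metadata, load_type=None
)

# Full-content-only pass (IndexerFileManager.index_full_text): skip chunking.
_, full_content = load_file_url_and_split_text(
    file_url, "pdf", text_splitter,
    return_split_text=False,
    return_full_content=True,
)
```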
diff --git a/app/loaders/loaders.py b/app/loaders/loaders.py
index d0c98a0..943ad97 100644
--- a/app/loaders/loaders.py
+++ b/app/loaders/loaders.py
@@ -2,7 +2,7 @@
 import uuid
 import requests
 from abc import ABC, abstractmethod
-from typing import Tuple
+from typing import Tuple, Dict
 from urllib.request import urlretrieve
 from urllib.parse import urlparse
@@ -18,6 +18,14 @@
 from langchain.schema.document import Document
 from typing import Callable, List, Union
 from app.text_splitters import ITextSplitter
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+from app.util import count_words
+
+
+PARENT_CHUNK_SIZE = int(os.environ.get("PARENT_CHUNK_SIZE", 1125))
+CHILD_CHUNK_SIZE = int(os.environ.get("CHILD_CHUNK_SIZE", 225))
+CHUNK_OVERLAP = int(os.environ.get("CHUNK_OVERLAP", 45))
+LENGTH_FUNCTION = count_words


 class DocumentLoader(ABC):
@@ -26,6 +34,61 @@ class DocumentLoader(ABC):
     def load(self):
         pass

+    def load_and_get_chunks(
+        self,
+        metadata: Dict,
+        return_split_text: bool = True,
+        return_full_content: bool = False,
+    ) -> Tuple[List[Document], str]:
+
+        split_pages = []
+        full_content = ""
+
+        if return_full_content:
+            data: List[Document] = self.loader.load()
+            for page in data:
+                full_content += page.page_content
+
+        if return_split_text:
+
+            parent_text_splitter = RecursiveCharacterTextSplitter(
+                chunk_size=PARENT_CHUNK_SIZE,
+                chunk_overlap=CHUNK_OVERLAP,
+                length_function=LENGTH_FUNCTION,
+            )
+
+            child_text_splitter = RecursiveCharacterTextSplitter(
+                chunk_size=CHILD_CHUNK_SIZE,
+                chunk_overlap=CHUNK_OVERLAP,
+                length_function=LENGTH_FUNCTION,
+            )
+
+            data: List[Document] = self.loader.load()
+
+            page: Document
+            child_chunks: List[Document] = []
+
+            for page in data:
+                content: str = page.page_content
+                metadata.update(page.metadata)
+
+                parent_chunks = parent_text_splitter.split_text(content)
+
+                for chunk in parent_chunks:
+                    chunk = chunk.replace("\t", "").replace("\\t", "").replace("\\n", "\n")
+                    metadata.update({"full_page": chunk})
+                    child_texts: List[str] = child_text_splitter.split_text(chunk)
+
+                    for child_text in child_texts:
+                        child_chunk = Document(
+                            page_content=child_text,
+                            metadata=dict(metadata)  # copy: each chunk keeps its own "full_page"
+                        )
+                        child_chunks.append(child_chunk)
+
+            split_pages = child_chunks
+        return (split_pages, full_content)
+

 class DataLoaderCls:
     def __init__(self, loader: DocumentLoader, file: str, **kwargs) -> None:
@@ -43,13 +106,27 @@ def load(self) -> List[Document]:

     def load_and_split_text(
         self,
-        text_splitter: ITextSplitter
+        text_splitter: ITextSplitter,
+        return_split_text: bool = True,
+        return_full_content: bool = False,
     ) -> Tuple[List[Document], str]:
-        return self.loader.load_and_split_text(text_splitter)
+        return self.loader.load_and_split_text(
+            text_splitter,
+            return_split_text,
+            return_full_content,
+        )

     def raw_text(self) -> str:
         return self.loader.raw_text()

+    def load_and_get_chunks(
+        self,
+        metadata: Dict,
+        return_split_text: bool = True,
+        return_full_content: bool = False,
+    ) -> Tuple[List[Document], str]:
+        return self.loader.load_and_get_chunks(metadata, return_split_text, return_full_content)
+

 class DataLoader:
     def __init__(self, loader: Callable, file: str) -> None:
@@ -100,26 +177,34 @@ def load(self) -> List[Document]:

     def load_and_split_text(
         self,
-        text_splitter: ITextSplitter
+        text_splitter: ITextSplitter,
+        return_split_text: bool = True,
+        return_full_content: bool = False,
     ) -> Tuple[List[Document], str]:
-        data: List[Document] = self.loader.load()
-        full_content = data[0].page_content
-
-        pages = self.load()
         split_pages = []
-        for page in pages:
-            page_content = page.page_content
-            metadatas = page.metadata
-            metadatas.update({"full_page": page_content})
-
-            text_chunks = text_splitter.split_text(page_content)
-            for chunk in text_chunks:
-                split_pages.append(
-                    Document(
-                        page_content=chunk,
-                        metadata=metadatas
+        full_content = ""
+
+        if return_full_content:
+            data: List[Document] = self.loader.load()
+            full_content = data[0].page_content
+
+        if return_split_text:
+            pages = self.load()
+            split_pages = []
+            for page in pages:
+                page_content = page.page_content
+                metadatas = page.metadata
+                metadatas.update({"full_page": page_content})
+
+                text_chunks = text_splitter.split_text(page_content)
+                for chunk in text_chunks:
+                    split_pages.append(
+                        Document(
+                            page_content=chunk,
+                            metadata=metadatas
+                        )
                     )
-                )

         return (split_pages, full_content)
@@ -143,27 +228,35 @@ def load(self) -> List[Document]:

     def load_and_split_text(
         self,
-        text_splitter: ITextSplitter
+        text_splitter: ITextSplitter,
+        return_split_text: bool = True,
+        return_full_content: bool = False,
     ) -> Tuple[List[Document], str]:
-        data: List[Document] = self.loader.load()
-        full_content = data[0].page_content
-        pages = self.load()
         split_pages = []
-
-        for page in pages:
-            page_content = page.page_content
-            metadatas = page.metadata
-            metadatas.update({"full_page": page_content})
-
-            text_chunks = text_splitter.split_text(page_content)
-            for chunk in text_chunks:
-                split_pages.append(
-                    Document(
-                        page_content=chunk,
-                        metadata=metadatas
+        full_content = ""
+
+        if return_full_content:
+            data: List[Document] = self.loader.load()
+            full_content = data[0].page_content
+
+        if return_split_text:
+            pages = self.load()
+            split_pages = []
+
+            for page in pages:
+                page_content = page.page_content
+                metadatas = page.metadata
+                metadatas.update({"full_page": page_content})
+
+                text_chunks = text_splitter.split_text(page_content)
+                for chunk in text_chunks:
+                    split_pages.append(
+                        Document(
+                            page_content=chunk,
+                            metadata=metadatas
+                        )
                     )
-                )

         return (split_pages, full_content)
@@ -196,27 +289,89 @@ def load(self) -> List[Document]:

     def load_and_split_text(
         self,
-        text_splitter: ITextSplitter
+        text_splitter: ITextSplitter,
+        return_split_text: bool = True,
+        return_full_content: bool = False,
     ) -> Tuple[List[Document], str]:
-        data: List[Document] = self.loader.load()
-        full_content = data[0].page_content
-
-        pages = self.load()
         split_pages = []
-        for page in pages:
-            page_content = page.page_content
-            metadatas = page.metadata
-            metadatas.update({"full_page": page_content})
-
-            text_chunks = text_splitter.split_text(page_content)
-            for chunk in text_chunks:
-                split_pages.append(
-                    Document(
-                        page_content=chunk,
-                        metadata=metadatas
+        full_content = ""
+
+        if return_full_content:
+            data: List[Document] = self.loader.load()
+            full_content = data[0].page_content
+
+        if return_split_text:
+            pages = self.load()
+            split_pages = []
+            for page in pages:
+                page_content = page.page_content
+                metadatas = page.metadata
+                metadatas.update({"full_page": page_content})
+
+                text_chunks = text_splitter.split_text(page_content)
+                for chunk in text_chunks:
+                    split_pages.append(
+                        Document(
+                            page_content=chunk,
+                            metadata=metadatas
+                        )
                     )
-                )
+        return (split_pages, full_content)
+
+    def load_and_get_chunks(
+        self,
+        metadata: Dict,
+        return_split_text: bool = True,
+        return_full_content: bool = False,
+    ) -> Tuple[List[Document], str]:
+
+        split_pages = []
+        full_content = ""
+
+        if return_full_content:
+            data: List[Document] = self.loader.load()
+            for page in data:
+                full_content += page.page_content
+
+        if return_split_text:
+
+            parent_text_splitter = RecursiveCharacterTextSplitter(
+                chunk_size=PARENT_CHUNK_SIZE,
+                chunk_overlap=CHUNK_OVERLAP,
+                length_function=LENGTH_FUNCTION,
+            )
+
+            child_text_splitter = RecursiveCharacterTextSplitter(
+                chunk_size=CHILD_CHUNK_SIZE,
+                chunk_overlap=CHUNK_OVERLAP,
+                length_function=LENGTH_FUNCTION,
+            )
+
+            data: List[Document] = self.loader.load()
+
+            page: Document
+            child_chunks: List[Document] = []
+
+            for page in data:
+                content: str = page.page_content
+                metadata.update(page.metadata)
+
+                parent_chunks = parent_text_splitter.split_text(content)
+
+                for chunk in parent_chunks:
+                    chunk = chunk.replace("\t", "").replace("\\t", "").replace("\\n", "\n")
+                    metadata.update({"full_page": chunk})
+                    child_texts: List[str] = child_text_splitter.split_text(chunk)
+
+                    for child_text in child_texts:
+                        child_chunk = Document(
+                            page_content=child_text,
+                            metadata=dict(metadata)  # copy: each chunk keeps its own "full_page"
+                        )
+                        child_chunks.append(child_chunk)
+
+            split_pages = child_chunks
         return (split_pages, full_content)
@@ -257,27 +412,34 @@ def load(self) -> List[Document]:

     def load_and_split_text(
         self,
-        text_splitter: ITextSplitter
+        text_splitter: ITextSplitter,
+        return_split_text: bool = True,
+        return_full_content: bool = False,
     ) -> Tuple[List[Document], str]:
-        data: List[Document] = self.load()
-        full_content: str = data[0].page_content
-
-        pages = self.load()
         split_pages = []
-        for page in pages:
-            page_content = page.page_content
-            metadatas = page.metadata
-            metadatas.update({"full_page": page_content})
-
-            text_chunks = text_splitter.split_text(page_content)
-            for chunk in text_chunks:
-                split_pages.append(
-                    Document(
-                        page_content=chunk,
-                        metadata=metadatas
+        full_content = ""
+
+        if return_full_content:
+            data: List[Document] = self.load()
+            full_content: str = data[0].page_content
+
+        if return_split_text:
+            pages = self.load()
+            split_pages = []
+            for page in pages:
+                page_content = page.page_content
+                metadatas = page.metadata
+                metadatas.update({"full_page": page_content})
+
+                text_chunks = text_splitter.split_text(page_content)
+                for chunk in text_chunks:
+                    split_pages.append(
+                        Document(
+                            page_content=chunk,
+                            metadata=metadatas
+                        )
                     )
-                )

         return (split_pages, full_content)
@@ -301,23 +463,31 @@ def load(self) -> List[Document]:

     def load_and_split_text(
         self,
-        text_splitter: ITextSplitter
+        text_splitter: ITextSplitter,
+        return_split_text: bool = True,
+        return_full_content: bool = False,
     ) -> Tuple[List[Document], str]:
+        split_pages = []
-        data: List[Document] = self.load()
-        full_content: str = data[0].page_content
-        pages = self.loader.load_and_split()
-        for page in pages:
-            page_content = page.page_content
-            metadatas = page.metadata
-            metadatas.update({"full_page": page_content})
-
-            text_chunks = text_splitter.split_text(page_content)
-            for chunk in text_chunks:
-                split_pages.append(
-                    Document(
-                        page_content=chunk,
-                        metadata=metadatas
+        full_content = ""
+
+        if return_full_content:
+            data: List[Document] = self.load()
+            full_content: str = data[0].page_content
+
+        if return_split_text:
+            pages = self.loader.load_and_split()
+            for page in pages:
+                page_content = page.page_content
+                metadatas = page.metadata
+                metadatas.update({"full_page": page_content})
+
+                text_chunks = text_splitter.split_text(page_content)
+                for chunk in text_chunks:
+                    split_pages.append(
+                        Document(
+                            page_content=chunk,
+                            metadata=metadatas
+                        )
                     )
-                )

         return (split_pages, full_content)
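The new `load_and_get_chunks` implements a parent/child split: parent chunks of roughly `PARENT_CHUNK_SIZE` words ride along as `metadata["full_page"]`, while the smaller child chunks are what actually get embedded and searched. A runnable toy version with shrunken sizes — the local `count_words` stands in for `app.util.count_words`:

```python
from langchain.text_splitter import RecursiveCharacterTextSplitter

def count_words(text: str) -> int:  # stand-in for app.util.count_words
    return len(text.split())

parent_splitter = RecursiveCharacterTextSplitter(
    chunk_size=40, chunk_overlap=5, length_function=count_words)
child_splitter = RecursiveCharacterTextSplitter(
    chunk_size=10, chunk_overlap=5, length_function=count_words)

text = " ".join(f"word{i}" for i in range(100))
for parent_chunk in parent_splitter.split_text(text):
    for child_text in child_splitter.split_text(parent_chunk):
        # each child is embedded; its parent travels along as "full_page"
        print(f"{len(child_text.split()):>2}-word child of a "
              f"{len(parent_chunk.split())}-word parent")
```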
diff --git a/app/rerank.py b/app/rerank.py
new file mode 100644
index 0000000..c8cc94e
--- /dev/null
+++ b/app/rerank.py
@@ -0,0 +1,32 @@
+import cohere
+import os
+from typing import List, Dict
+
+
+RERANK_MODEL = os.environ.get("RERANK_MODEL")
+RERANK_API_KEY: str = os.environ.get("RERANK_API_KEY")
+RERANK_THRESHOLD = float(os.environ.get("RERANK_THRESHOLD", 0.4))
+
+
+def rerank_chunks(
+    query: str,
+    chunks_list: List[Dict],
+    threshold: float = RERANK_THRESHOLD,
+    max_docs: int = 5,
+):
+    if chunks_list:
+        print("[+ Rerank Chunks +]")
+        co = cohere.Client(RERANK_API_KEY)
+        responses = co.rerank(
+            model=RERANK_MODEL,
+            query=query,
+            documents=chunks_list,
+            top_n=max_docs
+        )
+        results = []
+
+        for r in responses.results:
+            if r.relevance_score > threshold:
+                results.append(r.document)
+        return results[:max_docs]
+    return chunks_list
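`rerank_chunks` is the second-stage filter wired into `ContentBaseIndexer.search` above — presumably the reason a `"text"` key was added to each result dict, since Cohere's rerank endpoint scores dict documents by their text field. The call shape, with placeholder values:

```python
from app.rerank import rerank_chunks

chunks = [
    {"text": "child chunk one", "full_page": "parent one",
     "filename": "doc.pdf", "file_uuid": "abc-123"},
    {"text": "child chunk two", "full_page": "parent two",
     "filename": "doc.pdf", "file_uuid": "abc-123"},
]
# keep at most 5 chunks whose relevance score clears 0.4
top_chunks = rerank_chunks("user question", chunks, threshold=0.4, max_docs=5)
```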
diff --git a/app/store/elasticsearch_vector_store.py b/app/store/elasticsearch_vector_store.py
index cfbec29..e52af83 100644
--- a/app/store/elasticsearch_vector_store.py
+++ b/app/store/elasticsearch_vector_store.py
@@ -137,7 +137,6 @@ def search(self, search: str, filter=None, threshold=0.1) -> list[Document]:
         content_base_uuid = filter.get("content_base_uuid")
         q = {"bool": {"filter": [{"term": {"metadata.content_base_uuid.keyword": content_base_uuid}}]}}
-
         docs = self.vectorstore.similarity_search_with_score(query=search, k=5, filter=q)
         return [doc[0] for doc in docs if doc[1] > threshold]

@@ -149,18 +148,18 @@ def save_doc_content(self, full_content, content_base_uuid, filename, file_uuid):
             "content": full_content,
             "content_base_uuid": content_base_uuid,
             "filename": filename,
-            "file_uuid":file_uuid
+            "file_uuid": file_uuid
         }
         es_client = self.vectorstore.client
-        res = es_client.index(index="content_base_documents", body=elasticsearch_doc)
+        es_client.index(index="content_base_documents", body=elasticsearch_doc)
         return

     def search_doc_content(self, file_uuid: str, content_base_uuid: str) -> str:
         query = {
             "bool": {
                 "filter": [
-                    { "term": { "file_uuid.keyword": file_uuid}},
-                    { "term": { "content_base_uuid.keyword": content_base_uuid}}
+                    {"term": {"file_uuid.keyword": file_uuid}},
+                    {"term": {"content_base_uuid.keyword": content_base_uuid}}
                 ]
             }
         }
@@ -181,8 +180,8 @@ def check_if_doc_was_embedded_document(self, file_uuid: str, content_base_uuid:
         query = {
             "bool": {
                 "filter": [
-                    { "term": { "metadata.file_uuid.keyword": file_uuid}},
-                    { "term": { "metadata.content_base_uuid.keyword": content_base_uuid}}
+                    {"term": {"metadata.file_uuid.keyword": file_uuid}},
+                    {"term": {"metadata.content_base_uuid.keyword": content_base_uuid}}
                 ]
             }
         }
diff --git a/app/text_splitters/text_splitters.py b/app/text_splitters/text_splitters.py
index de5dddd..686eab9 100644
--- a/app/text_splitters/text_splitters.py
+++ b/app/text_splitters/text_splitters.py
@@ -4,12 +4,19 @@
 from typing import Callable, List, Dict
 from langchain.schema.document import Document
 from langchain.text_splitter import CharacterTextSplitter
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+
 from app.util import count_words


 DEFAULT_CHUNK_SIZE = os.environ.get("DEFAULT_CHUNK_SIZE", 75)
 DEFAULT_CHUNK_OVERLAP = os.environ.get("DEFAULT_CHUNK_OVERLAP", 75)
 DEFAULT_SEPARATOR = os.environ.get("DEFAULT_SEPARATOR", "\n")
+PARENT_CHUNK_SIZE = int(os.environ.get("PARENT_CHUNK_SIZE", 1125))
+CHILD_CHUNK_SIZE = int(os.environ.get("CHILD_CHUNK_SIZE", 225))
+CHUNK_OVERLAP = int(os.environ.get("CHUNK_OVERLAP", 45))
+LENGTH_FUNCTION = count_words
+

 class ITextSplitter(ABC):  # pragma: no cover
@@ -56,3 +63,16 @@ def get_split_text(raw_data: str):
     text_splitter = TextSplitter(character_text_splitter())
     texts = text_splitter.split_text(raw_data)
     return texts
+
+
+def recursive_character_text_splitter(
+    chunk_size=PARENT_CHUNK_SIZE,
+    chunk_overlap=CHUNK_OVERLAP,
+    length_function=LENGTH_FUNCTION,
+):
+    recursive_text_splitter = RecursiveCharacterTextSplitter(
+        chunk_size=chunk_size,
+        chunk_overlap=chunk_overlap,
+        length_function=length_function,
+    )
+    return recursive_text_splitter
diff --git a/docker-compose.yml b/docker-compose.yml
index a691bfb..a2d98b1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,4 +1,3 @@
-version: '3'
 services:
   app:
     build:
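Taken together, the change set reads several new environment variables. A sketch of the knobs and their in-code fallbacks — names are from the diff; `RERANK_MODEL` and `RERANK_API_KEY` have no fallback and must be set in the deployment environment:

```python
import os

# hierarchical chunking (sizes are word counts, measured by count_words)
os.environ.setdefault("PARENT_CHUNK_SIZE", "1125")    # retrieval-context chunks
os.environ.setdefault("CHILD_CHUNK_SIZE", "225")      # embedded chunks
os.environ.setdefault("CHUNK_OVERLAP", "45")

# indexing
os.environ.setdefault("DOCUMENTS_BATCH_SIZE", "500")  # docs per bulk save

# reranking (app/rerank.py)
os.environ.setdefault("RERANK_THRESHOLD", "0.4")      # minimum relevance score
# RERANK_MODEL and RERANK_API_KEY are read at import time and must be provided
```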