diff --git a/backend/app/services/document_processor.py b/backend/app/services/document_processor.py
index 02defc5..5b0b509 100644
--- a/backend/app/services/document_processor.py
+++ b/backend/app/services/document_processor.py
@@ -60,6 +60,127 @@ def __init__(self):
         self.max_chunk_size = 1000 # maximum characters per chunk
         self.overlap = 50 # number of characters to overlap between chunks
 
+    async def save_upload_file(self, upload_file: UploadFile) -> str:
+        """
+        Save an uploaded file asynchronously with error handling and cleanup.
+        Args:
+            upload_file: FastAPI UploadFile object
+        Returns:
+            str: Path to the saved file
+        """
+        file_path = None
+        try:
+            # Create uploads directory if it doesn't exist
+            os.makedirs(UPLOAD_DIR, exist_ok=True)
+
+            # Generate unique filename with original extension
+            ext = Path(upload_file.filename or '').suffix or '.pdf'
+            filename = f"{uuid.uuid4()}{ext}"
+            file_path = os.path.join(UPLOAD_DIR, filename)
+
+            # Save file in chunks
+            async with aiofiles.open(file_path, 'wb') as out_file:
+                while chunk := await upload_file.read(CHUNK_SIZE):
+                    await out_file.write(chunk)
+
+            return file_path
+
+        except Exception as e:
+            # Clean up partial file if save fails
+            if file_path and os.path.exists(file_path):
+                os.remove(file_path)
+            raise RuntimeError(f"Failed to save upload file: {e}") from e
+
+    async def _split_pdf_into_chunks(
+        self,
+        upload_file: UploadFile,
+        source_id: int,
+        metadata: Dict
+    ) -> List[DocumentChunk]:
+        """
+        Split a PDF into page-range chunks, extract text, and return document chunks.
+        This is the first-level chunking: the entire PDF is split into chunks of N pages.
+        Note that this is distinct from taking a chunk and splitting it into sections.
+        Args:
+            upload_file: FastAPI UploadFile containing the PDF
+            source_id: Unique identifier for this document source
+            metadata: Additional metadata about the document
+        Returns:
+            List of DocumentChunk objects containing the processed content
+        """
+        # Save upload to temp file and get path
+        path = await self.save_upload_file(upload_file)
+        chunks = []
+        doc = None
+
+        try:
+            # Open PDF handler
+            doc = fitz.open(path)
+
+            # Process PDF in chunks of pages
+            chunk_size = 5
+
+            for chunk_start in range(0, len(doc), chunk_size):
+                # Get page range for this chunk
+                chunk_end = min(chunk_start + chunk_size, len(doc))
+
+                # Extract text from pages in this chunk
+                chunk_text = ""
+                for page_num in range(chunk_start, chunk_end):
+                    page = doc[page_num]
+                    chunk_text += page.get_text()
+
+                # Clean and normalize the extracted text
+                chunk_text = self._clean_text(chunk_text)
+
+                # Create chunk object with metadata
+                chunk = DocumentChunk(
+                    content=chunk_text,
+                    page_number=chunk_start,  # first page (0-indexed) of this chunk's range
+                    chunk_number=len(chunks),
+                    metadata={
+                        "source_id": source_id,
+                        "page_range": f"{chunk_start}-{chunk_end-1}",
+                        **metadata
+                    }
+                )
+                chunks.append(chunk)
+
+        finally:
+            # Close the PDF handle (if opened) and clean up the temp file
+            if doc is not None:
+                doc.close()
+            os.remove(path)
+
+        return chunks
+
+    def _clean_text(self, text: str) -> str:
+        # Collapse all whitespace runs into single spaces
+        return re.sub(r'\s+', ' ', text).strip()
+
+    # TODO: rename to be more descriptive
+    # TODO: fix the fact that we expect a file upload object, but main.py passes a path (temp_path)
+    async def process_file(
+        self,
+        upload_file: UploadFile,
+        source_id: int,
+        metadata: Dict = None
+    ) -> List[DocumentChunk]:
+        """
+        Process an uploaded file and return chunks.
+        Currently supports PDF; can be extended for other formats.
+        Args:
+            upload_file: FastAPI UploadFile object
+            source_id: Unique identifier for the document
+            metadata: Optional metadata about the document
+        Returns:
+            List[DocumentChunk]: Processed document chunks
+        """
+        content_type = upload_file.content_type or ''
+
+        if 'pdf' in content_type.lower():
+            # now we need to take this, and then run marker on it
+            return await self._split_pdf_into_chunks(upload_file, source_id, metadata or {})
+        else:
+            raise ValueError(f"Unsupported file type: {content_type}")
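+
+    # A minimal (hypothetical) usage sketch from a call site. The class name
+    # DocumentProcessor and the literal source_id are illustrative assumptions,
+    # not existing code; process_file() itself is defined above:
+    #
+    #   processor = DocumentProcessor()
+    #   chunks = await processor.process_file(upload_file, source_id=42,
+    #                                         metadata={"title": upload_file.filename})
+    #   print(f"extracted {len(chunks)} chunks")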
+
+
 class DocumentProcessingContext:
     def __init__(self, db: Session, source: "models.KnowledgeBaseSource"):
         self.db = db
@@ -74,6 +195,7 @@ def __init__(self, db: Session, source: "models.KnowledgeBaseSource"):
 CHUNK_SIZE = 64 * 1024 # 64KB chunks for better performance
 
 # -------------------
+# helpers
 
 def flush_buffer(context):
     if context.content_buffer:
@@ -81,6 +203,63 @@ def flush_buffer(context):
         context.db.commit()
         context.content_buffer.clear()
 
+# Note: insert_content() deliberately does not call generate_embedding()
+# per block. It inserts rows with embedding=None, and the embeddings are
+# batch-generated afterwards via batch_generate_embeddings_for_source() below.
+def generate_embedding(text: str) -> List[float]:
+    """
+    Generate an OpenAI embedding for a single text.
+    Kept for ad-hoc use; the upload path uses the batch variant below.
+    """
+    response = openai.Embedding.create(
+        input=text,
+        model="text-embedding-ada-002"
+    )
+    return response["data"][0]["embedding"]
+
+async def batch_generate_embeddings_for_source(source_id: int, db: Session):
+    """
+    After uploading a document, batch-generate embeddings for all of its contents.
+    """
+    # 1. Fetch all contents without an embedding
+    contents = db.query(models.KnowledgeBaseContent).filter(
+        models.KnowledgeBaseContent.source_id == source_id,
+        models.KnowledgeBaseContent.embedding.is_(None)
+    ).all()
+
+    if not contents:
+        print(f"No content needing embeddings for source {source_id}")
+        return
+
+    # 2. Chunk contents into batches (e.g., 100 at a time)
+    batch_size = 100
+    for i in range(0, len(contents), batch_size):
+        batch = contents[i:i+batch_size]
+
+        texts = [c.content for c in batch]
+
+        # 3. Call the OpenAI embedding API once per batch
+        embeddings = await generate_embeddings_batch(texts)
+
+        # 4. Update each KnowledgeBaseContent with its embedding
+        for content, embedding in zip(batch, embeddings):
+            content.embedding = embedding
+
+        # 5. Commit this batch; the objects are already attached to the session,
+        #    so a plain commit persists the updates (bulk_save_objects is redundant here)
+        db.commit()
+
+async def generate_embeddings_batch(texts: List[str]) -> List[List[float]]:
+    """
+    Generate embeddings for a batch of texts in a single API call.
+    """
+    response = await openai.Embedding.acreate(
+        input=texts,
+        model="text-embedding-ada-002"
+    )
+    return [item["embedding"] for item in response["data"]]
+
+# -------------------
+
 # for the sectioning of the chunking
 
 def html_to_text(html):
@@ -136,7 +315,7 @@ def pdf_to_json(path, output_path):
 
     return data
 
-def sub_chunk(json_data: dict[str, Any], db: Session, source_metadata: dict[str, Any]) -> dict[str, Any]:
+async def sub_chunk(json_data: dict[str, Any], db: Session, source_metadata: dict[str, Any]) -> dict[str, Any]:
     """
     This function takes our marker JSON, takes the sections, and formulates metadata and extracts content.
     It saves it into the database with insert_content
@@ -178,6 +357,14 @@ def sub_chunk(json_data: dict[str, Any], db: Session, source_metadata: dict[str,
             context.db.rollback()
             raise e
 
+    # TODO (later optimization):
+    # Move batch_generate_embeddings_for_source() to a background task (FastAPI
+    # BackgroundTasks or Celery) so that we can return immediately after upload
+    # and embed asynchronously.
+    # Right now we await it directly for simplicity and easier debugging.
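+    #
+    # A rough sketch of that optimization, assuming a FastAPI route that has
+    # BackgroundTasks injected (the route path, get_db dependency, and handler
+    # name are illustrative assumptions, not existing code):
+    #
+    #   @app.post("/upload")
+    #   async def upload(file: UploadFile, background_tasks: BackgroundTasks,
+    #                    db: Session = Depends(get_db)):
+    #       result = await sub_chunk(json_data, db, source_metadata)
+    #       background_tasks.add_task(batch_generate_embeddings_for_source,
+    #                                 result["source_id"], db)
+    #       return result
+    #
+    # Note: a real background task should open its own DB session rather than
+    # reusing the request-scoped one.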
+
+    # After the upload finishes, trigger embedding generation all at once
+    await batch_generate_embeddings_for_source(source.source_id, db)
+
     return {"status": "success", "source_id": source.source_id}
 
 def insert_content(block, parent_id=None, context: DocumentProcessingContext = None):
@@ -194,12 +381,19 @@ def insert_content(block, parent_id=None, context: DocumentProcessingContext = N
     if not context or not context.source:
         raise ValueError("Context with source is required")
 
-    # GET THE TEXT
-    # TODO: update how we get this
-    # it should be extracting from json
+    # Embedding strategy: instead of calling OpenAI inside insert_content()
+    # (slow + expensive), we store all content without embeddings (fast).
+    # After the upload finishes, we:
+    #   1. query all unembedded blocks from the DB,
+    #   2. batch the content (e.g., groups of 100 paragraphs),
+    #   3. send the batches to OpenAI's Embedding API,
+    #   4. update the DB with the embeddings.
+    # See batch_generate_embeddings_for_source() above.
+
     content_text = html_to_text(block.get("html", ""))
     block_type = block.get("block_type", "Unknown")
-    # embedding = generate_embedding(content_text) # your embedding function
 
     # the reason why title is None for only text and not all others: blocks can be SectionHeader, ListGroup, ListItem, Page, so it makes sense to keep track of those
     # we don't want to embed the text itself as a title
@@ -214,7 +408,7 @@ def insert_content(block, parent_id=None, context: DocumentProcessingContext = N
         title=None if block_type == "Text" else content_text[:100], # add a label if it's not pure text
         content=content_text, # content is as extracted
         content_type=block_type, # keep track of type
-        # embedding=embedding, # TODO: add embedding
+        embedding=None,  # backfilled in batches by batch_generate_embeddings_for_source()
         created_at=datetime.utcnow(),
         updated_at=datetime.utcnow()
     )
@@ -241,7 +435,7 @@ def traverse_blocks(blocks: list, parent_id=None, context: DocumentProcessingCon
         if block_type == "Page":
             traverse_blocks(block.get("children", []), parent_id=parent_id, context=context)
         elif block_type == "SectionHeader":
-            section_id = insert_content(block, db=db, parent_id=parent_id)
+            section_id = insert_content(block, parent_id=parent_id, context=context)
             # update the stack for this new section level
             section_stack.append((section_id, block.get("section_hierarchy", {})))
             traverse_blocks(block.get("children", []), parent_id=section_id, context=context)
@@ -253,123 +447,4 @@ def traverse_blocks(blocks: list, parent_id=None, context: DocumentProcessingCon
 
     # 2. Begin traversal
 
-    return {"status": "success", "source_id": context.source.source_id}
-
-# -------------------
-
-# for the chunking
-
-async def save_upload_file(upload_file: UploadFile) -> str:
-    """
-    Save an uploaded file asynchronously with improved error handling and cleanup.
-    Args:
-        upload_file: FastAPI UploadFile object
-    Returns:
-        str: Path to the saved file
-    """
-    try:
-        # Create uploads directory if it doesn't exist
-        os.makedirs(UPLOAD_DIR, exist_ok=True)
-
-        # Generate unique filename with original extension
-        ext = Path(upload_file.filename).suffix or '.pdf'
-        filename = f"{uuid.uuid4()}{ext}"
-        file_path = os.path.join(UPLOAD_DIR, filename)
-
-        # Save file in chunks
-        async with aiofiles.open(file_path, 'wb') as out_file:
-            while chunk := await upload_file.read(CHUNK_SIZE):
-                await out_file.write(chunk)
-
-        return file_path
-
-    except Exception as e:
-        # Clean up partial file if save fails
-        if 'file_path' in locals() and os.path.exists(file_path):
-            os.remove(file_path)
-        raise RuntimeError(f"Failed to save upload file: {str(e)}")
-
-async def _split_pdf_into_chunks(
-    self,
-    upload_file: UploadFile,
-    source_id: int,
-    metadata: Dict
-) -> List[DocumentChunk]:
-    """
-    Split PDF into semantic chunks, extract text, and return document chunks.
-    This gets us our chunk #1 (from the entire pdf, we chunk into chunks of size X)
-    Note that this is different from taking a chunk and splitting into sections
-    Args:
-        upload_file: FastAPI UploadFile containing the PDF
-        source_id: Unique identifier for this document source
-        metadata: Additional metadata about the document
-    Returns:
-        List of DocumentChunk objects containing the processed content
-    """
-    # Save upload to temp file and get path
-    path = await save_upload_file(upload_file)
-    chunks = []
-
-    try:
-        # Open PDF handler
-        doc = fitz.open(path)
-
-        # Process PDF in chunks of pages
-        chunk_size = 5
-
-        for chunk_start in range(0, len(doc), chunk_size):
-            # Get page range for this chunk
-            chunk_end = min(chunk_start + chunk_size, len(doc))
-
-            # Extract text from pages in this chunk
-            chunk_text = ""
-            for page_num in range(chunk_start, chunk_end):
-                page = doc[page_num]
-                chunk_text += page.get_text()
-
-            # Clean and normalize the extracted text
-            chunk_text = self._clean_text(chunk_text)
-
-            # Create chunk object with metadata
-            chunk = DocumentChunk(
-                content=chunk_text,
-                page_number=chunk_start,
-                chunk_number=len(chunks),
-                metadata={
-                    "source_id": source_id,
-                    "page_range": f"{chunk_start}-{chunk_end-1}",
-                    **metadata
-                }
-            )
-            chunks.append(chunk)
-
-    finally:
-        # Clean up temp file
-        os.remove(path)
-
-    return chunks
-
-# TODO: rename to be more descriptive
-async def process_file(
-    self,
-    upload_file: UploadFile,
-    source_id: int,
-    metadata: Dict = None
-) -> List[DocumentChunk]:
-    """
-    Process an uploaded file and return chunks.
-    Currently supports PDF, can be extended for other formats.
-    Args:
-        upload_file: FastAPI UploadFile object
-        source_id: Unique identifier for the document
-        metadata: Optional metadata about the document
-    Returns:
-        List[DocumentChunk]: Processed document chunks
-    """
-    content_type = upload_file.content_type or ''
-
-    if 'pdf' in content_type.lower():
-        # now we need to take this, and then run marker on it
-        return await self._split_pdf_into_chunks(upload_file, source_id, metadata or {})
-    else:
-        raise ValueError(f"Unsupported file type: {content_type}")
\ No newline at end of file
+    return {"status": "success", "source_id": context.source.source_id}
\ No newline at end of file