Merged
329 changes: 202 additions & 127 deletions backend/app/services/document_processor.py
@@ -60,6 +60,127 @@ def __init__(self):
self.max_chunk_size = 1000 # maximum characters per chunk
self.overlap = 50 # number of characters to overlap between chunks

async def save_upload_file(self, upload_file: UploadFile) -> str:
"""
Save an uploaded file asynchronously with improved error handling and cleanup.
Args:
upload_file: FastAPI UploadFile object
Returns:
str: Path to the saved file
"""
try:
# Create uploads directory if it doesn't exist
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Generate unique filename with original extension
ext = Path(upload_file.filename).suffix or '.pdf'
filename = f"{uuid.uuid4()}{ext}"
file_path = os.path.join(UPLOAD_DIR, filename)

# Save file in chunks
async with aiofiles.open(file_path, 'wb') as out_file:
while chunk := await upload_file.read(CHUNK_SIZE):
await out_file.write(chunk)

return file_path

except Exception as e:
# Clean up partial file if save fails
if 'file_path' in locals() and os.path.exists(file_path):
os.remove(file_path)
raise RuntimeError(f"Failed to save upload file: {str(e)}")

async def _split_pdf_into_chunks(
self,
upload_file: UploadFile,
source_id: int,
metadata: Dict
) -> List[DocumentChunk]:
"""
Split PDF into semantic chunks, extract text, and return document chunks.
This is the first level of chunking: the entire PDF is split into fixed-size groups of pages.
Note that this is different from the later step of splitting a chunk into sections.
Args:
upload_file: FastAPI UploadFile containing the PDF
source_id: Unique identifier for this document source
metadata: Additional metadata about the document
Returns:
List of DocumentChunk objects containing the processed content
"""
# Save upload to temp file and get path
path = await self.save_upload_file(upload_file)
chunks = []

try:
# Open PDF handler
doc = fitz.open(path)

# Process PDF in chunks of pages
chunk_size = 5

for chunk_start in range(0, len(doc), chunk_size):
# Get page range for this chunk
chunk_end = min(chunk_start + chunk_size, len(doc))

# Extract text from pages in this chunk
chunk_text = ""
for page_num in range(chunk_start, chunk_end):
page = doc[page_num]
chunk_text += page.get_text()

# Clean and normalize the extracted text
chunk_text = self._clean_text(chunk_text)

# Create chunk object with metadata
chunk = DocumentChunk(
content=chunk_text,
page_number=chunk_start,
chunk_number=len(chunks),
metadata={
"source_id": source_id,
"page_range": f"{chunk_start}-{chunk_end-1}",
**metadata
}
)
chunks.append(chunk)

finally:
# Clean up temp file
os.remove(path)

return chunks

def _clean_text(self, text: str) -> str:
return re.sub(r'\s+', ' ', text).strip()

# TODO: rename to be more descriptive
# TODO: fix the mismatch where this expects an UploadFile object but main.py passes a path (temp_path)
async def process_file(
self,
upload_file: UploadFile,
source_id: int,
metadata: Dict = None
) -> List[DocumentChunk]:
"""
Process an uploaded file and return chunks.
Currently supports PDF, can be extended for other formats.
Args:
upload_file: FastAPI UploadFile object
source_id: Unique identifier for the document
metadata: Optional metadata about the document
Returns:
List[DocumentChunk]: Processed document chunks
"""
content_type = upload_file.content_type or ''

if 'pdf' in content_type.lower():
# next step: run marker on the saved PDF to produce the JSON consumed by sub_chunk
return await self._split_pdf_into_chunks(upload_file, source_id, metadata or {})
else:
raise ValueError(f"Unsupported file type: {content_type}")



class DocumentProcessingContext:
def __init__(self, db: Session, source: "models.KnowledgeBaseSource"):
self.db = db
@@ -74,13 +195,71 @@ def __init__(self, db: Session, source: "models.KnowledgeBaseSource"):
CHUNK_SIZE = 64 * 1024 # 64KB chunks for better performance

# -------------------
# helpers

def flush_buffer(context):
if context.content_buffer:
context.db.bulk_save_objects(context.content_buffer)
context.db.commit()
context.content_buffer.clear()

# TODO:
# Fix: inside insert_content, do not call generate_embedding immediately.
# Instead, insert into the DB with embedding=None and batch-generate the embeddings later via batch_generate_embeddings_for_source().
def generate_embedding(text: str) -> List[float]:
"""
Generate an OpenAI embedding for the given text.
"""
response = openai.Embedding.create(
input=text,
model="text-embedding-ada-002"
)
return response["data"][0]["embedding"]

async def batch_generate_embeddings_for_source(source_id: int, db: Session):
"""
After uploading a document, batch generate embeddings for all its contents.
"""
# 1. Fetch all contents without an embedding
contents = db.query(models.KnowledgeBaseContent).filter(
models.KnowledgeBaseContent.source_id == source_id,
models.KnowledgeBaseContent.embedding.is_(None)
).all()

if not contents:
print(f"No content needing embeddings for source {source_id}")
return

# 2. Chunk contents into batches (e.g., 100 at a time)
batch_size = 100
for i in range(0, len(contents), batch_size):
batch = contents[i:i+batch_size]

texts = [c.content for c in batch]

# 3. Call OpenAI embedding API
response = await generate_embeddings_batch(texts)

# 4. Update each KnowledgeBaseContent with its embedding
for content, embedding in zip(batch, response):
content.embedding = embedding

# 5. Bulk save updates
db.bulk_save_objects(batch)
db.commit()

async def generate_embeddings_batch(texts: List[str]) -> List[List[float]]:
"""
Generate embeddings for a batch of texts.
"""
response = await openai.Embedding.acreate(
input=texts,
model="text-embedding-ada-002"
)
return [item["embedding"] for item in response["data"]]

# -------------------

# helpers for splitting a chunk into sections

def html_to_text(html):
@@ -136,7 +315,7 @@ def pdf_to_json(path, output_path):

return data

def sub_chunk(json_data: dict[str, Any], db: Session, source_metadata: dict[str, Any]) -> dict[str, Any]:
async def sub_chunk(json_data: dict[str, Any], db: Session, source_metadata: dict[str, Any]) -> dict[str, Any]:
"""
This function takes our marker JSON, takes the sections,
and formulates metadata and extracts content. It saves it into the database with insert_content
@@ -178,6 +357,14 @@ def sub_chunk(json_data: dict[str, Any], db: Session, source_metadata: dict[str, Any]) -> dict[str, Any]:
context.db.rollback()
raise e

# TODO (later optimization):
# Move batch_generate_embeddings_for_source() to a background task (FastAPI BackgroundTasks or Celery)
# so that we can return immediately after upload and embed asynchronously.
# Right now we await it directly for simplicity and easier debugging.

# After upload, trigger embeddings all at once
await batch_generate_embeddings_for_source(source.source_id, db)

return {"status": "success", "source_id": source.source_id}

def insert_content(block, parent_id=None, context: DocumentProcessingContext = None):
@@ -194,12 +381,19 @@ def insert_content(block, parent_id=None, context: DocumentProcessingContext = None):
if not context or not context.source:
raise ValueError("Context with source is required")

# GET THE TEXT
# TODO: update how we get this
# it should be extracting from json
"""
Instead of calling OpenAI inside insert_content() (slow + expensive),
we upload and store all content without embeddings (fast).

After the upload finishes, query all unembedded blocks from the DB.
Batch the content (e.g., groups of 100 paragraphs).
Send batches to OpenAI’s Embedding API.
Update the DB with the embeddings.
"""

content_text = html_to_text(block.get("html", ""))
block_type = block.get("block_type", "Unknown")
# embedding = generate_embedding(content_text) # your embedding function
embedding = generate_embedding(content_text)

# title is None only for Text blocks: other block types (SectionHeader, ListGroup, ListItem, Page) are structural, so keeping a short title for them is useful
# for plain text we don't want to store the body text itself as a title
@@ -214,7 +408,7 @@ def insert_content(block, parent_id=None, context: DocumentProcessingContext = None):
title=None if block_type == "Text" else content_text[:100], # add a label if it's not pure text
content=content_text, # content is as extracted
content_type=block_type, # keep track of type
# embedding=embedding, # TODO: add embedding
embedding=embedding,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
@@ -241,7 +435,7 @@ def traverse_blocks(blocks: list, parent_id=None, context: DocumentProcessingCon
if block_type == "Page":
traverse_blocks(block.get("children", []), parent_id=parent_id, context=context)
elif block_type == "SectionHeader":
section_id = insert_content(block, db=db, parent_id=parent_id)
section_id = insert_content(block, parent_id=parent_id, context=context)
# update the stack for this new section level
section_stack.append((section_id, block.get("section_hierarchy", {})))
traverse_blocks(block.get("children", []), parent_id=section_id, context=context)
@@ -253,123 +447,4 @@ def traverse_blocks(blocks: list, parent_id=None, context: DocumentProcessingCon

# 2. Begin traversal

return {"status": "success", "source_id": context.source.source_id}

# -------------------

# for the chunking

async def save_upload_file(upload_file: UploadFile) -> str:
"""
Save an uploaded file asynchronously with improved error handling and cleanup.
Args:
upload_file: FastAPI UploadFile object
Returns:
str: Path to the saved file
"""
try:
# Create uploads directory if it doesn't exist
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Generate unique filename with original extension
ext = Path(upload_file.filename).suffix or '.pdf'
filename = f"{uuid.uuid4()}{ext}"
file_path = os.path.join(UPLOAD_DIR, filename)

# Save file in chunks
async with aiofiles.open(file_path, 'wb') as out_file:
while chunk := await upload_file.read(CHUNK_SIZE):
await out_file.write(chunk)

return file_path

except Exception as e:
# Clean up partial file if save fails
if 'file_path' in locals() and os.path.exists(file_path):
os.remove(file_path)
raise RuntimeError(f"Failed to save upload file: {str(e)}")

async def _split_pdf_into_chunks(
self,
upload_file: UploadFile,
source_id: int,
metadata: Dict
) -> List[DocumentChunk]:
"""
Split PDF into semantic chunks, extract text, and return document chunks.
This gets us our chunk #1 (from the entire pdf, we chunk into chunks of size X)
Note that this is different from taking a chunk and splitting into sections
Args:
upload_file: FastAPI UploadFile containing the PDF
source_id: Unique identifier for this document source
metadata: Additional metadata about the document
Returns:
List of DocumentChunk objects containing the processed content
"""
# Save upload to temp file and get path
path = await save_upload_file(upload_file)
chunks = []

try:
# Open PDF handler
doc = fitz.open(path)

# Process PDF in chunks of pages
chunk_size = 5

for chunk_start in range(0, len(doc), chunk_size):
# Get page range for this chunk
chunk_end = min(chunk_start + chunk_size, len(doc))

# Extract text from pages in this chunk
chunk_text = ""
for page_num in range(chunk_start, chunk_end):
page = doc[page_num]
chunk_text += page.get_text()

# Clean and normalize the extracted text
chunk_text = self._clean_text(chunk_text)

# Create chunk object with metadata
chunk = DocumentChunk(
content=chunk_text,
page_number=chunk_start,
chunk_number=len(chunks),
metadata={
"source_id": source_id,
"page_range": f"{chunk_start}-{chunk_end-1}",
**metadata
}
)
chunks.append(chunk)

finally:
# Clean up temp file
os.remove(path)

return chunks

# TODO: rename to be more descriptive
async def process_file(
self,
upload_file: UploadFile,
source_id: int,
metadata: Dict = None
) -> List[DocumentChunk]:
"""
Process an uploaded file and return chunks.
Currently supports PDF, can be extended for other formats.
Args:
upload_file: FastAPI UploadFile object
source_id: Unique identifier for the document
metadata: Optional metadata about the document
Returns:
List[DocumentChunk]: Processed document chunks
"""
content_type = upload_file.content_type or ''

if 'pdf' in content_type.lower():
# now we need to take this, and then run marker on it
return await self._split_pdf_into_chunks(upload_file, source_id, metadata or {})
else:
raise ValueError(f"Unsupported file type: {content_type}")
return {"status": "success", "source_id": context.source.source_id}