Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions backend/app/api/endpoints/chat.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
"""
Chat API endpoints
"""
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Body, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Dict, Any
from typing import Dict, Any, Optional
import uuid
from datetime import datetime
import logging
import os
import asyncio

from app.db.session import get_db
from app.schemas.chat import (
ChatQueryRequest,
Expand All @@ -17,6 +21,8 @@
from app.services.query_engine import QueryEngine

router = APIRouter()
logger = logging.getLogger(__name__)
qe = QueryEngine()

# In-memory conversation storage (replace with Redis/DB in production)
conversations: Dict[str, Dict[str, Any]] = {}
Expand All @@ -35,13 +41,19 @@ async def process_chat_query(
conversation_history = conversations[request.conversation_id]["messages"]

# Process query
query_engine = QueryEngine(db)
response = await query_engine.process_query(
query=request.query,
fund_id=request.fund_id,
conversation_history=conversation_history
)

try:
query_engine = QueryEngine()
# attach DB session to the engine instance (safe whether __init__ accepts db or not)
query_engine.db = db
response = await query_engine.process_query(
query=request.query,
fund_id=request.fund_id,
conversation_history=conversation_history
)
except Exception as e:
logger.exception("Chat query processing failed: %s", e)
raise HTTPException(status_code=500, detail="Internal server error")

# Update conversation history
if request.conversation_id:
if request.conversation_id not in conversations:
Expand Down
83 changes: 53 additions & 30 deletions backend/app/api/endpoints/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
)
from app.services.document_processor import DocumentProcessor
from app.core.config import settings
from app.db.session import SessionLocal
import asyncio

router = APIRouter()

Expand Down Expand Up @@ -66,13 +68,21 @@ async def upload_document(
db.commit()
db.refresh(document)

# Start background processing
background_tasks.add_task(
process_document_task,
document.id,
file_path,
fund_id or 1 # Default fund_id if not provided
)
# Prefer Celery if available, otherwise use BackgroundTasks
try:
# import locally to avoid mandatory celery dependency
from backend.app.tasks.parse_tasks import parse_document # type: ignore

# enqueue Celery task
parse_document.delay(str(file_path), document.id)
except Exception:
# fallback to in-process background task
background_tasks.add_task(
process_document_task,
document.id,
file_path,
fund_id or 1 # Default fund_id if not provided
)

return DocumentUploadResponse(
document_id=document.id,
Expand All @@ -83,34 +93,47 @@ async def upload_document(


async def process_document_task(document_id: int, file_path: str, fund_id: int):
    """Background task to process a document (fallback when Celery is unavailable).

    Opens its own DB session (BackgroundTasks run outside the request's
    request-scoped session), marks the Document row as "processing", runs the
    async DocumentProcessor, then records success ("done") or failure ("error")
    on the row. All status writes are best-effort: a failed commit rolls back
    instead of aborting the processing pipeline.

    Args:
        document_id: Primary key of the Document row to update. Defensively
            treated as optional: if None or the row is missing, processing
            still runs but no status is recorded.
        file_path: Path on disk of the uploaded file to process.
        fund_id: Fund the document belongs to.
    """
    # Local import: this module defines no logger at top level; without this,
    # the exception handler below would raise NameError and mask the real error.
    import logging
    logger = logging.getLogger(__name__)

    db = SessionLocal()
    doc = None
    try:
        if document_id is not None:
            doc = db.query(Document).filter(Document.id == document_id).first()
            if doc:
                try:
                    doc.parsing_status = "processing"
                    db.commit()
                except Exception:
                    db.rollback()

        processor = DocumentProcessor()
        # Run the async processor; assumes it returns a dict-like result with
        # the parsed-output path under "parsed_json"/"parsed_json_path" — TODO confirm.
        res = await processor.process_document(file_path, document_id or 0, fund_id)

        if doc is not None:
            try:
                # NOTE(review): status strings "done"/"error" are used here —
                # confirm the status endpoint expects these (the previous code
                # used the processor's own status and "failed").
                doc.parsing_status = "done"
                parsed_path = (res or {}).get("parsed_json") or (res or {}).get("parsed_json_path")
                # Only persist the path if the model actually has the column.
                if parsed_path and hasattr(doc, "parsed_json"):
                    doc.parsed_json = parsed_path
                db.commit()
            except Exception:
                db.rollback()
    except Exception as e:
        logger.exception("Background processing failed: %s", e)
        if doc is not None:
            try:
                doc.parsing_status = "error"
                if hasattr(doc, "error_message"):
                    doc.error_message = str(e)
                db.commit()
            except Exception:
                db.rollback()
    finally:
        # Always release the session; swallow close errors so they cannot
        # shadow an exception already being raised.
        try:
            db.close()
        except Exception:
            pass


@router.get("/{document_id}/status", response_model=DocumentStatus)
Expand Down
24 changes: 24 additions & 0 deletions backend/app/services/chunker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import List, Dict
import uuid


def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[Dict]:
    """Split *text* into whitespace-word chunks with overlap between neighbors.

    Args:
        text: Input text; split on whitespace. Empty/whitespace-only input
            yields an empty list.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Number of words shared between consecutive chunks. Clamped to
            ``[0, chunk_size - 1]`` so the window always advances — the
            unclamped version loops forever when ``overlap >= chunk_size``.

    Returns:
        List of ``{"id": <uuid4 str>, "text": <chunk text>}`` dicts.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not text:
        return []
    # Clamp so each iteration advances by at least one word (guards against
    # an infinite loop), and negative overlaps don't skip words oddly.
    step_overlap = max(0, min(overlap, chunk_size - 1))
    words = text.split()
    chunks: List[Dict] = []
    start = 0
    while start < len(words):
        end = start + chunk_size
        # Renamed from `chunk_text` — the original shadowed the function name.
        piece = " ".join(words[start:end]).strip()
        if piece:
            chunks.append({"id": str(uuid.uuid4()), "text": piece})
        if end >= len(words):
            break
        start = end - step_overlap
    return chunks
Loading