From d990add80ea856e16a8d7a630c57cdd486f518c0 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Mon, 15 Dec 2025 20:14:18 +0530 Subject: [PATCH 1/3] first stab at refactoring --- backend/app/api/routes/evaluation.py | 500 ++---------------- backend/app/crud/evaluations/__init__.py | 4 + backend/app/services/evaluation/__init__.py | 32 ++ backend/app/services/evaluation/dataset.py | 163 ++++++ backend/app/services/evaluation/evaluation.py | 329 ++++++++++++ backend/app/services/evaluation/validators.py | 174 ++++++ .../app/tests/api/routes/test_evaluation.py | 38 +- 7 files changed, 754 insertions(+), 486 deletions(-) create mode 100644 backend/app/services/evaluation/__init__.py create mode 100644 backend/app/services/evaluation/dataset.py create mode 100644 backend/app/services/evaluation/evaluation.py create mode 100644 backend/app/services/evaluation/validators.py diff --git a/backend/app/api/routes/evaluation.py b/backend/app/api/routes/evaluation.py index 058950d6..fa35c84f 100644 --- a/backend/app/api/routes/evaluation.py +++ b/backend/app/api/routes/evaluation.py @@ -1,50 +1,33 @@ -import csv -import io +"""Evaluation API routes.""" + import logging -import re -from pathlib import Path from fastapi import APIRouter, Body, File, Form, HTTPException, Query, UploadFile from app.api.deps import AuthContextDep, SessionDep -from app.core.cloud import get_cloud_storage -from app.crud.assistants import get_assistant_by_id from app.crud.evaluations import ( - create_evaluation_dataset, - create_evaluation_run, get_dataset_by_id, - get_evaluation_run_by_id, list_datasets, - start_evaluation_batch, - upload_csv_to_object_store, - upload_dataset_to_langfuse, ) from app.crud.evaluations import list_evaluation_runs as list_evaluation_runs_crud -from app.crud.evaluations.core import save_score from app.crud.evaluations.dataset import delete_dataset as delete_dataset_crud -from app.crud.evaluations.langfuse import fetch_trace_scores_from_langfuse from app.models.evaluation import ( DatasetUploadResponse, EvaluationRunPublic, ) +from app.services.evaluation import ( + get_evaluation_with_scores, + start_evaluation, + upload_dataset, + validate_csv_file, +) from app.utils import ( APIResponse, - get_langfuse_client, - get_openai_client, load_description, ) logger = logging.getLogger(__name__) -# File upload security constants -MAX_FILE_SIZE = 1024 * 1024 # 1 MB -ALLOWED_EXTENSIONS = {".csv"} -ALLOWED_MIME_TYPES = { - "text/csv", - "application/csv", - "text/plain", # Some systems report CSV as text/plain -} - router = APIRouter(tags=["evaluation"]) @@ -61,60 +44,12 @@ def _dataset_to_response(dataset) -> DatasetUploadResponse: ) -def sanitize_dataset_name(name: str) -> str: - """ - Sanitize dataset name for Langfuse compatibility. - - Langfuse has issues with spaces and special characters in dataset names. - This function ensures the name can be both created and fetched. - - Rules: - - Replace spaces with underscores - - Replace hyphens with underscores - - Keep only alphanumeric characters and underscores - - Convert to lowercase for consistency - - Remove leading/trailing underscores - - Collapse multiple consecutive underscores into one - - Args: - name: Original dataset name - - Returns: - Sanitized dataset name safe for Langfuse - - Examples: - "testing 0001" -> "testing_0001" - "My Dataset!" 
-> "my_dataset" - "Test--Data__Set" -> "test_data_set" - """ - # Convert to lowercase - sanitized = name.lower() - - # Replace spaces and hyphens with underscores - sanitized = sanitized.replace(" ", "_").replace("-", "_") - - # Keep only alphanumeric characters and underscores - sanitized = re.sub(r"[^a-z0-9_]", "", sanitized) - - # Collapse multiple underscores into one - sanitized = re.sub(r"_+", "_", sanitized) - - # Remove leading/trailing underscores - sanitized = sanitized.strip("_") - - # Ensure name is not empty - if not sanitized: - raise ValueError("Dataset name cannot be empty after sanitization") - - return sanitized - - @router.post( "/evaluations/datasets", description=load_description("evaluation/upload_dataset.md"), response_model=APIResponse[DatasetUploadResponse], ) -async def upload_dataset( +async def upload_dataset_endpoint( _session: SessionDep, auth_context: AuthContextDep, file: UploadFile = File( @@ -129,198 +64,22 @@ async def upload_dataset( description="Number of times to duplicate each item (min: 1, max: 5)", ), ) -> APIResponse[DatasetUploadResponse]: - # Sanitize dataset name for Langfuse compatibility - original_name = dataset_name - try: - dataset_name = sanitize_dataset_name(dataset_name) - except ValueError as e: - raise HTTPException(status_code=422, detail=f"Invalid dataset name: {str(e)}") - - if original_name != dataset_name: - logger.info( - f"[upload_dataset] Dataset name sanitized | '{original_name}' -> '{dataset_name}'" - ) - - logger.info( - f"[upload_dataset] Uploading dataset | dataset={dataset_name} | " - f"duplication_factor={duplication_factor} | org_id={auth_context.organization.id} | " - f"project_id={auth_context.project.id}" - ) + """Upload an evaluation dataset.""" + # Validate and read CSV file + csv_content = await validate_csv_file(file) - # Security validation: Check file extension - file_ext = Path(file.filename).suffix.lower() - if file_ext not in ALLOWED_EXTENSIONS: - raise HTTPException( - status_code=422, - detail=f"Invalid file type. Only CSV files are allowed. Got: {file_ext}", - ) - - # Security validation: Check MIME type - content_type = file.content_type - if content_type not in ALLOWED_MIME_TYPES: - raise HTTPException( - status_code=422, - detail=f"Invalid content type. Expected CSV, got: {content_type}", - ) - - # Security validation: Check file size - file.file.seek(0, 2) # Seek to end - file_size = file.file.tell() - file.file.seek(0) # Reset to beginning - - if file_size > MAX_FILE_SIZE: - raise HTTPException( - status_code=413, - detail=f"File too large. 
Maximum size: {MAX_FILE_SIZE / (1024 * 1024):.0f}MB", - ) - - if file_size == 0: - raise HTTPException(status_code=422, detail="Empty file uploaded") - - # Read CSV content - csv_content = await file.read() - - # Step 1: Parse and validate CSV - try: - csv_text = csv_content.decode("utf-8") - csv_reader = csv.DictReader(io.StringIO(csv_text)) - - if not csv_reader.fieldnames: - raise HTTPException(status_code=422, detail="CSV file has no headers") - - # Normalize headers for case-insensitive matching - clean_headers = { - field.strip().lower(): field for field in csv_reader.fieldnames - } - - # Validate required headers (case-insensitive) - if "question" not in clean_headers or "answer" not in clean_headers: - raise HTTPException( - status_code=422, - detail=f"CSV must contain 'question' and 'answer' columns " - f"Found columns: {csv_reader.fieldnames}", - ) - - # Get the actual column names from the CSV - question_col = clean_headers["question"] - answer_col = clean_headers["answer"] - - # Count original items - original_items = [] - for row in csv_reader: - question = row.get(question_col, "").strip() - answer = row.get(answer_col, "").strip() - if question and answer: - original_items.append({"question": question, "answer": answer}) - - if not original_items: - raise HTTPException( - status_code=422, detail="No valid items found in CSV file" - ) - - original_items_count = len(original_items) - total_items_count = original_items_count * duplication_factor - - logger.info( - f"[upload_dataset] Parsed items from CSV | original={original_items_count} | " - f"total_with_duplication={total_items_count}" - ) - - except Exception as e: - logger.error(f"[upload_dataset] Failed to parse CSV | {e}", exc_info=True) - raise HTTPException(status_code=422, detail=f"Invalid CSV file: {e}") - - # Step 2: Upload to object store (if credentials configured) - object_store_url = None - try: - storage = get_cloud_storage( - session=_session, project_id=auth_context.project.id - ) - object_store_url = upload_csv_to_object_store( - storage=storage, csv_content=csv_content, dataset_name=dataset_name - ) - if object_store_url: - logger.info( - f"[upload_dataset] Successfully uploaded CSV to object store | {object_store_url}" - ) - else: - logger.info( - "[upload_dataset] Object store upload returned None | continuing without object store storage" - ) - except Exception as e: - logger.warning( - f"[upload_dataset] Failed to upload CSV to object store (continuing without object store) | {e}", - exc_info=True, - ) - object_store_url = None - - # Step 3: Upload to Langfuse - langfuse_dataset_id = None - try: - # Get Langfuse client - langfuse = get_langfuse_client( - session=_session, - org_id=auth_context.organization.id, - project_id=auth_context.project.id, - ) - - # Upload to Langfuse - langfuse_dataset_id, _ = upload_dataset_to_langfuse( - langfuse=langfuse, - items=original_items, - dataset_name=dataset_name, - duplication_factor=duplication_factor, - ) - - logger.info( - f"[upload_dataset] Successfully uploaded dataset to Langfuse | " - f"dataset={dataset_name} | id={langfuse_dataset_id}" - ) - - except Exception as e: - logger.error( - f"[upload_dataset] Failed to upload dataset to Langfuse | {e}", - exc_info=True, - ) - raise HTTPException( - status_code=500, detail=f"Failed to upload dataset to Langfuse: {e}" - ) - - # Step 4: Store metadata in database - metadata = { - "original_items_count": original_items_count, - "total_items_count": total_items_count, - "duplication_factor": duplication_factor, - } - 
- dataset = create_evaluation_dataset( + # Upload dataset using service + dataset = upload_dataset( session=_session, - name=dataset_name, + csv_content=csv_content, + dataset_name=dataset_name, description=description, - dataset_metadata=metadata, - object_store_url=object_store_url, - langfuse_dataset_id=langfuse_dataset_id, + duplication_factor=duplication_factor, organization_id=auth_context.organization.id, project_id=auth_context.project.id, ) - logger.info( - f"[upload_dataset] Successfully created dataset record in database | " - f"id={dataset.id} | name={dataset_name}" - ) - - # Return response - return APIResponse.success_response( - data=DatasetUploadResponse( - dataset_id=dataset.id, - dataset_name=dataset_name, - total_items=total_items_count, - original_items=original_items_count, - duplication_factor=duplication_factor, - langfuse_dataset_id=langfuse_dataset_id, - object_store_url=object_store_url, - ) - ) + return APIResponse.success_response(data=_dataset_to_response(dataset)) @router.get( @@ -334,6 +93,7 @@ def list_datasets_endpoint( limit: int = 50, offset: int = 0, ) -> APIResponse[list[DatasetUploadResponse]]: + """List evaluation datasets.""" # Enforce maximum limit if limit > 100: limit = 100 @@ -361,6 +121,7 @@ def get_dataset( _session: SessionDep, auth_context: AuthContextDep, ) -> APIResponse[DatasetUploadResponse]: + """Get a specific evaluation dataset.""" logger.info( f"[get_dataset] Fetching dataset | id={dataset_id} | " f"org_id={auth_context.organization.id} | " @@ -392,6 +153,7 @@ def delete_dataset( _session: SessionDep, auth_context: AuthContextDep, ) -> APIResponse[dict]: + """Delete an evaluation dataset.""" logger.info( f"[delete_dataset] Deleting dataset | id={dataset_id} | " f"org_id={auth_context.organization.id} | " @@ -406,7 +168,6 @@ def delete_dataset( ) if not success: - # Check if it's a not found error or other error type if "not found" in message.lower(): raise HTTPException(status_code=404, detail=message) else: @@ -436,143 +197,18 @@ def evaluate( None, description="Optional assistant ID to fetch configuration from" ), ) -> APIResponse[EvaluationRunPublic]: - logger.info( - f"[evaluate] Starting evaluation | experiment_name={experiment_name} | " - f"dataset_id={dataset_id} | " - f"org_id={auth_context.organization.id} | " - f"assistant_id={assistant_id} | " - f"config_keys={list(config.keys())}" - ) - - # Step 1: Fetch dataset from database - dataset = get_dataset_by_id( + """Start an evaluation run.""" + eval_run = start_evaluation( session=_session, dataset_id=dataset_id, - organization_id=auth_context.organization.id, - project_id=auth_context.project.id, - ) - - if not dataset: - raise HTTPException( - status_code=404, - detail=f"Dataset {dataset_id} not found or not accessible to this " - f"organization/project", - ) - - logger.info( - f"[evaluate] Found dataset | id={dataset.id} | name={dataset.name} | " - f"object_store_url={'present' if dataset.object_store_url else 'None'} | " - f"langfuse_id={dataset.langfuse_dataset_id}" - ) - - dataset_name = dataset.name - - # Get API clients - openai_client = get_openai_client( - session=_session, - org_id=auth_context.organization.id, - project_id=auth_context.project.id, - ) - langfuse = get_langfuse_client( - session=_session, - org_id=auth_context.organization.id, - project_id=auth_context.project.id, - ) - - # Validate dataset has Langfuse ID (should have been set during dataset creation) - if not dataset.langfuse_dataset_id: - raise HTTPException( - status_code=400, - detail=f"Dataset 
{dataset_id} does not have a Langfuse dataset ID. " - "Please ensure Langfuse credentials were configured when the dataset was created.", - ) - - # Handle assistant_id if provided - if assistant_id: - # Fetch assistant details from database - assistant = get_assistant_by_id( - session=_session, - assistant_id=assistant_id, - project_id=auth_context.project.id, - ) - - if not assistant: - raise HTTPException( - status_code=404, detail=f"Assistant {assistant_id} not found" - ) - - logger.info( - f"[evaluate] Found assistant in DB | id={assistant.id} | " - f"model={assistant.model} | instructions=" - f"{assistant.instructions[:50] if assistant.instructions else 'None'}..." - ) - - # Build config from assistant (use provided config values to override - # if present) - config = { - "model": config.get("model", assistant.model), - "instructions": config.get("instructions", assistant.instructions), - "temperature": config.get("temperature", assistant.temperature), - } - - # Add tools if vector stores are available - vector_store_ids = config.get( - "vector_store_ids", assistant.vector_store_ids or [] - ) - if vector_store_ids and len(vector_store_ids) > 0: - config["tools"] = [ - { - "type": "file_search", - "vector_store_ids": vector_store_ids, - } - ] - - logger.info("[evaluate] Using config from assistant") - else: - logger.info("[evaluate] Using provided config directly") - # Validate that config has minimum required fields - if not config.get("model"): - raise HTTPException( - status_code=400, - detail="Config must include 'model' when assistant_id is not provided", - ) - - # Create EvaluationRun record - eval_run = create_evaluation_run( - session=_session, - run_name=experiment_name, - dataset_name=dataset_name, - dataset_id=dataset_id, + experiment_name=experiment_name, config=config, + assistant_id=assistant_id, organization_id=auth_context.organization.id, project_id=auth_context.project.id, ) - # Start the batch evaluation - try: - eval_run = start_evaluation_batch( - langfuse=langfuse, - openai_client=openai_client, - session=_session, - eval_run=eval_run, - config=config, - ) - - logger.info( - f"[evaluate] Evaluation started successfully | " - f"batch_job_id={eval_run.batch_job_id} | total_items={eval_run.total_items}" - ) - - return APIResponse.success_response(data=eval_run) - - except Exception as e: - logger.error( - f"[evaluate] Failed to start evaluation | run_id={eval_run.id} | {e}", - exc_info=True, - ) - # Error is already handled in start_evaluation_batch - _session.refresh(eval_run) - return APIResponse.success_response(data=eval_run) + return APIResponse.success_response(data=eval_run) @router.get( @@ -586,6 +222,7 @@ def list_evaluation_runs( limit: int = 50, offset: int = 0, ) -> APIResponse[list[EvaluationRunPublic]]: + """List evaluation runs.""" logger.info( f"[list_evaluation_runs] Listing evaluation runs | " f"org_id={auth_context.organization.id} | " @@ -629,26 +266,20 @@ def get_evaluation_run_status( ), ), ) -> APIResponse[EvaluationRunPublic]: - logger.info( - f"[get_evaluation_run_status] Fetching status for evaluation run | " - f"evaluation_id={evaluation_id} | " - f"org_id={auth_context.organization.id} | " - f"project_id={auth_context.project.id} | " - f"get_trace_info={get_trace_info} | " - f"resync_score={resync_score}" - ) - + """Get evaluation run status with optional trace info.""" if resync_score and not get_trace_info: raise HTTPException( status_code=400, detail="resync_score=true requires get_trace_info=true", ) - eval_run = 
get_evaluation_run_by_id( + eval_run, error = get_evaluation_with_scores( session=_session, evaluation_id=evaluation_id, organization_id=auth_context.organization.id, project_id=auth_context.project.id, + get_trace_info=get_trace_info, + resync_score=resync_score, ) if not eval_run: @@ -660,72 +291,7 @@ def get_evaluation_run_status( ), ) - if get_trace_info: - # Only fetch trace info for completed evaluations - if eval_run.status != "completed": - return APIResponse.failure_response( - error=f"Trace info is only available for completed evaluations. " - f"Current status: {eval_run.status}", - data=eval_run, - ) - - # Check if we already have cached scores (before any slow operations) - has_cached_score = eval_run.score is not None and "traces" in eval_run.score - if not resync_score and has_cached_score: - return APIResponse.success_response(data=eval_run) - - # Get Langfuse client (needs session for credentials lookup) - langfuse = get_langfuse_client( - session=_session, - org_id=auth_context.organization.id, - project_id=auth_context.project.id, - ) - - # Capture data needed for Langfuse fetch and DB update - dataset_name = eval_run.dataset_name - run_name = eval_run.run_name - eval_run_id = eval_run.id - org_id = auth_context.organization.id - project_id = auth_context.project.id - - # Session is no longer needed - slow Langfuse API calls happen here - # without holding the DB connection - try: - score = fetch_trace_scores_from_langfuse( - langfuse=langfuse, - dataset_name=dataset_name, - run_name=run_name, - ) - except ValueError as e: - # Run not found in Langfuse - return eval_run with error - logger.warning( - f"[get_evaluation_run_status] Run not found in Langfuse | " - f"evaluation_id={evaluation_id} | error={e}" - ) - return APIResponse.failure_response(error=str(e), data=eval_run) - except Exception as e: - logger.error( - f"[get_evaluation_run_status] Failed to fetch trace info | " - f"evaluation_id={evaluation_id} | error={e}", - exc_info=True, - ) - return APIResponse.failure_response( - error=f"Failed to fetch trace info from Langfuse: {str(e)}", - data=eval_run, - ) - - # Open new session just for the score commit - eval_run = save_score( - eval_run_id=eval_run_id, - organization_id=org_id, - project_id=project_id, - score=score, - ) - - if not eval_run: - raise HTTPException( - status_code=404, - detail=f"Evaluation run {evaluation_id} not found after score update", - ) + if error: + return APIResponse.failure_response(error=error, data=eval_run) return APIResponse.success_response(data=eval_run) diff --git a/backend/app/crud/evaluations/__init__.py b/backend/app/crud/evaluations/__init__.py index 5ca0aacd..4e797e9f 100644 --- a/backend/app/crud/evaluations/__init__.py +++ b/backend/app/crud/evaluations/__init__.py @@ -5,6 +5,7 @@ create_evaluation_run, get_evaluation_run_by_id, list_evaluation_runs, + save_score, ) from app.crud.evaluations.cron import ( process_all_pending_evaluations, @@ -24,6 +25,7 @@ ) from app.crud.evaluations.langfuse import ( create_langfuse_dataset_run, + fetch_trace_scores_from_langfuse, update_traces_with_cosine_scores, upload_dataset_to_langfuse, ) @@ -39,6 +41,7 @@ "create_evaluation_run", "get_evaluation_run_by_id", "list_evaluation_runs", + "save_score", # Cron "process_all_pending_evaluations", "process_all_pending_evaluations_sync", @@ -61,6 +64,7 @@ "start_embedding_batch", # Langfuse "create_langfuse_dataset_run", + "fetch_trace_scores_from_langfuse", "update_traces_with_cosine_scores", "upload_dataset_to_langfuse", ] diff --git 
a/backend/app/services/evaluation/__init__.py b/backend/app/services/evaluation/__init__.py new file mode 100644 index 00000000..6e3aac41 --- /dev/null +++ b/backend/app/services/evaluation/__init__.py @@ -0,0 +1,32 @@ +"""Evaluation services.""" + +from app.services.evaluation.dataset import upload_dataset +from app.services.evaluation.evaluation import ( + build_evaluation_config, + get_evaluation_with_scores, + start_evaluation, +) +from app.services.evaluation.validators import ( + ALLOWED_EXTENSIONS, + ALLOWED_MIME_TYPES, + MAX_FILE_SIZE, + parse_csv_items, + sanitize_dataset_name, + validate_csv_file, +) + +__all__ = [ + # Dataset + "upload_dataset", + # Evaluation + "build_evaluation_config", + "get_evaluation_with_scores", + "start_evaluation", + # Validators + "ALLOWED_EXTENSIONS", + "ALLOWED_MIME_TYPES", + "MAX_FILE_SIZE", + "parse_csv_items", + "sanitize_dataset_name", + "validate_csv_file", +] diff --git a/backend/app/services/evaluation/dataset.py b/backend/app/services/evaluation/dataset.py new file mode 100644 index 00000000..5ac9470e --- /dev/null +++ b/backend/app/services/evaluation/dataset.py @@ -0,0 +1,163 @@ +"""Dataset management service for evaluations.""" + +import logging + +from fastapi import HTTPException +from sqlmodel import Session + +from app.core.cloud import get_cloud_storage +from app.crud.evaluations import ( + create_evaluation_dataset, + upload_csv_to_object_store, + upload_dataset_to_langfuse, +) +from app.models.evaluation import EvaluationDataset +from app.services.evaluation.validators import ( + parse_csv_items, + sanitize_dataset_name, +) +from app.utils import get_langfuse_client + +logger = logging.getLogger(__name__) + + +def upload_dataset( + session: Session, + csv_content: bytes, + dataset_name: str, + description: str | None, + duplication_factor: int, + organization_id: int, + project_id: int, +) -> EvaluationDataset: + """ + Orchestrate dataset upload workflow. + + Steps: + 1. Sanitize dataset name + 2. Parse and validate CSV + 3. Upload to object store + 4. Upload to Langfuse + 5. 
Store metadata in database + + Args: + session: Database session + csv_content: Raw CSV file content + dataset_name: Name for the dataset + description: Optional dataset description + duplication_factor: Number of times to duplicate each item + organization_id: Organization ID + project_id: Project ID + + Returns: + Created EvaluationDataset record + + Raises: + HTTPException: If upload fails at any step + """ + # Step 1: Sanitize dataset name for Langfuse compatibility + original_name = dataset_name + try: + dataset_name = sanitize_dataset_name(dataset_name) + except ValueError as e: + raise HTTPException(status_code=422, detail=f"Invalid dataset name: {str(e)}") + + if original_name != dataset_name: + logger.info( + f"[upload_dataset] Dataset name sanitized | '{original_name}' -> '{dataset_name}'" + ) + + logger.info( + f"[upload_dataset] Uploading dataset | dataset={dataset_name} | " + f"duplication_factor={duplication_factor} | org_id={organization_id} | " + f"project_id={project_id}" + ) + + # Step 2: Parse CSV and extract items + original_items = parse_csv_items(csv_content) + original_items_count = len(original_items) + total_items_count = original_items_count * duplication_factor + + logger.info( + f"[upload_dataset] Parsed items from CSV | original={original_items_count} | " + f"total_with_duplication={total_items_count}" + ) + + # Step 3: Upload to object store (if credentials configured) + object_store_url = None + try: + storage = get_cloud_storage(session=session, project_id=project_id) + object_store_url = upload_csv_to_object_store( + storage=storage, csv_content=csv_content, dataset_name=dataset_name + ) + if object_store_url: + logger.info( + f"[upload_dataset] Successfully uploaded CSV to object store | {object_store_url}" + ) + else: + logger.info( + "[upload_dataset] Object store upload returned None | " + "continuing without object store storage" + ) + except Exception as e: + logger.warning( + f"[upload_dataset] Failed to upload CSV to object store " + f"(continuing without object store) | {e}", + exc_info=True, + ) + object_store_url = None + + # Step 4: Upload to Langfuse + langfuse_dataset_id = None + try: + langfuse = get_langfuse_client( + session=session, + org_id=organization_id, + project_id=project_id, + ) + + langfuse_dataset_id, _ = upload_dataset_to_langfuse( + langfuse=langfuse, + items=original_items, + dataset_name=dataset_name, + duplication_factor=duplication_factor, + ) + + logger.info( + f"[upload_dataset] Successfully uploaded dataset to Langfuse | " + f"dataset={dataset_name} | id={langfuse_dataset_id}" + ) + + except Exception as e: + logger.error( + f"[upload_dataset] Failed to upload dataset to Langfuse | {e}", + exc_info=True, + ) + raise HTTPException( + status_code=500, detail=f"Failed to upload dataset to Langfuse: {e}" + ) + + # Step 5: Store metadata in database + metadata = { + "original_items_count": original_items_count, + "total_items_count": total_items_count, + "duplication_factor": duplication_factor, + } + + dataset = create_evaluation_dataset( + session=session, + name=dataset_name, + description=description, + dataset_metadata=metadata, + object_store_url=object_store_url, + langfuse_dataset_id=langfuse_dataset_id, + organization_id=organization_id, + project_id=project_id, + ) + + logger.info( + f"[upload_dataset] Successfully created dataset record in database | " + f"id={dataset.id} | name={dataset_name}" + ) + + return dataset diff --git a/backend/app/services/evaluation/evaluation.py 
b/backend/app/services/evaluation/evaluation.py new file mode 100644 index 00000000..ea091538 --- /dev/null +++ b/backend/app/services/evaluation/evaluation.py @@ -0,0 +1,329 @@ +"""Evaluation run orchestration service.""" + +import logging + +from fastapi import HTTPException +from sqlmodel import Session + +from app.crud.assistants import get_assistant_by_id +from app.crud.evaluations import ( + create_evaluation_run, + fetch_trace_scores_from_langfuse, + get_dataset_by_id, + get_evaluation_run_by_id, + save_score, + start_evaluation_batch, +) +from app.models.evaluation import EvaluationRun +from app.utils import get_langfuse_client, get_openai_client + +logger = logging.getLogger(__name__) + + +def build_evaluation_config( + session: Session, + config: dict, + assistant_id: str | None, + project_id: int, +) -> dict: + """ + Build evaluation configuration from assistant or provided config. + + If assistant_id is provided, fetch assistant and merge with config. + Config values take precedence over assistant values. + + Args: + session: Database session + config: Provided configuration dict + assistant_id: Optional assistant ID to fetch configuration from + project_id: Project ID for assistant lookup + + Returns: + Complete evaluation configuration dict + + Raises: + HTTPException: If assistant not found or model missing + """ + if assistant_id: + # Fetch assistant details from database + assistant = get_assistant_by_id( + session=session, + assistant_id=assistant_id, + project_id=project_id, + ) + + if not assistant: + raise HTTPException( + status_code=404, detail=f"Assistant {assistant_id} not found" + ) + + logger.info( + f"[build_evaluation_config] Found assistant in DB | id={assistant.id} | " + f"model={assistant.model} | instructions=" + f"{assistant.instructions[:50] if assistant.instructions else 'None'}..." + ) + + # Build config from assistant (use provided config values to override if present) + merged_config = { + "model": config.get("model", assistant.model), + "instructions": config.get("instructions", assistant.instructions), + "temperature": config.get("temperature", assistant.temperature), + } + + # Add tools if vector stores are available + vector_store_ids = config.get( + "vector_store_ids", assistant.vector_store_ids or [] + ) + if vector_store_ids and len(vector_store_ids) > 0: + merged_config["tools"] = [ + { + "type": "file_search", + "vector_store_ids": vector_store_ids, + } + ] + + logger.info("[build_evaluation_config] Using config from assistant") + return merged_config + + # Using provided config directly + logger.info("[build_evaluation_config] Using provided config directly") + + # Validate that config has minimum required fields + if not config.get("model"): + raise HTTPException( + status_code=400, + detail="Config must include 'model' when assistant_id is not provided", + ) + + return config + + +def start_evaluation( + session: Session, + dataset_id: int, + experiment_name: str, + config: dict, + assistant_id: str | None, + organization_id: int, + project_id: int, +) -> EvaluationRun: + """ + Start an evaluation run. + + Steps: + 1. Validate dataset exists and has Langfuse ID + 2. Build config (from assistant or direct) + 3. Create evaluation run record + 4. 
Start batch processing + + Args: + session: Database session + dataset_id: ID of the evaluation dataset + experiment_name: Name for this evaluation experiment/run + config: Evaluation configuration + assistant_id: Optional assistant ID to fetch configuration from + organization_id: Organization ID + project_id: Project ID + + Returns: + EvaluationRun instance + + Raises: + HTTPException: If dataset not found or evaluation fails to start + """ + logger.info( + f"[start_evaluation] Starting evaluation | experiment_name={experiment_name} | " + f"dataset_id={dataset_id} | " + f"org_id={organization_id} | " + f"assistant_id={assistant_id} | " + f"config_keys={list(config.keys())}" + ) + + # Step 1: Fetch dataset from database + dataset = get_dataset_by_id( + session=session, + dataset_id=dataset_id, + organization_id=organization_id, + project_id=project_id, + ) + + if not dataset: + raise HTTPException( + status_code=404, + detail=f"Dataset {dataset_id} not found or not accessible to this " + f"organization/project", + ) + + logger.info( + f"[start_evaluation] Found dataset | id={dataset.id} | name={dataset.name} | " + f"object_store_url={'present' if dataset.object_store_url else 'None'} | " + f"langfuse_id={dataset.langfuse_dataset_id}" + ) + + # Validate dataset has Langfuse ID + if not dataset.langfuse_dataset_id: + raise HTTPException( + status_code=400, + detail=f"Dataset {dataset_id} does not have a Langfuse dataset ID. " + "Please ensure Langfuse credentials were configured when the dataset was created.", + ) + + # Step 2: Build evaluation config + eval_config = build_evaluation_config( + session=session, + config=config, + assistant_id=assistant_id, + project_id=project_id, + ) + + # Get API clients + openai_client = get_openai_client( + session=session, + org_id=organization_id, + project_id=project_id, + ) + langfuse = get_langfuse_client( + session=session, + org_id=organization_id, + project_id=project_id, + ) + + # Step 3: Create EvaluationRun record + eval_run = create_evaluation_run( + session=session, + run_name=experiment_name, + dataset_name=dataset.name, + dataset_id=dataset_id, + config=eval_config, + organization_id=organization_id, + project_id=project_id, + ) + + # Step 4: Start the batch evaluation + try: + eval_run = start_evaluation_batch( + langfuse=langfuse, + openai_client=openai_client, + session=session, + eval_run=eval_run, + config=eval_config, + ) + + logger.info( + f"[start_evaluation] Evaluation started successfully | " + f"batch_job_id={eval_run.batch_job_id} | total_items={eval_run.total_items}" + ) + + return eval_run + + except Exception as e: + logger.error( + f"[start_evaluation] Failed to start evaluation | run_id={eval_run.id} | {e}", + exc_info=True, + ) + # Error is already handled in start_evaluation_batch + session.refresh(eval_run) + return eval_run + + +def get_evaluation_with_scores( + session: Session, + evaluation_id: int, + organization_id: int, + project_id: int, + get_trace_info: bool, + resync_score: bool, +) -> tuple[EvaluationRun | None, str | None]: + """ + Get evaluation run, optionally with trace scores from Langfuse. + + Handles caching logic for trace scores - scores are fetched on first request + and cached in the database. 
+ + Args: + session: Database session + evaluation_id: ID of the evaluation run + organization_id: Organization ID + project_id: Project ID + get_trace_info: If true, fetch trace scores + resync_score: If true, clear cached scores and re-fetch + + Returns: + Tuple of (EvaluationRun or None, error_message or None) + """ + logger.info( + f"[get_evaluation_with_scores] Fetching status for evaluation run | " + f"evaluation_id={evaluation_id} | " + f"org_id={organization_id} | " + f"project_id={project_id} | " + f"get_trace_info={get_trace_info} | " + f"resync_score={resync_score}" + ) + + eval_run = get_evaluation_run_by_id( + session=session, + evaluation_id=evaluation_id, + organization_id=organization_id, + project_id=project_id, + ) + + if not eval_run: + return None, None + + if not get_trace_info: + return eval_run, None + + # Only fetch trace info for completed evaluations + if eval_run.status != "completed": + return eval_run, ( + f"Trace info is only available for completed evaluations. " + f"Current status: {eval_run.status}" + ) + + # Check if we already have cached scores + has_cached_score = eval_run.score is not None and "traces" in eval_run.score + if not resync_score and has_cached_score: + return eval_run, None + + # Get Langfuse client + langfuse = get_langfuse_client( + session=session, + org_id=organization_id, + project_id=project_id, + ) + + # Capture data needed for Langfuse fetch and DB update + dataset_name = eval_run.dataset_name + run_name = eval_run.run_name + eval_run_id = eval_run.id + + # Fetch scores from Langfuse + try: + score = fetch_trace_scores_from_langfuse( + langfuse=langfuse, + dataset_name=dataset_name, + run_name=run_name, + ) + except ValueError as e: + logger.warning( + f"[get_evaluation_with_scores] Run not found in Langfuse | " + f"evaluation_id={evaluation_id} | error={e}" + ) + return eval_run, str(e) + except Exception as e: + logger.error( + f"[get_evaluation_with_scores] Failed to fetch trace info | " + f"evaluation_id={evaluation_id} | error={e}", + exc_info=True, + ) + return eval_run, f"Failed to fetch trace info from Langfuse: {str(e)}" + + # Save score to database (uses its own session) + eval_run = save_score( + eval_run_id=eval_run_id, + organization_id=organization_id, + project_id=project_id, + score=score, + ) + + return eval_run, None diff --git a/backend/app/services/evaluation/validators.py b/backend/app/services/evaluation/validators.py new file mode 100644 index 00000000..e4adc0b2 --- /dev/null +++ b/backend/app/services/evaluation/validators.py @@ -0,0 +1,174 @@ +"""Validation utilities for evaluation datasets.""" + +import csv +import io +import logging +import re +from pathlib import Path + +from fastapi import HTTPException, UploadFile + +logger = logging.getLogger(__name__) + +# File upload security constants +MAX_FILE_SIZE = 1024 * 1024 # 1 MB +ALLOWED_EXTENSIONS = {".csv"} +ALLOWED_MIME_TYPES = { + "text/csv", + "application/csv", + "text/plain", # Some systems report CSV as text/plain +} + + +def sanitize_dataset_name(name: str) -> str: + """ + Sanitize dataset name for Langfuse compatibility. + + Langfuse has issues with spaces and special characters in dataset names. + This function ensures the name can be both created and fetched. 
+ + Rules: + - Replace spaces with underscores + - Replace hyphens with underscores + - Keep only alphanumeric characters and underscores + - Convert to lowercase for consistency + - Remove leading/trailing underscores + - Collapse multiple consecutive underscores into one + + Args: + name: Original dataset name + + Returns: + Sanitized dataset name safe for Langfuse + + Examples: + "testing 0001" -> "testing_0001" + "My Dataset!" -> "my_dataset" + "Test--Data__Set" -> "test_data_set" + """ + # Convert to lowercase + sanitized = name.lower() + + # Replace spaces and hyphens with underscores + sanitized = sanitized.replace(" ", "_").replace("-", "_") + + # Keep only alphanumeric characters and underscores + sanitized = re.sub(r"[^a-z0-9_]", "", sanitized) + + # Collapse multiple underscores into one + sanitized = re.sub(r"_+", "_", sanitized) + + # Remove leading/trailing underscores + sanitized = sanitized.strip("_") + + # Ensure name is not empty + if not sanitized: + raise ValueError("Dataset name cannot be empty after sanitization") + + return sanitized + + +async def validate_csv_file(file: UploadFile) -> bytes: + """ + Validate CSV file extension, MIME type, and size. + + Args: + file: The uploaded file + + Returns: + CSV content as bytes if valid + + Raises: + HTTPException: If validation fails + """ + # Security validation: Check file extension + file_ext = Path(file.filename).suffix.lower() + if file_ext not in ALLOWED_EXTENSIONS: + raise HTTPException( + status_code=422, + detail=f"Invalid file type. Only CSV files are allowed. Got: {file_ext}", + ) + + # Security validation: Check MIME type + content_type = file.content_type + if content_type not in ALLOWED_MIME_TYPES: + raise HTTPException( + status_code=422, + detail=f"Invalid content type. Expected CSV, got: {content_type}", + ) + + # Security validation: Check file size + file.file.seek(0, 2) # Seek to end + file_size = file.file.tell() + file.file.seek(0) # Reset to beginning + + if file_size > MAX_FILE_SIZE: + raise HTTPException( + status_code=413, + detail=f"File too large. Maximum size: {MAX_FILE_SIZE / (1024 * 1024):.0f}MB", + ) + + if file_size == 0: + raise HTTPException(status_code=422, detail="Empty file uploaded") + + # Read and return content + return await file.read() + + +def parse_csv_items(csv_content: bytes) -> list[dict[str, str]]: + """ + Parse CSV and extract question/answer pairs. 
+ + Args: + csv_content: CSV file content as bytes + + Returns: + List of dicts with 'question' and 'answer' keys + + Raises: + HTTPException: If CSV is invalid or empty + """ + try: + csv_text = csv_content.decode("utf-8") + csv_reader = csv.DictReader(io.StringIO(csv_text)) + + if not csv_reader.fieldnames: + raise HTTPException(status_code=422, detail="CSV file has no headers") + + # Normalize headers for case-insensitive matching + clean_headers = { + field.strip().lower(): field for field in csv_reader.fieldnames + } + + # Validate required headers (case-insensitive) + if "question" not in clean_headers or "answer" not in clean_headers: + raise HTTPException( + status_code=422, + detail=f"CSV must contain 'question' and 'answer' columns " + f"Found columns: {csv_reader.fieldnames}", + ) + + # Get the actual column names from the CSV + question_col = clean_headers["question"] + answer_col = clean_headers["answer"] + + # Extract items + items = [] + for row in csv_reader: + question = row.get(question_col, "").strip() + answer = row.get(answer_col, "").strip() + if question and answer: + items.append({"question": question, "answer": answer}) + + if not items: + raise HTTPException( + status_code=422, detail="No valid items found in CSV file" + ) + + return items + + except HTTPException: + raise + except Exception as e: + logger.error(f"[parse_csv_items] Failed to parse CSV | {e}", exc_info=True) + raise HTTPException(status_code=422, detail=f"Invalid CSV file: {e}") diff --git a/backend/app/tests/api/routes/test_evaluation.py b/backend/app/tests/api/routes/test_evaluation.py index c4eb3f0b..00aef2f5 100644 --- a/backend/app/tests/api/routes/test_evaluation.py +++ b/backend/app/tests/api/routes/test_evaluation.py @@ -54,13 +54,13 @@ def test_upload_dataset_valid_csv( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): # Mock object store upload @@ -140,13 +140,13 @@ def test_upload_dataset_empty_rows( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): # Mock object store and Langfuse uploads @@ -186,13 +186,13 @@ def test_upload_with_default_duplication( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): 
mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -227,13 +227,13 @@ def test_upload_with_custom_duplication( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -268,13 +268,13 @@ def test_upload_with_description( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -360,13 +360,13 @@ def test_upload_with_duplication_factor_boundary_minimum( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -405,7 +405,7 @@ def test_upload_langfuse_configuration_fails( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch("app.crud.credentials.get_provider_credential") as mock_get_cred, ): From 7ba9b4339e0d0e2e57210e2feef99b22526dc89f Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 18 Dec 2025 12:39:28 +0530 Subject: [PATCH 2/3] cleanups --- backend/app/crud/evaluations/__init__.py | 33 --------------------- backend/app/services/evaluation/__init__.py | 16 ---------- 2 files changed, 49 deletions(-) diff --git a/backend/app/crud/evaluations/__init__.py b/backend/app/crud/evaluations/__init__.py index 4e797e9f..bb095413 100644 --- a/backend/app/crud/evaluations/__init__.py +++ b/backend/app/crud/evaluations/__init__.py @@ -35,36 +35,3 @@ process_completed_embedding_batch, process_completed_evaluation, ) - -__all__ = [ - # Core - "create_evaluation_run", - "get_evaluation_run_by_id", - "list_evaluation_runs", - "save_score", - # Cron - "process_all_pending_evaluations", - "process_all_pending_evaluations_sync", - # Dataset - "create_evaluation_dataset", - "delete_dataset", - "get_dataset_by_id", - "list_datasets", - "upload_csv_to_object_store", - # Batch - "start_evaluation_batch", - # Processing - "check_and_process_evaluation", - "poll_all_pending_evaluations", - "process_completed_embedding_batch", - 
"process_completed_evaluation", - # Embeddings - "calculate_average_similarity", - "calculate_cosine_similarity", - "start_embedding_batch", - # Langfuse - "create_langfuse_dataset_run", - "fetch_trace_scores_from_langfuse", - "update_traces_with_cosine_scores", - "upload_dataset_to_langfuse", -] diff --git a/backend/app/services/evaluation/__init__.py b/backend/app/services/evaluation/__init__.py index 6e3aac41..8c5a3690 100644 --- a/backend/app/services/evaluation/__init__.py +++ b/backend/app/services/evaluation/__init__.py @@ -14,19 +14,3 @@ sanitize_dataset_name, validate_csv_file, ) - -__all__ = [ - # Dataset - "upload_dataset", - # Evaluation - "build_evaluation_config", - "get_evaluation_with_scores", - "start_evaluation", - # Validators - "ALLOWED_EXTENSIONS", - "ALLOWED_MIME_TYPES", - "MAX_FILE_SIZE", - "parse_csv_items", - "sanitize_dataset_name", - "validate_csv_file", -] From 49a2063d621ae236501ac60202574535f648f8c4 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 18 Dec 2025 12:52:55 +0530 Subject: [PATCH 3/3] run pre commit --- backend/app/api/routes/evaluation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/app/api/routes/evaluation.py b/backend/app/api/routes/evaluation.py index ce7f3f9a..fa35c84f 100644 --- a/backend/app/api/routes/evaluation.py +++ b/backend/app/api/routes/evaluation.py @@ -30,6 +30,7 @@ router = APIRouter(tags=["evaluation"]) + def _dataset_to_response(dataset) -> DatasetUploadResponse: """Convert a dataset model to a DatasetUploadResponse.""" return DatasetUploadResponse(