diff --git a/tools/harness/README.md b/tools/harness/README.md index 63934a845..e31ee68dd 100644 --- a/tools/harness/README.md +++ b/tools/harness/README.md @@ -290,6 +290,7 @@ DATASET_DIR=/custom/path uv run nv-ingest-harness-run --case=e2e | `e2e_with_llm_summary` | E2E with LLM summarization via UDF | `active` section only | ✅ Available (YAML config) | | `recall` | Recall evaluation against existing collections | `active` + `recall` sections | ✅ Available (YAML config) | | `e2e_recall` | Fresh ingestion + recall evaluation | `active` + `recall` sections | ✅ Available (YAML config) | +| `qa_eval` | LLM answer quality evaluation (RAG) | `active` + `qa_eval` sections | ✅ Available (YAML config) | | `page_elements` | nemotron-page-elements-v3 model benchmarking (PyPi) | None | ✅ Available | | `graphic_elements` | nemotron-graphic-elements-v1 model benchmarking (PyPi) | None | ✅ Available | | `table_structure` | nemotron-table-structure-v1 model benchmarking (PyPi) | None | ✅ Available | @@ -360,6 +361,31 @@ active: enable_caption: true ``` +## QA Evaluation + +QA evaluation measures **LLM answer quality** over the full RAG pipeline: retrieve context from a VDB, generate answers with one or more LLMs, and score answers against ground truth using **multi-tier scoring** (retrieval signal, programmatic metrics, LLM-as-judge). The recommended **bo767** workflow uses **full-page markdown** context for fair comparison with research baselines. + +The eval harness is **pluggable**: your retrieval stack (vector, hybrid, agentic, or anything else) only needs to emit a JSON file that matches the **specification** consumed by `run_qa_eval.py` (via `FileRetriever`). If the JSON shape and query keys line up with the ground-truth dataset, you can compare methods without changing the evaluator. See [`src/nv_ingest_harness/utils/qa/README.md`](src/nv_ingest_harness/utils/qa/README.md) for the full contract. 
+ +**Full documentation** (reproduction commands, env vars, retrieval JSON specification, architecture, harness CLI notes): [`src/nv_ingest_harness/utils/qa/README.md`](src/nv_ingest_harness/utils/qa/README.md). + +**At a glance** + +- **Default ground truth (standalone scripts):** [`data/bo767_annotations.csv`](../../data/bo767_annotations.csv) at the repo root -- the **bo767 annotations subset** we maintain for this benchmark (multi-modality Q&A over the bo767 PDFs). `QA_CSV` / `QA_DATASET` default to this path (resolved relative to the repo root from `tools/harness`). +- **Standalone scripts** in `tools/harness/`: `ingest_bo767.py`, `extract_bo767_parquet.py`, `build_page_markdown_index.py`, `export_retrieval_nemo.py`, `run_qa_eval.py`; optional `retrieve_and_export.py` when using the harness VDB stack. +- **Eval requires** `RETRIEVAL_FILE` and `NVIDIA_API_KEY`; all other knobs are in the QA README. +- **Full bo767 repro (ingest / LanceDB / NeMo Retriever):** requires a **Python 3.12 venv** with `nemo_retriever`, LanceDB, and `litellm`. See [Python environment](src/nv_ingest_harness/utils/qa/README.md#python-environment) in the QA README. + +**Harness CLI (alternative)** -- uses `test_configs.yaml`; dataset and retrieval file paths may differ from standalone defaults. See the QA README. + +```bash +uv run nv-ingest-harness-run --case=e2e --dataset=bo767 +uv run python retrieve_and_export.py +uv run nv-ingest-harness-run --case=qa_eval --dataset=bo767 +``` + +**Retrieval JSON** -- minimal shape: a top-level `queries` object mapping each ground-truth question string to `{ "chunks": ["...", ...] }` (plus optional metadata). Meet the full specification in the QA README so `run_qa_eval.py` can load it unchanged. + ## Recall Testing Recall testing evaluates retrieval accuracy against ground truth query sets. 
Two test cases are available: diff --git a/tools/harness/build_page_markdown_index.py b/tools/harness/build_page_markdown_index.py new file mode 100644 index 000000000..a32fe2188 --- /dev/null +++ b/tools/harness/build_page_markdown_index.py @@ -0,0 +1,119 @@ +"""Build a page-level markdown index from extracted Parquet files. + +Loads extraction results saved by extract_bo767_parquet.py, groups records +by (source document, page number), renders each page via to_markdown_by_page, +and writes a JSON index mapping source_id -> page_number -> markdown. + +Usage: + python build_page_markdown_index.py + +Env vars: + PARQUET_DIR Directory containing Parquet files (default: data/bo767_extracted) + OUTPUT_FILE Where to write the JSON index (default: data/bo767_page_markdown.json) +""" + +import json +import os +import sys +import time +from collections import defaultdict +from pathlib import Path + +import numpy as np + +_HERE = os.path.dirname(os.path.abspath(__file__)) + + +def main() -> int: + parquet_dir = os.environ.get( + "PARQUET_DIR", + os.path.join(_HERE, "data", "bo767_extracted"), + ) + output_file = os.environ.get( + "OUTPUT_FILE", + os.path.join(_HERE, "data", "bo767_page_markdown.json"), + ) + + print("=" * 60) + print("Build Page Markdown Index") + print("=" * 60) + print(f"Parquet dir: {parquet_dir}") + print(f"Output file: {output_file}") + + if not os.path.isdir(parquet_dir): + print(f"ERROR: Parquet directory not found: {parquet_dir}", file=sys.stderr) + return 1 + + import pandas as pd + from nemo_retriever.io.markdown import to_markdown_by_page + + parquet_files = sorted(Path(parquet_dir).rglob("*.parquet")) + if not parquet_files: + print(f"ERROR: No .parquet files found in {parquet_dir}", file=sys.stderr) + return 1 + + print(f"Found {len(parquet_files)} Parquet file(s)") + + t0 = time.monotonic() + dfs = [pd.read_parquet(f) for f in parquet_files] + df = pd.concat(dfs, ignore_index=True) + print(f"Loaded {len(df)} records in {time.monotonic() - 
t0:.1f}s") + print(f"Columns: {list(df.columns)}") + + path_col = "path" if "path" in df.columns else "source_id" + if path_col not in df.columns: + print("ERROR: Neither 'path' nor 'source_id' found in columns", file=sys.stderr) + return 1 + + def _ndarray_to_list(record: dict) -> dict: + """Pandas reads Parquet list columns as numpy arrays. + to_markdown_by_page checks isinstance(items, list), so convert them.""" + for key in ("table", "chart", "infographic", "tables", "charts", "infographics"): + val = record.get(key) + if isinstance(val, np.ndarray): + record[key] = val.tolist() + return record + + docs_grouped = defaultdict(list) + for _, row in df.iterrows(): + source = str(row.get(path_col, "")) + if source: + docs_grouped[source].append(_ndarray_to_list(row.to_dict())) + + print(f"Grouped into {len(docs_grouped)} documents") + + t1 = time.monotonic() + index: dict[str, dict[str, str]] = {} + total_pages = 0 + + for source_id, records in docs_grouped.items(): + try: + pages = to_markdown_by_page(records) + except Exception as exc: + print(f" WARNING: Failed to render {source_id}: {exc}") + continue + + page_map: dict[str, str] = {} + for page_number, markdown in pages.items(): + page_map[str(page_number)] = markdown + total_pages += 1 + + index[source_id] = page_map + + elapsed_render = time.monotonic() - t1 + print(f"Rendered {total_pages} pages from {len(index)} documents in {elapsed_render:.1f}s") + + os.makedirs(os.path.dirname(output_file), exist_ok=True) + with open(output_file, "w", encoding="utf-8") as f: + json.dump(index, f, ensure_ascii=False) + + size_mb = os.path.getsize(output_file) / 1024 / 1024 + print(f"\nIndex written to {output_file} ({size_mb:.1f} MB)") + print(f" Documents: {len(index)}") + print(f" Pages: {total_pages}") + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tools/harness/export_retrieval_nemo.py b/tools/harness/export_retrieval_nemo.py new file mode 100644 index 000000000..2be76a483 --- /dev/null 
+++ b/tools/harness/export_retrieval_nemo.py @@ -0,0 +1,287 @@ +"""Export retrieval results from NeMo Retriever LanceDB to FileRetriever JSON. + +Uses nemo_retriever.retriever.Retriever to query the LanceDB table populated +by ingest_bo767.py, producing a JSON file compatible with the QA eval +pipeline's FileRetriever. + +Full-page markdown mode (optional): + Set PAGE_MARKDOWN_INDEX to a JSON file produced by build_page_markdown_index.py. + When set, vector search hits are expanded to full-page markdown -- multiple + hits from the same page are deduplicated into a single chunk containing the + complete page rendered via to_markdown_by_page. + +Usage: + python export_retrieval_nemo.py + +The output JSON lands at data/test_retrieval/bo767_retrieval.json by default. +""" + +import csv +import json +import os +import sys +import time + +_HERE = os.path.dirname(os.path.abspath(__file__)) + +DEFAULT_EMBEDDER = "nvidia/llama-nemotron-embed-1b-v2" +BATCH_SIZE = 50 + + +def load_queries(csv_path: str) -> list[dict]: + pairs = [] + with open(csv_path, newline="", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + query = row.get("query", "").strip() + answer = row.get("answer", "").strip() + if query and answer: + pairs.append(row) + return pairs + + +def parse_json_field(raw): + """Safely parse a field that may be a JSON string or already a dict.""" + if isinstance(raw, dict): + return raw + if isinstance(raw, str): + try: + return json.loads(raw) + except (json.JSONDecodeError, TypeError): + return {} + return {} + + +def _load_page_index(path: str) -> dict[str, dict[str, str]]: + """Load the page markdown index JSON: {source_id: {page_number_str: markdown}}.""" + with open(path, encoding="utf-8") as f: + return json.load(f) + + +def _expand_hits_to_pages( + hits: list[dict], + page_index: dict[str, dict[str, str]], +) -> tuple[list[str], list[dict], int]: + """Deduplicate hits by (source_id, page_number) and look up full-page markdown. 
+ + Returns (chunks, metadata, miss_count) where miss_count is the number of + (source_id, page) pairs that had no entry in the page index. + """ + seen: dict[tuple[str, int], float] = {} + ordered_pages: list[tuple[str, int]] = [] + + for hit in hits: + source = parse_json_field(hit.get("source", "{}")) + meta = parse_json_field(hit.get("metadata", "{}")) + source_id = source.get("source_id", "") + page_number = meta.get("page_number", hit.get("page_number", -1)) + try: + page_number = int(page_number) + except (TypeError, ValueError): + page_number = -1 + distance = hit.get("_distance") + + key = (source_id, page_number) + if key not in seen: + seen[key] = distance + ordered_pages.append(key) + elif distance is not None and (seen[key] is None or distance < seen[key]): + seen[key] = distance + + chunks = [] + metadata = [] + miss_count = 0 + for source_id, page_number in ordered_pages: + page_str = str(page_number) + doc_pages = page_index.get(source_id, {}) + md = doc_pages.get(page_str) + if md is None: + miss_count += 1 + continue + chunks.append(md) + metadata.append( + { + "source_id": source_id, + "page_number": page_number, + "distance": seen[(source_id, page_number)], + } + ) + + return chunks, metadata, miss_count + + +def _validate_page_index_keys( + first_batch_hits: list[list[dict]], + page_index: dict[str, dict[str, str]], +) -> None: + """Sample source_ids from LanceDB hits and verify they exist in the page index. + + Aborts with an actionable error if none of the sampled keys match, which + indicates a source_id convention mismatch between ingest and extract. 
+ """ + sampled: set[str] = set() + for hits in first_batch_hits: + for hit in hits: + source = parse_json_field(hit.get("source", "{}")) + sid = source.get("source_id", "") + if sid: + sampled.add(sid) + if len(sampled) >= 5: + break + if len(sampled) >= 5: + break + + if not sampled: + return + + matched = sum(1 for sid in sampled if sid in page_index) + if matched == 0: + sample_list = list(sampled)[:3] + index_sample = list(page_index.keys())[:3] + print( + f"ERROR: None of {len(sampled)} sampled LanceDB source_ids found in " + f"page index.\n" + f" LanceDB samples: {sample_list}\n" + f" Index samples: {index_sample}\n" + f" This usually means ingest and extract used different path " + f"conventions for source_id.\n" + f" Re-run extract_bo767_parquet.py with the same --dataset-dir " + f"used for ingest_bo767.py.", + file=sys.stderr, + ) + sys.exit(1) + print(f" Page index key check: {matched}/{len(sampled)} sampled source_ids found") + + +def main() -> int: + lancedb_uri = os.environ.get("LANCEDB_URI", os.path.join(_HERE, "lancedb")) + lancedb_table = os.environ.get("LANCEDB_TABLE", "nv-ingest") + top_k = int(os.environ.get("TOP_K", "5")) + embedder = os.environ.get("EMBEDDER", DEFAULT_EMBEDDER) + _repo_root = os.path.normpath(os.path.join(_HERE, "..", "..")) + csv_path = os.environ.get( + "QA_CSV", + os.path.join(_repo_root, "data", "bo767_annotations.csv"), + ) + output_file = os.environ.get( + "OUTPUT_FILE", + os.path.join(_HERE, "data", "test_retrieval", "bo767_retrieval.json"), + ) + page_markdown_index_path = os.environ.get("PAGE_MARKDOWN_INDEX", "") + + use_fullpage = bool(page_markdown_index_path) + page_index: dict[str, dict[str, str]] = {} + + print("=" * 60) + print("NeMo Retriever -> FileRetriever JSON Export") + print("=" * 60) + print(f"LanceDB URI: {lancedb_uri}") + print(f"LanceDB Table: {lancedb_table}") + print(f"Top-K: {top_k}") + print(f"Embedder: {embedder}") + print(f"QA CSV: {csv_path}") + print(f"Output: {output_file}") + 
print(f"Full-page mode: {'ON (' + page_markdown_index_path + ')' if use_fullpage else 'OFF'}") + print("=" * 60) + + if use_fullpage: + print("\nLoading page markdown index ...") + page_index = _load_page_index(page_markdown_index_path) + total_pages = sum(len(pages) for pages in page_index.values()) + print(f" {len(page_index)} documents, {total_pages} pages") + + qa_pairs = load_queries(csv_path) + query_strings = [p["query"] for p in qa_pairs] + print(f"\nLoaded {len(query_strings)} queries") + + from nemo_retriever.retriever import Retriever + + retriever = Retriever( + lancedb_uri=lancedb_uri, + lancedb_table=lancedb_table, + embedder=embedder, + top_k=top_k, + reranker=False, + ) + + all_results: dict[str, dict] = {} + total_page_misses = 0 + t0 = time.monotonic() + validated_index = False + + for batch_start in range(0, len(query_strings), BATCH_SIZE): + batch = query_strings[batch_start : batch_start + BATCH_SIZE] + batch_hits = retriever.queries(batch) + + if use_fullpage and not validated_index: + _validate_page_index_keys(batch_hits, page_index) + validated_index = True + + for query, hits in zip(batch, batch_hits): + if use_fullpage: + chunks, metadata, misses = _expand_hits_to_pages(hits, page_index) + total_page_misses += misses + else: + chunks = [] + metadata = [] + for hit in hits: + chunks.append(hit.get("text", "")) + + source = parse_json_field(hit.get("source", "{}")) + meta = parse_json_field(hit.get("metadata", "{}")) + + metadata.append( + { + "source_id": source.get("source_id", ""), + "page_number": meta.get("page_number", hit.get("page_number", "")), + "distance": hit.get("_distance"), + } + ) + + all_results[query] = {"chunks": chunks, "metadata": metadata} + + elapsed = time.monotonic() - t0 + done = min(batch_start + BATCH_SIZE, len(query_strings)) + print(f" Progress: {done}/{len(query_strings)} queries ({elapsed:.1f}s)") + + total_elapsed = time.monotonic() - t0 + + empty_count = sum(1 for r in all_results.values() if not 
r["chunks"]) + avg_chunks = sum(len(r["chunks"]) for r in all_results.values()) / max(len(all_results), 1) + + print(f"\nRetrieval complete in {total_elapsed:.1f}s") + print(f" Queries: {len(all_results)}") + print(f" Avg chunks: {avg_chunks:.1f}") + print(f" Empty results: {empty_count}") + if use_fullpage: + print(f" Page index misses: {total_page_misses}") + + chunk_mode = "full-page markdown" if use_fullpage else "sub-page chunks" + os.makedirs(os.path.dirname(output_file), exist_ok=True) + meta = { + "vdb_backend": "lancedb", + "collection_name": lancedb_table, + "top_k": top_k, + "embedding_model": embedder, + "chunk_mode": chunk_mode, + "query_count": len(all_results), + "elapsed_s": round(total_elapsed, 1), + } + if use_fullpage: + meta["page_index_misses"] = total_page_misses + output = { + "metadata": meta, + "queries": all_results, + } + + with open(output_file, "w") as f: + json.dump(output, f, indent=2) + + print(f"\nExported to {output_file}") + print(f"File size: {os.path.getsize(output_file) / 1024 / 1024:.1f} MB") + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tools/harness/extract_bo767_parquet.py b/tools/harness/extract_bo767_parquet.py new file mode 100644 index 000000000..11918a811 --- /dev/null +++ b/tools/harness/extract_bo767_parquet.py @@ -0,0 +1,96 @@ +"""Extract bo767 PDFs to Parquet (no embed, no VDB upload). + +Runs only the extraction stage of the NeMo Retriever pipeline and saves +the full records (including table/chart/infographic columns) to Parquet. +These records are consumed by build_page_markdown_index.py to reconstruct +full-page markdown for the QA eval pipeline. 
+ +Usage: + python extract_bo767_parquet.py + +Env vars: + DATASET_DIR Path to bo767 PDF directory (required) + OUTPUT_DIR Where to write Parquet files (default: data/bo767_extracted) +""" + +import argparse +import glob +import json +import os +import sys +import time + + +def main(): + parser = argparse.ArgumentParser(description="Extract bo767 PDFs to Parquet") + parser.add_argument( + "--dataset-dir", + default=os.environ.get("DATASET_DIR", ""), + help="Path to bo767 PDF directory (or set DATASET_DIR env var)", + ) + parser.add_argument( + "--output-dir", + default=os.environ.get( + "OUTPUT_DIR", + os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "bo767_extracted"), + ), + help="Where to write Parquet files", + ) + parser.add_argument( + "--smoke-test", + action="store_true", + help="Extract only the first file as a quick check", + ) + args = parser.parse_args() + + if not args.dataset_dir: + parser.error("--dataset-dir is required (or set DATASET_DIR env var)") + + pdf_pattern = os.path.join(args.dataset_dir, "*.pdf") + all_pdfs = sorted(glob.glob(pdf_pattern)) + if not all_pdfs: + print(f"No PDFs found at {pdf_pattern}", file=sys.stderr) + sys.exit(1) + + if args.smoke_test: + documents = [all_pdfs[0]] + print(f"Smoke test: extracting 1 file: {documents[0]}") + else: + documents = all_pdfs + print(f"Extracting {len(documents)} PDFs from {args.dataset_dir}") + + print(f"Output dir: {args.output_dir}") + + from nemo_retriever import create_ingestor + + ingestor = create_ingestor(run_mode="batch") + ingestor = ingestor.files(documents).extract( + extract_text=True, + extract_tables=True, + extract_charts=True, + extract_infographics=True, + ) + + print("Running extraction (no embed, no vdb_upload) ...") + t0 = time.time() + ingestor.ingest() + elapsed_ingest = time.time() - t0 + print(f"Extraction done in {elapsed_ingest:.1f}s") + + print(f"Saving intermediate results to {args.output_dir} ...") + t1 = time.time() + 
ingestor.save_intermediate_results(args.output_dir) + elapsed_save = time.time() - t1 + print(f"Parquet saved in {elapsed_save:.1f}s") + + summary = { + "documents": len(documents), + "output_dir": args.output_dir, + "extract_elapsed_s": round(elapsed_ingest, 2), + "save_elapsed_s": round(elapsed_save, 2), + } + print(f"\nSummary: {json.dumps(summary, indent=2)}") + + +if __name__ == "__main__": + main() diff --git a/tools/harness/ingest_bo767.py b/tools/harness/ingest_bo767.py new file mode 100644 index 000000000..2cd593bb5 --- /dev/null +++ b/tools/harness/ingest_bo767.py @@ -0,0 +1,92 @@ +"""Ingest bo767 PDFs into LanceDB using NeMo Retriever Library (no containers). + +Usage (single-file smoke test): + python ingest_bo767.py --smoke-test + +Usage (full dataset): + python ingest_bo767.py + +The LanceDB table is written to ./lancedb/nv-ingest by default. +""" + +import argparse +import glob +import json +import os +import sys +import time + + +def main(): + parser = argparse.ArgumentParser(description="Ingest bo767 via nemo-retriever library") + parser.add_argument( + "--dataset-dir", + default=os.environ.get("DATASET_DIR", ""), + help="Path to bo767 PDF directory (or set DATASET_DIR env var)", + ) + parser.add_argument("--lancedb-uri", default="lancedb", help="LanceDB URI (directory)") + parser.add_argument( + "--lancedb-table", + default="nv-ingest", + help="LanceDB table name (must match export_retrieval_nemo.py LANCEDB_TABLE)", + ) + parser.add_argument("--smoke-test", action="store_true", help="Ingest only the first file as a quick check") + args = parser.parse_args() + + if not args.dataset_dir: + parser.error("--dataset-dir is required (or set DATASET_DIR env var)") + + pdf_pattern = os.path.join(args.dataset_dir, "*.pdf") + all_pdfs = sorted(glob.glob(pdf_pattern)) + if not all_pdfs: + print(f"No PDFs found at {pdf_pattern}", file=sys.stderr) + sys.exit(1) + + if args.smoke_test: + documents = [all_pdfs[0]] + print(f"Smoke test: ingesting 1 file: 
{documents[0]}") + else: + documents = all_pdfs + print(f"Ingesting {len(documents)} PDFs from {args.dataset_dir}") + + from nemo_retriever import create_ingestor + + ingestor = create_ingestor(run_mode="batch") + ingestor = ( + ingestor.files(documents) + .extract( + extract_text=True, + extract_tables=True, + extract_charts=True, + extract_infographics=True, + ) + .embed() + .vdb_upload( + lancedb_uri=args.lancedb_uri, + lancedb_table=args.lancedb_table, + ) + ) + + print(f"LanceDB: {args.lancedb_uri}/{args.lancedb_table}") + t0 = time.time() + ray_dataset = ingestor.ingest() + elapsed = time.time() - t0 + + chunks = ray_dataset.get_dataset().take_all() + print(f"\nDone in {elapsed:.1f}s") + print(f"Total chunks: {len(chunks)}") + if chunks: + print(f"First chunk preview: {chunks[0].get('text', '')[:200]}...") + + summary = { + "documents": len(documents), + "chunks": len(chunks), + "elapsed_s": round(elapsed, 2), + "lancedb_uri": args.lancedb_uri, + "lancedb_table": args.lancedb_table, + } + print(f"\nSummary: {json.dumps(summary, indent=2)}") + + +if __name__ == "__main__": + main() diff --git a/tools/harness/pyproject.toml b/tools/harness/pyproject.toml index 9b2721ba3..37a660e7c 100644 --- a/tools/harness/pyproject.toml +++ b/tools/harness/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ "docker>=7.1.0", "pyyaml>=6.0", "requests>=2.32.5", + "tenacity>=8.2.0", "pynvml>=11.5.0", "nv-ingest", "nv-ingest-api", @@ -19,6 +20,8 @@ dependencies = [ "nemotron-graphic-elements-v1>=0.dev0", "nemotron-table-structure-v1>=0.dev0", "nemotron-ocr>=0.dev0", + "litellm>=1.40.0", + "datasets>=2.19.0", ] [project.scripts] @@ -31,6 +34,11 @@ nv-ingest-harness-stats = "nv_ingest_harness.cli.stats:main" [tool.uv] package = true +# fastparquet>=2026.3.0 dropped x86_64 wheels (manylinux_2_39 only gets i686/aarch64). +# Override to last known-good version until upstream fixes their wheel builds. 
+override-dependencies = [ + "fastparquet<2026.3.0", +] [tool.uv.sources] nv-ingest = { path = "../../src", editable = true } diff --git a/tools/harness/retrieve_and_export.py b/tools/harness/retrieve_and_export.py new file mode 100644 index 000000000..8d0735829 --- /dev/null +++ b/tools/harness/retrieve_and_export.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +""" +Export VDB retrieval results for all ground-truth queries to a FileRetriever JSON. + +This bridges ingestion (e2e case) and QA evaluation (qa_eval case): + 1. Run e2e to ingest PDFs into VDB. + 2. Run this script to query the VDB for every ground-truth question. + 3. Run qa_eval with qa_retriever=file pointing at the exported JSON. + +Separating retrieval from generation/judging means you can: + - Iterate on LLM configs without re-running expensive VDB queries. + - Cache retrieval results for reproducibility. + - Inspect retrieved chunks before spending money on API calls. + +Usage: + uv run python retrieve_and_export.py + +Environment variables: + HOSTNAME Service hostname (default: localhost) + VDB_BACKEND "lancedb" or "milvus" (default: lancedb) + COLLECTION_NAME VDB collection / LanceDB table name (default: bo767_multimodal) + LANCEDB_DIR LanceDB base directory (default: tools/harness/lancedb) + TOP_K Chunks per query (default: 5) + QA_DATASET Dataset key or csv: path (default: csv:data/bo767_annotations.csv) + GROUND_TRUTH_DIR Directory with the CSV (default: tools/harness/data) + BATCH_SIZE Queries per VDB batch call (default: 50) + OUTPUT_FILE Where to write the JSON (default: data/test_retrieval/bo767_retrieval.json) + SPARSE Enable sparse/hybrid for Milvus (default: false) + GPU_SEARCH Use GPU search for Milvus (default: false) + HYBRID LanceDB hybrid retrieval (default: false) +""" + +import json +import os +import sys +import time +from pathlib import Path + +_HERE = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.join(_HERE, "src")) + + +def _bool_env(name: str, 
default: bool = False) -> bool: + return os.environ.get(name, str(default)).strip().lower() in ("1", "true", "yes") + + +def main() -> int: + hostname = os.environ.get("HOSTNAME", "localhost") + vdb_backend = os.environ.get("VDB_BACKEND", "lancedb") + collection_name = os.environ.get("COLLECTION_NAME", "bo767_multimodal") + lancedb_dir = os.environ.get("LANCEDB_DIR", os.path.join(_HERE, "lancedb")) + top_k = int(os.environ.get("TOP_K", "5")) + qa_dataset = os.environ.get("QA_DATASET", "csv:data/bo767_annotations.csv") + ground_truth_dir = os.environ.get("GROUND_TRUTH_DIR", os.path.join(_HERE, "data")) + batch_size = int(os.environ.get("BATCH_SIZE", "50")) + output_file = os.environ.get("OUTPUT_FILE", os.path.join(_HERE, "data", "test_retrieval", "bo767_retrieval.json")) + sparse = _bool_env("SPARSE") + gpu_search = _bool_env("GPU_SEARCH") + hybrid = _bool_env("HYBRID") + + embedding_endpoint = f"http://{hostname}:8012/v1" + + print("=" * 60) + print("Retrieve & Export") + print("=" * 60) + print(f"VDB Backend: {vdb_backend}") + print(f"Collection: {collection_name}") + print(f"Hostname: {hostname}") + print(f"Embedding: {embedding_endpoint}") + print(f"Top-K: {top_k}") + print(f"Batch size: {batch_size}") + print(f"Dataset: {qa_dataset}") + print(f"Ground truth: {ground_truth_dir}") + print(f"Output: {output_file}") + print("=" * 60) + + # Load ground truth queries + from nv_ingest_harness.utils.qa.ground_truth import get_qa_dataset_loader + + loader = get_qa_dataset_loader(qa_dataset) + qa_pairs = loader(data_dir=ground_truth_dir) + queries = [pair["query"] for pair in qa_pairs] + print(f"\nLoaded {len(queries)} queries from '{qa_dataset}'") + + # Detect embedding model + from nv_ingest_harness.utils.interact import embed_info + + model_name, _ = embed_info() + print(f"Embedding model: {model_name}") + + # Build retrieval function + table_path = None + if vdb_backend == "lancedb": + table_path = str(Path(lancedb_dir) / collection_name) + if not 
os.path.exists(table_path): + print(f"ERROR: LanceDB table not found at {table_path}", file=sys.stderr) + print("Run the e2e ingestion case first:", file=sys.stderr) + print(" uv run nv-ingest-harness-run --case=e2e --dataset=bo767", file=sys.stderr) + return 1 + + from nv_ingest_harness.utils.recall import get_retrieval_func + + retrieval_func = get_retrieval_func( + vdb_backend=vdb_backend, + table_path=table_path, + table_name=collection_name, + hybrid=hybrid, + ) + + # Batch retrieval + all_results: dict[str, dict] = {} + total = len(queries) + t0 = time.monotonic() + + for batch_start in range(0, total, batch_size): + batch_end = min(batch_start + batch_size, total) + batch_queries = queries[batch_start:batch_end] + + if vdb_backend == "lancedb": + raw = retrieval_func( + batch_queries, + embedding_endpoint=embedding_endpoint, + model_name=model_name, + top_k=top_k, + ) + else: + raw = retrieval_func( + batch_queries, + collection_name, + hybrid=sparse, + embedding_endpoint=embedding_endpoint, + model_name=model_name, + top_k=top_k, + gpu_search=gpu_search, + ) + + for i, query in enumerate(batch_queries): + hits = raw[i] if i < len(raw) else [] + chunks = [] + metadata = [] + + for hit in hits: + entity = hit.get("entity", {}) + text = entity.get("text", "") + chunks.append(text) + + source = entity.get("source", {}) + content_meta = entity.get("content_metadata", {}) + metadata.append( + { + "source_id": source.get("source_id", ""), + "page_number": content_meta.get("page_number", ""), + "distance": hit.get("distance"), + } + ) + + all_results[query] = { + "chunks": chunks, + "metadata": metadata, + } + + elapsed = time.monotonic() - t0 + print(f" Progress: {batch_end}/{total} queries retrieved ({elapsed:.1f}s)") + + total_elapsed = time.monotonic() - t0 + + # Stats + empty_count = sum(1 for r in all_results.values() if not r["chunks"]) + avg_chunks = sum(len(r["chunks"]) for r in all_results.values()) / max(len(all_results), 1) + + print(f"\nRetrieval 
complete in {total_elapsed:.1f}s") + print(f" Queries: {len(all_results)}") + print(f" Avg chunks: {avg_chunks:.1f}") + print(f" Empty results: {empty_count}") + + # Write output + os.makedirs(os.path.dirname(output_file), exist_ok=True) + + output = { + "metadata": { + "vdb_backend": vdb_backend, + "collection_name": collection_name, + "top_k": top_k, + "embedding_model": model_name, + "query_count": len(all_results), + "dataset": qa_dataset, + "elapsed_s": round(total_elapsed, 1), + }, + "queries": all_results, + } + + with open(output_file, "w") as f: + json.dump(output, f, indent=2) + + print(f"\nExported to {output_file}") + print(f"File size: {os.path.getsize(output_file) / 1024 / 1024:.1f} MB") + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tools/harness/run_qa_eval.py b/tools/harness/run_qa_eval.py new file mode 100644 index 000000000..d11af9175 --- /dev/null +++ b/tools/harness/run_qa_eval.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python3 +""" +Standalone QA evaluation runner for FileRetriever mode. + +Reads all config from environment variables so it can be used directly +inside the minimal Docker image without the full harness CLI/config stack. + +Required env vars: + RETRIEVAL_FILE Path to the retrieval results JSON + e.g. /data/test_retrieval/bo767_sample.json + NVIDIA_API_KEY API key for NVIDIA NIM endpoints + +Optional env vars: + GROUND_TRUTH_DIR Directory used by some dataset loaders (ViDoRe, etc.). + For csv: datasets, paths are taken from QA_DATASET. + QA_DATASET Dataset key or csv: path (default: csv:/data/bo767_annotations.csv). + Use "csv:/path/to/file.csv" for custom CSVs. 
+ QA_TOP_K Chunks per query (default: 5) + QA_MAX_WORKERS Concurrent API calls (default: 4) + QA_LIMIT Only evaluate the first N queries (0 = all, default: 0) + OUTPUT_FILE Where to write results JSON (default: /tmp/qa_results.json) + JUDGE_MODEL litellm model string for judge + (default: nvidia_nim/mistralai/mixtral-8x22b-instruct-v0.1) + JUDGE_API_BASE Override endpoint for judge model + GEN_MODEL_NAME Short label for the generator (default: generator) + GEN_MODEL litellm model string for answer generation + (default: nvidia_nim/nvidia/llama-3.3-nemotron-super-49b-v1.5) + GEN_API_BASE Override endpoint for generator model + GEN_MODELS Multi-model sweep: comma-separated name:model pairs. + Overrides GEN_MODEL/GEN_MODEL_NAME when set. + e.g. "nemotron:nvidia_nim/nvidia/llama-3.3-nemotron-super-49b-v1.5, + qwen3:nvidia_nim/qwen/qwen3-235b-a22b" + LITELLM_DEBUG Set to 1 to enable full litellm request/response logging +""" + +import json +import os +import sys + +# Allow running from repo root: add the harness src/ to sys.path +_HERE = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.join(_HERE, "src")) + + +def _require(name: str) -> str: + value = os.environ.get(name, "") + if not value: + print(f"ERROR: {name} environment variable is required but not set", file=sys.stderr) + sys.exit(1) + return value + + +def _print_errors(eval_results: dict, qa_pairs: list) -> None: + """Print per-query error details so they are visible without the results JSON.""" + per_query = eval_results.get("per_query", []) + errors_found = False + for i, qr in enumerate(per_query): + query_text = qr.get("query", "")[:60] + for model_name, gen in qr.get("generations", {}).items(): + if gen.get("error"): + if not errors_found: + print("\n--- Generation errors ---") + errors_found = True + print(f" [query {i}] {query_text!r}") + print(f" model={model_name} error={gen['error']}") + for model_name, jdg in qr.get("judgements", {}).items(): + if jdg.get("error"): + if not 
errors_found: + print("\n--- Judge errors ---") + errors_found = True + print(f" [query {i}] {query_text!r}") + print(f" model={model_name} error={jdg['error']}") + if not errors_found and per_query: + print("\nNo per-query errors.") + + +def _print_multi_tier_summary(eval_results: dict, total_queries: int) -> None: + """Print the multi-tier evaluation summary to stdout.""" + print("\n" + "=" * 60) + print("Multi-Tier Results") + print("=" * 60) + + tier1 = eval_results.get("tier1_retrieval", {}) + aic_rate = tier1.get("answer_in_context_rate", 0) + aic_count = tier1.get("answer_in_context_count", 0) + aic_total = tier1.get("total", total_queries) + print("\nTier 1 - Retrieval Quality:") + print(f" Answer-in-Context rate: {aic_rate:.1%} ({aic_count}/{aic_total})") + + tier2 = eval_results.get("tier2_programmatic", {}) + if tier2: + print("\nTier 2 - Programmatic Answer Quality:") + for name, metrics in tier2.items(): + em = metrics.get("mean_exact_match", 0) + f1 = metrics.get("mean_token_f1", 0) + print(f" {name:20s} exact_match={em:.1%} token_f1={f1:.3f}") + + by_model = eval_results.get("by_model", {}) + if by_model: + print("\nTier 3 - LLM Judge:") + for name, stats in by_model.items(): + ms = stats.get("mean_score", 0) + ml = stats.get("mean_latency_s", 0) + sc = stats.get("scored_count", 0) + ec = stats.get("error_count", 0) + print(f" {name:20s} mean={ms:.2f}/5 latency={ml:.1f}s scored={sc} errors={ec}") + dist = stats.get("score_distribution", {}) + if dist: + print(f" {'':20s} dist: " + " ".join(f"{k}:{v}" for k, v in sorted(dist.items()))) + + fb = eval_results.get("failure_breakdown", {}) + if fb: + print("\nFailure Breakdown:") + for name, counts in fb.items(): + parts = " ".join(f"{k}:{v}" for k, v in sorted(counts.items())) + print(f" {name:20s} {parts}") + + print("=" * 60) + + +def main() -> int: + retrieval_file = _require("RETRIEVAL_FILE") + ground_truth_dir = os.environ.get("GROUND_TRUTH_DIR", os.path.join(_HERE, "data")) + _repo_root = 
os.path.normpath(os.path.join(_HERE, "..", "..")) + qa_dataset = os.environ.get( + "QA_DATASET", + "csv:" + os.path.join(_repo_root, "data", "bo767_annotations.csv"), + ) + qa_top_k = int(os.environ.get("QA_TOP_K", "5")) + qa_max_workers = int(os.environ.get("QA_MAX_WORKERS", "4")) + qa_limit = int(os.environ.get("QA_LIMIT", "0")) + output_file = os.environ.get("OUTPUT_FILE", "/tmp/qa_results.json") + litellm_debug = os.environ.get("LITELLM_DEBUG", "0").strip() in ("1", "true", "yes") + + judge_model = os.environ.get("JUDGE_MODEL", "nvidia_nim/mistralai/mixtral-8x22b-instruct-v0.1") + judge_api_base = os.environ.get("JUDGE_API_BASE") + + gen_models_str = os.environ.get("GEN_MODELS", "") + gen_name = os.environ.get("GEN_MODEL_NAME", "generator") + gen_model = os.environ.get("GEN_MODEL", "nvidia_nim/nvidia/llama-3.3-nemotron-super-49b-v1.5") + gen_api_base = os.environ.get("GEN_API_BASE") + + # Validate API key looks real before spending time loading everything + api_key = os.environ.get("NVIDIA_API_KEY", "") + if not api_key: + print("WARNING: NVIDIA_API_KEY is not set. API calls will fail.", file=sys.stderr) + elif api_key.startswith("nvapi-") and len(api_key) < 20: + print("WARNING: NVIDIA_API_KEY looks like a placeholder. 
Did you paste the real key?", file=sys.stderr) + + if litellm_debug: + import litellm + + litellm._turn_on_debug() + + from nv_ingest_harness.utils.qa.retrievers import FileRetriever + from nv_ingest_harness.utils.qa.generators import LiteLLMClient + from nv_ingest_harness.utils.qa.judges import LLMJudge + from nv_ingest_harness.utils.qa.orchestrator import QAEvalPipeline + from nv_ingest_harness.utils.qa.ground_truth import get_qa_dataset_loader + + gen_model_pairs: list[tuple[str, str]] = [] + if gen_models_str: + for entry in gen_models_str.split(","): + entry = entry.strip() + if ":" not in entry: + print(f"ERROR: GEN_MODELS entry '{entry}' must be name:model", file=sys.stderr) + sys.exit(1) + name, model = entry.split(":", 1) + gen_model_pairs.append((name.strip(), model.strip())) + else: + gen_model_pairs.append((gen_name, gen_model)) + + print("=" * 60) + print("QA Evaluation (standalone runner)") + print("=" * 60) + print(f"Dataset: {qa_dataset}") + print(f"Retrieval file: {retrieval_file}") + print(f"Ground truth: {ground_truth_dir}") + print(f"Top-K: {qa_top_k}") + print(f"Max workers: {qa_max_workers}") + for gn, gm in gen_model_pairs: + print(f"Generator: {gn} ({gm})") + print(f" api_base: {gen_api_base}") + print(f"Judge: {judge_model}") + print(f" api_base: {judge_api_base}") + print(f"NVIDIA_API_KEY: {'set (' + api_key[:12] + '...)' if api_key else 'NOT SET'}") + print("=" * 60) + + loader = get_qa_dataset_loader(qa_dataset) + qa_pairs = loader(data_dir=ground_truth_dir) + print(f"\nLoaded {len(qa_pairs)} Q&A pairs from '{qa_dataset}'") + + if qa_limit and qa_limit > 0: + qa_pairs = qa_pairs[:qa_limit] + print(f"QA_LIMIT={qa_limit}: evaluating first {len(qa_pairs)} pairs") + + retriever = FileRetriever(file_path=retrieval_file) + coverage = retriever.check_coverage(qa_pairs) + if coverage < 0.5: + print( + f"WARNING: retrieval file covers only {coverage:.0%} of queries -- " + "results will be unreliable. 
Check that the retrieval JSON was " + "generated from the same query set.", + file=sys.stderr, + ) + + llm_clients = {} + for gn, gm in gen_model_pairs: + llm_clients[gn] = LiteLLMClient( + model=gm, + api_base=gen_api_base or None, + api_key=api_key or None, + ) + + judge = LLMJudge( + model=judge_model, + api_base=judge_api_base or None, + api_key=api_key or None, + ) + + pipeline = QAEvalPipeline( + retriever=retriever, + llm_clients=llm_clients, + judge=judge, + top_k=qa_top_k, + max_workers=qa_max_workers, + ) + + print(f"\nRunning evaluation ({len(qa_pairs)} queries, {len(llm_clients)} model(s)) ...") + eval_results = pipeline.evaluate(qa_pairs) + + _print_multi_tier_summary(eval_results, len(qa_pairs)) + + # Always print error details so they are visible without the results JSON + _print_errors(eval_results, qa_pairs) + + output_dir = os.path.dirname(output_file) + if output_dir: + os.makedirs(output_dir, exist_ok=True) + with open(output_file, "w") as f: + json.dump( + { + "dataset": qa_dataset, + "retrieval_file": retrieval_file, + "top_k": qa_top_k, + "qa_results": eval_results, + }, + f, + indent=2, + ) + print(f"\nResults written to {output_file}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tools/harness/src/nv_ingest_harness/cases/e2e_qa_eval.py b/tools/harness/src/nv_ingest_harness/cases/e2e_qa_eval.py new file mode 100644 index 000000000..8787b9dd6 --- /dev/null +++ b/tools/harness/src/nv_ingest_harness/cases/e2e_qa_eval.py @@ -0,0 +1,136 @@ +""" +E2E QA evaluation test case - fresh ingestion followed by QA evaluation. + +Calls e2e.py to handle ingestion and collection creation, then qa_eval.py +to evaluate LLM answer quality against the newly ingested collection. + +Use this to measure end-to-end: extract_tables=true vs false, or different +ingestion settings, and see how they affect downstream answer accuracy. 
+""" + +import json +import os + +from nv_ingest_harness.cases.e2e import main as e2e_main +from nv_ingest_harness.cases.qa_eval import main as qa_eval_main +from nv_ingest_harness.utils.recall import get_recall_collection_name + + +def main(config=None, log_path: str = "test_results") -> int: + """ + Main entry point for E2E QA evaluation. + + Args: + config: TestConfig object with qa_* fields populated. + log_path: Directory for result artifacts. + + Returns: + 0 on success, non-zero on error. + """ + if config is None: + print("ERROR: No configuration provided") + return 2 + + qa_dataset = getattr(config, "qa_dataset", None) + if not qa_dataset: + print("ERROR: qa_dataset must be specified in configuration") + print("Set qa_dataset in test_configs.yaml qa_eval section or via QA_DATASET env var") + return 1 + + test_name = config.test_name or os.path.basename(config.dataset_dir.rstrip("/")) + collection_name = get_recall_collection_name(test_name) + + # Ensure TopKRetriever uses the collection created by this e2e run + original_collection_name = config.collection_name + config.collection_name = collection_name + + print("=" * 60) + print("E2E QA Evaluation Configuration") + print("=" * 60) + print(f"Dataset: {config.dataset_dir}") + print(f"Test Name: {test_name}") + print(f"Collection: {collection_name}") + print(f"QA Dataset: {qa_dataset}") + print("=" * 60) + + # Step 1: Ingestion + print("\n" + "=" * 60) + print("Step 1: Running Ingestion (via e2e)") + print("=" * 60) + + e2e_rc = e2e_main(config=config, log_path=log_path) + if e2e_rc != 0: + print(f"ERROR: Ingestion failed with exit code: {e2e_rc}") + config.collection_name = original_collection_name + return e2e_rc + + # Load e2e results before qa_eval overwrites _test_results.json + results_file = os.path.join(log_path, "_test_results.json") + e2e_results = {} + if os.path.exists(results_file): + try: + with open(results_file) as f: + e2e_data = json.load(f) + e2e_results = { + "test_config": 
e2e_data.get("test_config", {}), + "results": e2e_data.get("results", {}), + } + except (json.JSONDecodeError, IOError): + pass + + # Step 2: QA evaluation + print("\n" + "=" * 60) + print("Step 2: Running QA Evaluation (via qa_eval)") + print("=" * 60) + + qa_rc = qa_eval_main(config=config, log_path=log_path) + if qa_rc != 0: + print(f"Warning: QA evaluation returned non-zero exit code: {qa_rc}") + + # Restore original collection_name + config.collection_name = original_collection_name + + # Load QA results and build combined output + qa_results = {} + if os.path.exists(results_file): + try: + with open(results_file) as f: + qa_data = json.load(f) + qa_results = qa_data.get("qa_results", {}) + except (json.JSONDecodeError, IOError): + pass + + combined = { + "test_type": "e2e_qa_eval", + "test_config": { + "test_name": test_name, + "collection_name": collection_name, + "qa_dataset": qa_dataset, + }, + "ingestion_results": e2e_results.get("results", {}), + "qa_results": qa_results, + } + + # Carry over relevant ingestion config fields + for key in ["api_version", "dataset_dir", "hostname", "model_name", "dense_dim", "sparse", "gpu_search"]: + if key in e2e_results.get("test_config", {}): + combined["test_config"][key] = e2e_results["test_config"][key] + + with open(results_file, "w") as f: + json.dump(combined, f, indent=2) + + print("\n" + "=" * 60) + print(f"{test_name} e2e_qa_eval Summary") + print("=" * 60) + + by_model = qa_results.get("by_model", {}) + for name, stats in by_model.items(): + print( + f" {name}: mean_score={stats.get('mean_score', 0):.3f} " f"latency={stats.get('mean_latency_s', 0):.2f}s" + ) + + return 0 if qa_rc == 0 else qa_rc + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/harness/src/nv_ingest_harness/cases/qa_eval.py b/tools/harness/src/nv_ingest_harness/cases/qa_eval.py new file mode 100644 index 000000000..57fdddd02 --- /dev/null +++ b/tools/harness/src/nv_ingest_harness/cases/qa_eval.py @@ -0,0 +1,234 @@ 
"""
QA evaluation test case - measures LLM answer quality given retrieved context.

Requires an existing VDB collection (run e2e or e2e_qa_eval first when using
TopKRetriever). For FileRetriever, no collection is needed -- point
qa_retriever_config.file_path at a pre-computed retrieval results JSON.

Outputs _test_results.json with per-model mean scores, score distributions,
latency stats, and full per-query detail.
"""

import json
import os
import re

from nv_ingest_harness.utils.qa.generators import LiteLLMClient
from nv_ingest_harness.utils.qa.ground_truth import get_qa_dataset_loader
from nv_ingest_harness.utils.qa.judges import LLMJudge
from nv_ingest_harness.utils.qa.orchestrator import QAEvalPipeline
from nv_ingest_harness.utils.qa.retrievers import FileRetriever, TopKRetriever


def _expand_env_vars(value):
    """Recursively expand ${VAR} references in config values.

    Unknown variables are left as-is (the ${VAR} literal is preserved).
    """
    if isinstance(value, str):
        return re.sub(
            r"\$\{(\w+)\}",
            lambda m: os.environ.get(m.group(1), m.group(0)),
            value,
        )
    if isinstance(value, dict):
        return {k: _expand_env_vars(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_expand_env_vars(item) for item in value]
    return value


def _build_retriever(config, collection_name: str, model_name: str):
    """Construct the appropriate retriever from config.

    Raises:
        ValueError: when qa_retriever=file but no file_path is configured.
    """
    qa_retriever = getattr(config, "qa_retriever", "topk")
    qa_retriever_config = getattr(config, "qa_retriever_config", None) or {}

    if qa_retriever == "file":
        file_path = qa_retriever_config.get("file_path")
        if not file_path:
            raise ValueError("qa_retriever=file requires qa_retriever_config.file_path to be set")
        return FileRetriever(file_path=file_path)

    # Default: topk -- only import the heavy harness utils when actually needed.
    from nv_ingest_harness.utils.vdb import get_lancedb_path

    vdb_backend = config.vdb_backend
    table_path = None
    if vdb_backend == "lancedb":
        table_path = get_lancedb_path(config, collection_name)

    return TopKRetriever(
        collection_name=collection_name,
        hostname=config.hostname,
        model_name=model_name,
        sparse=config.sparse,
        gpu_search=config.gpu_search,
        vdb_backend=vdb_backend,
        table_path=table_path,
        hybrid=config.hybrid,
    )


def _build_llm_clients(qa_llm_configs: list) -> dict:
    """Build the name -> LiteLLMClient map from config list.

    Duplicate names would silently overwrite earlier entries, so they are
    reported; the last entry wins (backward-compatible behavior).
    """
    clients = {}
    for entry in qa_llm_configs:
        entry = _expand_env_vars(entry)
        name = entry.get("name") or entry.get("model", "llm")
        if name in clients:
            print(f"WARNING: duplicate LLM name '{name}' in qa_llm_configs; later entry overrides earlier one")
        clients[name] = LiteLLMClient(
            model=entry["model"],
            api_base=entry.get("api_base"),
            api_key=entry.get("api_key"),
            temperature=entry.get("temperature", 0.0),
            max_tokens=entry.get("max_tokens", 512),
            extra_params=entry.get("extra_params", {}),
        )
    return clients


def _build_judge(qa_judge_config: dict) -> LLMJudge:
    """Build the LLMJudge from config dict (defaults used for missing keys)."""
    cfg = _expand_env_vars(qa_judge_config)
    return LLMJudge(
        model=cfg.get("model", "nvidia_nim/mistralai/mixtral-8x22b-instruct-v0.1"),
        api_base=cfg.get("api_base"),
        api_key=cfg.get("api_key"),
        extra_params=cfg.get("extra_params", {}),
    )


def main(config=None, log_path: str = "test_results") -> int:
    """
    Main entry point for standalone QA evaluation.

    Args:
        config: TestConfig object with qa_* fields populated.
        log_path: Directory for result artifacts.

    Returns:
        0 on success, non-zero on error.
    """
    if config is None:
        print("ERROR: No configuration provided")
        return 2

    qa_dataset = getattr(config, "qa_dataset", None)
    if not qa_dataset:
        print("ERROR: qa_dataset must be specified in configuration")
        print("Set qa_dataset in test_configs.yaml qa_eval section or via QA_DATASET env var")
        return 1

    qa_llm_configs = getattr(config, "qa_llm_configs", None) or []
    if not qa_llm_configs:
        print("ERROR: qa_llm_configs must be specified with at least one LLM")
        return 1

    qa_judge_config = getattr(config, "qa_judge_config", None) or {}
    if not qa_judge_config:
        # _build_judge falls back to its built-in defaults for an empty dict.
        print("WARNING: qa_judge_config not set, using default judge model")

    qa_top_k = getattr(config, "qa_top_k", 5)
    qa_max_workers = getattr(config, "qa_max_workers", 8)
    qa_retriever = getattr(config, "qa_retriever", "topk")
    ground_truth_dir = getattr(config, "ground_truth_dir", None)

    # Derive collection name (same logic as recall.py for consistency).
    # Only import the heavy harness utils when actually running topk mode.
    test_name = config.test_name or os.path.basename(config.dataset_dir.rstrip("/"))
    if qa_retriever == "topk":
        from nv_ingest_harness.utils.recall import get_recall_collection_name
        from nv_ingest_harness.utils.interact import embed_info

        collection_name = config.collection_name or get_recall_collection_name(test_name)
        model_name, _ = embed_info()
    else:
        collection_name = config.collection_name or test_name
        model_name = None

    print("=" * 60)
    print("QA Evaluation Configuration")
    print("=" * 60)
    print(f"Dataset:      {qa_dataset}")
    print(f"Retriever:    {qa_retriever}")
    print(f"Collection:   {collection_name}")
    print(f"VDB Backend:  {config.vdb_backend}")
    print(f"Top-K:        {qa_top_k}")
    print(f"Max Workers:  {qa_max_workers}")
    print(f"LLMs:         {[e.get('name', e.get('model', '?')) for e in qa_llm_configs]}")
    print(f"Judge:        {qa_judge_config.get('model', '(default)')}")
    print("=" * 60)

    try:
        # Load ground truth -- all loaders accept data_dir uniformly
        loader = get_qa_dataset_loader(qa_dataset)
        qa_pairs = loader(data_dir=ground_truth_dir)

        print(f"\nLoaded {len(qa_pairs)} Q&A pairs from '{qa_dataset}'")

        # Build pipeline components
        retriever = _build_retriever(config, collection_name, model_name)
        if hasattr(retriever, "check_coverage"):
            coverage = retriever.check_coverage(qa_pairs)
            if coverage < 0.5:
                print(
                    f"WARNING: retrieval file covers only {coverage:.0%} of queries -- "
                    "results will be unreliable. Check that the retrieval JSON was "
                    "generated from the same query set."
                )

        llm_clients = _build_llm_clients(qa_llm_configs)
        judge = _build_judge(qa_judge_config)

        pipeline = QAEvalPipeline(
            retriever=retriever,
            llm_clients=llm_clients,
            judge=judge,
            top_k=qa_top_k,
            max_workers=qa_max_workers,
        )

        print(f"\nRunning QA evaluation ({len(qa_pairs)} queries, {len(llm_clients)} LLMs)...")
        eval_results = pipeline.evaluate(qa_pairs)

        # Print summary -- use .get with defaults so a model whose every call
        # errored (stats keys missing) cannot crash the summary.
        print("\n" + "=" * 60)
        print("QA Evaluation Results")
        print("=" * 60)
        by_model = eval_results.get("by_model", {})
        for name, stats in by_model.items():
            print(f"\nModel: {name}")
            print(f"  Mean Score:    {stats.get('mean_score', 0):.3f} / 5.0")
            print(f"  Mean Latency:  {stats.get('mean_latency_s', 0):.2f}s")
            print(f"  Scored:        {stats.get('scored_count', 0)} / {len(qa_pairs)}")
            print(f"  Errors:        {stats.get('error_count', 0)}")
            dist = stats.get("score_distribution", {})
            print("  Distribution:  " + " ".join(f"{k}:{v}" for k, v in sorted(dist.items())))

        # Write results
        os.makedirs(log_path, exist_ok=True)
        results_file = os.path.join(log_path, "_test_results.json")
        test_results = {
            "test_type": "qa_eval",
            "dataset": qa_dataset,
            "test_name": test_name,
            "collection_name": collection_name,
            "retriever": qa_retriever,
            "top_k": qa_top_k,
            "qa_results": eval_results,
        }
        with open(results_file, "w") as f:
            json.dump(test_results, f, indent=2)

        print("\n" + "=" * 60)
        print("QA Evaluation Complete")
        print("=" * 60)
        return 0

    except Exception as exc:
        print(f"ERROR: QA evaluation failed: {exc}")
        import traceback

        traceback.print_exc()
        return 1


if __name__ == "__main__":
    raise SystemExit(main())


# ============================================================================
# Modified: tools/harness/src/nv_ingest_harness/cli/run.py
# CASES registry additions: "qa_eval", "e2e_qa_eval"
# ============================================================================
"table_structure", "graphic_elements", @@ -320,6 +322,26 @@ def run_datasets( test_name_for_collection = config.test_name or os.path.basename(config.dataset_dir.rstrip("/")) config.collection_name = get_recall_collection_name(test_name_for_collection) + # For qa_eval case, validate qa_dataset + if case in ("qa_eval", "e2e_qa_eval"): + qa_dataset = getattr(config, "qa_dataset", None) + if not qa_dataset: + print(f"ERROR: Dataset '{dataset_name}' does not have qa_dataset configured", file=sys.stderr) + print(f" This dataset cannot be used with --case={case}", file=sys.stderr) + print( + " Set qa_dataset in test_configs.yaml datasets section or qa_eval section", + file=sys.stderr, + ) + results.append({"dataset": dataset_name, "status": "config_error", "rc": 1, "artifact_dir": "N/A"}) + continue + + # Set collection_name for standalone qa_eval (no preceding e2e run) + if case == "qa_eval" and not config.collection_name: + from nv_ingest_harness.utils.recall import get_recall_collection_name + + test_name_for_collection = config.test_name or os.path.basename(config.dataset_dir.rstrip("/")) + config.collection_name = get_recall_collection_name(test_name_for_collection) + # Run the test case if case in CASES: rc = run_case(case, stdout_path, config, doc_analysis) diff --git a/tools/harness/src/nv_ingest_harness/config.py b/tools/harness/src/nv_ingest_harness/config.py index 420533525..b3b38e805 100644 --- a/tools/harness/src/nv_ingest_harness/config.py +++ b/tools/harness/src/nv_ingest_harness/config.py @@ -96,6 +96,23 @@ class TestConfig: ground_truth_dir: Optional[str] = None recall_dataset: Optional[str] = None + # QA evaluation configuration + # qa_dataset: dataset key ("bo767_infographic") or HF dataset id ("vidore/vidore_v3_finance_en") + qa_dataset: Optional[str] = None + # qa_top_k: number of chunks to retrieve per query + qa_top_k: int = 5 + # qa_max_workers: thread pool size for concurrent API calls + qa_max_workers: int = 8 + # qa_retriever: "topk" (queries an 
existing VDB collection) or "file" (reads JSON) + qa_retriever: str = "topk" + # qa_retriever_config: extra config for the retriever (e.g. {"file_path": "..."} for FileRetriever) + qa_retriever_config: Optional[dict] = None + # qa_llm_configs: list of LLM configs, each with {name, model, api_base, extra_params, ...} + # model uses litellm provider-prefix routing (nvidia_nim/..., openai/..., huggingface/...) + qa_llm_configs: Optional[List[dict]] = None + # qa_judge_config: judge LLM config {model, api_base, ...} + qa_judge_config: Optional[dict] = None + def validate(self) -> List[str]: """Validate configuration and return list of errors""" errors = [] @@ -240,6 +257,12 @@ def load_config(config_file: str = "test_configs.yaml", case: Optional[str] = No # Merge recall section (recall section overrides active section for conflicts) config_dict.update(recall_section) + # Merge qa_eval section when running QA evaluation test cases + if case in ("qa_eval", "e2e_qa_eval"): + qa_eval_section = yaml_data.get("qa_eval", {}) + if qa_eval_section: + config_dict.update(qa_eval_section) + # Handle dataset shortcuts and apply dataset-specific extraction configs if "dataset" in cli_overrides: dataset_name = cli_overrides.pop("dataset") @@ -346,6 +369,10 @@ def parse_list(value: str) -> List[str]: "RECALL_TOP_K": ("recall_top_k", parse_int), "GROUND_TRUTH_DIR": ("ground_truth_dir", str), "RECALL_DATASET": ("recall_dataset", str), + "QA_DATASET": ("qa_dataset", str), + "QA_TOP_K": ("qa_top_k", parse_int), + "QA_MAX_WORKERS": ("qa_max_workers", parse_int), + "QA_RETRIEVER": ("qa_retriever", str), } overrides = {} diff --git a/tools/harness/src/nv_ingest_harness/utils/qa/README.md b/tools/harness/src/nv_ingest_harness/utils/qa/README.md new file mode 100644 index 000000000..85210bd4a --- /dev/null +++ b/tools/harness/src/nv_ingest_harness/utils/qa/README.md @@ -0,0 +1,774 @@ +# QA Evaluation Pipeline + +**This document is the canonical QA evaluation guide** for the harness. 
The top-level [harness README](../../../../README.md) (`tools/harness/README.md`) only summarizes and links here so we avoid duplicating steps and env vars in two places. + +The evaluation framework lives in **`nemo_retriever.evaluation`** (install with `pip install nemo-retriever[eval]`). The harness `utils/qa/` package only retains `TopKRetriever` (which depends on harness-specific recall utilities). All other evaluation components -- types, scoring, generators, judges, orchestrator, operators -- are imported from `nemo_retriever.evaluation`. + +Measures LLM answer quality over a RAG pipeline: retrieve context from a VDB, generate answers with one or more LLMs, and score each answer against ground-truth references using multi-tier scoring and an LLM-as-judge. + +**Pluggable retrieval:** The eval harness does not care how you retrieved chunks -- only that you produce a JSON file that matches the **[retrieval JSON specification](#retrieval-json-format-interface-contract)** expected by `run_qa_eval.py` / `FileRetriever`. Vector search, hybrid, agentic pipelines, or any custom system can plug in as long as the file format and query strings align with your chosen ground-truth dataset. + +**Default ground truth:** Standalone runs default to **`data/bo767_annotations.csv`** at the repo root -- the **bo767 annotations subset** maintained for this benchmark (multi-modality Q&A over the bo767 PDFs). Override with `QA_DATASET` / `QA_CSV` or another registered loader when comparing different corpora. + +Designed to be **plug-and-play** -- swap retrievers, generators, or judges independently via Python Protocols without touching the orchestrator. 
+ +## Table of Contents + +- [Pipeline File Map and Data Flow](#pipeline-file-map-and-data-flow) +- [Reproducing the bo767 Run](#reproducing-the-bo767-run) +- [Quick Start (Harness CLI)](#quick-start-harness-cli) +- [Retrieval JSON Format (Interface Contract)](#retrieval-json-format-interface-contract) +- [Custom Datasets (CSV Loader)](#custom-datasets-csv-loader) +- [Architecture](#architecture) +- [Configuration](#configuration) +- [Scoring System (Three-Tier Hierarchy)](#scoring-system-three-tier-hierarchy) +- [Adding a New Component](#adding-a-new-component) +- [Output Format](#output-format) +- [Dataset Limitations](#dataset-limitations) +- [Standalone Runner (Docker / CI)](#standalone-runner) + +## Pipeline File Map and Data Flow + +End-to-end bo767 + LanceDB + full-page markdown touches these **artifacts** and **library code**: + +| Stage | Artifacts produced | Code / APIs involved | +|-------|-------------------|----------------------| +| **1. Vector index (retrieval)** | `lancedb///` (embedded sub-page chunks) | `python -m nemo_retriever.examples.batch_pipeline` (extract, embed, VDB upload to LanceDB). **Table name must match** `export_retrieval_nemo.py` (`LANCEDB_TABLE`, default `nv-ingest`). | +| **2. Rich extract (parallel path)** | `data/bo767_extracted/*.parquet` | `python -m nemo_retriever.examples.batch_pipeline /path/to/bo767 --extract-only --save-intermediate data/bo767_extracted` — same extract flags, **no** embed/VDB; `save_intermediate_results` preserves table/chart/infographic columns for rendering. | +| **3. Full-page markdown index** | `data/bo767_page_markdown.json` (`source_id` → page → markdown) | `build_page_markdown_index.py` → `nemo_retriever.io.markdown.to_markdown_by_page()`; numpy list columns are coerced so structured content is not dropped. | +| **4. 
Retrieval export** | `data/test_retrieval/bo767_retrieval_fullpage.json` (or sub-page JSON) | `export_retrieval_nemo.py` → `nemo_retriever.retriever.Retriever` queries LanceDB; if `PAGE_MARKDOWN_INDEX` is set, hits are expanded/deduped by `(source_id, page)` and replaced with full-page markdown strings. | +| **5. Ground truth** | `data/bo767_annotations.csv` (repo root) | Questions/answers for export and eval; must align with **query string normalization** in `FileRetriever` (see retrieval JSON rules). | +| **6. Evaluation** | `qa_results_*.json` | `run_qa_eval.py` → `nemo_retriever.evaluation`: `FileRetriever`, `QAEvalPipeline`, `LiteLLMClient`, `LLMJudge`, scoring functions (tier 1-2 + failure modes). | + +**Data flow (conceptual):** PDFs → (A) **chunked embeddings in LanceDB** for similarity search; (B) **Parquet** for full-page reconstruction. **Export** runs search on (A), then **replaces** hit chunks with pages from (B) via the index. **Eval** never talks to LanceDB -- it only reads the retrieval JSON + ground-truth CSV. + +``` + NeMo Retriever (steps 1-3) Universal (step 4) + ────────────────────────── ────────────────── + Step 1 Step 2 Step 3 + Ingest + Extract Index Export QA Eval ++-------------------------+ +----------+ +----------+ +-----------------+ +| batch_pipeline | | Parquet | | LanceDB | | For each query: | +| --save-intermediate |->| -> page | | queries |->| generate(LLM) | +| --lancedb-uri lancedb | | markdown | | + pages | | judge(LLM) | +| (extract+embed+upload) | | index | | -> JSON | | score (3 tiers)| ++-------------------------+ +----------+ +----------+ +-----------------+ + | | | | | +lancedb/ *.parquet page_markdown.json retrieval.json qa_results.json + + Bring Your Own Retrieval +---------+--------+ + ───────────────────────── | Any pipeline that | + Skip steps 1-3 entirely. | outputs retrieval | + Produce a JSON matching | JSON (see spec) | + the interface contract. 
--> +---------+--------+ +``` + +Steps 1-3 are one reference implementation (NeMo Retriever + LanceDB). +Any retrieval system that produces a conforming JSON can replace them. +Step 4 can be re-run with different LLM configs without repeating retrieval. + +**Harness CLI alternative:** `cases/qa_eval.py` + `test_configs.yaml` can drive the same eval with different defaults; align `qa_dataset` and `file_path` with the standalone flow when comparing results. + +## Reproducing the bo767 Run + +Exact commands to reproduce the full-page markdown QA evaluation from scratch. + +**Working directory:** Step 1 (`batch_pipeline.py`) runs from the **repo root**; Steps 2-4 run from `tools/harness/`. + +
+Quick reference (all commands) + +```bash +# ── repo root ── +cd /path/to/nv-ingest + +# 1. Ingest + extract in one pass (~45-90 min) +# Produces LanceDB index AND Parquet files (needed by steps 2 and 3). +python -m nemo_retriever.examples.batch_pipeline /path/to/bo767 \ + --lancedb-uri lancedb \ + --save-intermediate tools/harness/data/bo767_extracted + +# ── tools/harness ── +cd tools/harness + +# 2. Build page markdown index (~5-10 min) +python build_page_markdown_index.py + +# 3. Export retrieval results (~5-15 min) +export PAGE_MARKDOWN_INDEX=data/bo767_page_markdown.json +export OUTPUT_FILE=data/test_retrieval/bo767_retrieval_fullpage.json +python export_retrieval_nemo.py + +# 4. Run QA evaluation (~1-2 hrs) +export NVIDIA_API_KEY="nvapi-..." +export RETRIEVAL_FILE=data/test_retrieval/bo767_retrieval_fullpage.json +export OUTPUT_FILE=data/test_retrieval/qa_results_bo767_annotations.json +export QA_MAX_WORKERS=8 +python run_qa_eval.py +``` + +
+ +### Bring your own retrieval (skip steps 1-3) + +Steps 1-3 below are the **NeMo Retriever + LanceDB** reference implementation +for ingestion, extraction, indexing, and retrieval. If your team already has a +retrieval pipeline (agentic, hybrid, BM25, or any custom system), **skip +steps 1-3 entirely** and produce a retrieval JSON file that conforms to the +[Retrieval JSON Format (Interface Contract)](#retrieval-json-format-interface-contract). +Then proceed directly to [Step 4: Run QA evaluation](#step-4-run-qa-evaluation). + +The only requirement is that your JSON contains a top-level `queries` object +mapping each ground-truth question string to `{ "chunks": ["...", ...] }`. +See the [interface contract](#retrieval-json-format-interface-contract) for the +full schema, required fields, and a worked example. + +### Python environment + +Steps 1-3 (ingest, build index, export) require the **`nemo_retriever`** library with LanceDB, CUDA, and Ray support. Step 4 (QA eval) additionally requires **`litellm`**. These are **not** part of the minimal harness install (`uv pip install -e .`). + +**Recommended setup:** create an isolated Python 3.12 virtual environment and install both stacks: + +```bash +uv venv qa-retriever --python 3.12 +source qa-retriever/bin/activate +uv pip install "nemo_retriever[eval]" +``` + +The `[eval]` extra installs `litellm` for LLM generation and judging. For the full harness install (includes Milvus-lite, nemotron models, etc.), see **Installation** in the [harness README](../../../../README.md) and [`tools/harness/pyproject.toml`](../../pyproject.toml). + +**Eval-only path:** if you already have a retrieval JSON and only need to run `run_qa_eval.py`, an environment with `nemo_retriever[eval]` and the harness package (`cd tools/harness && uv pip install -e .`) is sufficient. 
+
+### Prerequisites (data and keys)
+
+```bash
+# bo767 PDFs (767 files)
+ls /path/to/bo767/*.pdf | wc -l # should be 767
+
+# Ground truth: data/bo767_annotations.csv (1007 Q&A pairs across all modalities)
+# Located at the repo root: data/bo767_annotations.csv
+```
+
+### Step 1: Ingest and extract PDFs (NeMo Retriever)
+
+A single `batch_pipeline` invocation performs extraction, embedding, VDB
+upload, **and** saves the raw Parquet files needed by step 2. Add
+`--save-intermediate` to the same command -- no separate extraction run is
+required.
+**Estimated time: ~45-90 min** (767 PDFs, GPU-accelerated extraction + embedding).
+Run from the **repo root**:
+
+```bash
+# from repo root -- one command produces both LanceDB index and Parquet files
+python -m nemo_retriever.examples.batch_pipeline /path/to/bo767 \
+    --lancedb-uri lancedb \
+    --save-intermediate tools/harness/data/bo767_extracted
+```
+
+Output:
+- `lancedb/nv-ingest/` (~84k chunks) -- used by step 3 for retrieval queries.
+- `tools/harness/data/bo767_extracted/*.parquet` -- used by step 2 for page markdown.
+
+
+Extract-only alternative (skip embed + LanceDB) + +Use this if you already have your own retrieval stack (agentic, hybrid, +BM25, etc.) and only need the raw Parquet records to build the page markdown +index. Since embedding and LanceDB upload are skipped, this runs faster +(~30-60 min vs ~45-90 min). You would then skip step 3 (export retrieval) +and supply your own retrieval JSON for step 4. + +```bash +python -m nemo_retriever.examples.batch_pipeline /path/to/bo767 \ + --extract-only --save-intermediate tools/harness/data/bo767_extracted +``` + +
+ +### Step 2: Build page markdown index (NeMo Retriever) + +Steps 2-4 are run from `tools/harness/`. + +Groups Parquet records by (document, page number) and renders each page via +`nemo_retriever.io.markdown.to_markdown_by_page()`. Outputs a JSON index +mapping `source_id -> page_number -> markdown`. +**Estimated time: ~5-10 min** (CPU-only, reads Parquet and renders markdown). + +```bash +python build_page_markdown_index.py +``` + +Output: `data/bo767_page_markdown.json` (~180 MB, ~6k pages across 767 docs). + +### Step 3: Export retrieval results (NeMo Retriever) + +Queries LanceDB for each ground-truth question, then looks up the full-page +markdown for each hit's page. Multiple sub-page hits from the same page are +deduplicated into a single full-page chunk. +**Estimated time: ~5-15 min** (1005 LanceDB queries + page index lookup). + +```bash +export PAGE_MARKDOWN_INDEX=data/bo767_page_markdown.json +export OUTPUT_FILE=data/test_retrieval/bo767_retrieval_fullpage.json +python export_retrieval_nemo.py +``` + +| Env Var | Default | Purpose | +|---------|---------|---------| +| `LANCEDB_URI` | `./lancedb` | LanceDB directory | +| `LANCEDB_TABLE` | `nv-ingest` | LanceDB table name | +| `TOP_K` | `5` | Chunks per query | +| `EMBEDDER` | `nvidia/llama-nemotron-embed-1b-v2` | Embedding model | +| `QA_CSV` | `data/bo767_annotations.csv` (repo root) | Ground-truth query/answer CSV | +| `PAGE_MARKDOWN_INDEX` | _(unset)_ | Set to enable full-page mode | +| `OUTPUT_FILE` | `data/test_retrieval/bo767_retrieval.json` | Output path | + +Output: `data/test_retrieval/bo767_retrieval_fullpage.json` (~50 MB, 1005 queries). + +### Step 4: Run QA evaluation + +**Estimated time: ~1-2 hours** (1005 queries, ~12s per query for generation + judge, 8 concurrent workers). + +```bash +export NVIDIA_API_KEY="nvapi-..." 
+export RETRIEVAL_FILE=data/test_retrieval/bo767_retrieval_fullpage.json +export OUTPUT_FILE=data/test_retrieval/qa_results_bo767_annotations.json +export QA_MAX_WORKERS=8 +python run_qa_eval.py +``` + +| Env Var | Default | Purpose | +|---------|---------|---------| +| `RETRIEVAL_FILE` | _(required)_ | Retrieval JSON from step 3 | +| `NVIDIA_API_KEY` | _(required)_ | API key for NVIDIA NIM endpoints | +| `QA_DATASET` | `csv:data/bo767_annotations.csv` (repo root) | Ground-truth dataset | +| `QA_TOP_K` | `5` | Chunks per query | +| `QA_MAX_WORKERS` | `4` | Concurrent API calls | +| `QA_LIMIT` | `0` (all) | Evaluate only first N queries | +| `OUTPUT_FILE` | `/tmp/qa_results.json` | Where to write results JSON | +| `GEN_MODEL` | `nvidia_nim/nvidia/llama-3.3-nemotron-super-49b-v1.5` | Generator (single) | +| `GEN_MODELS` | _(unset)_ | Multi-model sweep: `name:model,...` (overrides `GEN_MODEL`) | +| `JUDGE_MODEL` | `nvidia_nim/mistralai/mixtral-8x22b-instruct-v0.1` | Judge model | +| `LITELLM_DEBUG` | `0` | Set `1` for full request/response logging | + +**API cost:** Each query costs ~$0.01-0.02 (generation + judge) on NIM pay-as-you-go. A full 1005-query run is approximately $10-20. Set `QA_LIMIT` to cap during development. + +### Results (March 2026 -- full-page markdown, bo767_annotations.csv) + +``` +1005 queries evaluated (Nemotron Super 49B generator, Mixtral 8x22B judge) + +Tier 1 - Retrieval Quality: + Answer-in-Context rate: 88.2% (886/1005) + +Tier 2 - Programmatic Answer Quality: + generator exact_match=0.0% token_f1=0.120 + +Tier 3 - LLM Judge: + generator mean=3.74/5 scored=970 errors=35 + dist: 1:251 2:44 3:28 4:26 5:621 + +Failure Breakdown: + correct: 647 no_context: 132 generation_miss: 89 + partial: 65 retrieval_miss: 37 thinking_truncated: 35 +``` + +**Interpretation:** 88% of queries had the answer present in the retrieved +chunks (Tier 1). The generator answered correctly ~64% of the time. 
The gap
+is primarily due to `no_context` (model said "not found" when it was there)
+and `generation_miss` (model had the context but answered incorrectly).
+
+### Sub-page chunk mode
+
+To skip full-page markdown and use raw sub-page chunks instead, omit step 2
+and do not set `PAGE_MARKDOWN_INDEX` in step 3. This produces smaller context
+windows and may result in lower scores for queries that span structured content
+(tables, charts, infographics).
+
+## Quick Start (Harness CLI)
+
+```bash
+cd tools/harness
+
+# 1. Ingest PDFs into VDB
+uv run nv-ingest-harness-run --case=e2e --dataset=bo767
+
+# 2. Export retrieval results for all ground-truth queries
+uv run python retrieve_and_export.py
+
+# 3. Run QA evaluation (requires NVIDIA_API_KEY)
+export NVIDIA_API_KEY="nvapi-..."
+export NVIDIA_NIM_API_KEY="$NVIDIA_API_KEY"
+uv run nv-ingest-harness-run --case=qa_eval --dataset=bo767
+```
+
+Results are written to `artifacts/<run_id>/<case>_test_results.json`.
+
+## Retrieval JSON Format (Interface Contract)
+
+The retrieval JSON is the **only interface** between your retrieval system and the
+QA eval harness. Any retrieval method -- vector search, agentic retrieval, hybrid
+pipelines, BM25, reranked, or a fully custom system -- can plug in by producing
+a single JSON file that **matches this specification** (what `run_qa_eval.py` loads via `FileRetriever`). The harness takes it from there: generates answers with one
+or more LLMs and scores them with the judge. If your JSON does not match, the
+eval script will not load or align queries correctly.
+
+### Minimal format (all you need)
+
+```json
+{
+  "queries": {
+    "What is the range of the 767?": {
+      "chunks": ["First retrieved chunk text...", "Second chunk text..."]
+    },
+    "How many engines does it have?": {
+      "chunks": ["The 767 is powered by two..."]
+    }
+  }
+}
+```
+
+Rules:
+- **`"queries"`** (required): dict mapping query strings to result objects.
+- **`"chunks"`** (required per query): list of plain-text strings, one per retrieved passage. Order matters -- put the best/most relevant chunk first. The harness uses the first `qa_top_k` entries (default 5). +- **`"metadata"`** (optional per query): list of per-chunk provenance dicts (e.g. `{"source_id": "file.pdf", "page_number": 3}`). Carried through to the results JSON for traceability but not used for scoring. +- Top-level **`"metadata"`** (optional): free-form dict for your records (retrieval method, model, timing, etc.). Ignored by FileRetriever. +- Query matching is normalized (NFKC unicode, case-folded, whitespace-collapsed) so trivial formatting differences between the ground-truth CSV and the retrieval JSON don't cause misses. + +### Full format example + +```json +{ + "metadata": { + "retrieval_method": "agentic_rag", + "model": "nvidia/llama-nemotron-embed-1b-v2", + "top_k": 5, + "notes": "Used multi-step agent with query decomposition" + }, + "queries": { + "What percentage of infections occur without eyewear?": { + "chunks": [ + "According to the infographic, 16% of infections...", + "Protective eyewear reduces transmission by..." + ], + "metadata": [ + {"source_id": "1000360.pdf", "page_number": 3, "distance": 0.31}, + {"source_id": "1000360.pdf", "page_number": 3, "distance": 0.45} + ] + } + } +} +``` + +### Using it with the harness + +```bash +# Point the harness at your retrieval JSON and run +export RETRIEVAL_FILE="path/to/my_retrieval_results.json" +export NVIDIA_API_KEY="nvapi-..." +python run_qa_eval.py +``` + +This means you can compare retrieval strategies head-to-head by running +the same eval against different retrieval JSONs -- the generator and judge +stay constant, so any score difference is purely from retrieval quality. + +## Custom Datasets (CSV Loader) + +Bring your own Q&A dataset without writing code. 
Any CSV with `query` and `answer` columns works: + +```csv +query,answer,category +"What is the capital of France?","Paris","geography" +"What year was Python released?","1991","tech" +``` + +Point the harness at it with the `csv:` prefix: + +```bash +export QA_DATASET="csv:/path/to/my_questions.csv" +export RETRIEVAL_FILE="path/to/my_retrieval.json" +python run_qa_eval.py +``` + +All columns beyond `query` and `answer` are preserved as metadata in the output. Rows with empty query or answer are silently skipped. + +Built-in datasets: `bo767_infographic`, `vidore/`, `csv:/path/to/file.csv`. The default dataset is `csv:data/bo767_annotations.csv` (1007 Q&A pairs across text, table, chart, and infographic modalities). + +## Architecture + +``` +nemo_retriever.evaluation.orchestrator (QAEvalPipeline) + | + |-- retriever : RetrieverStrategy protocol + | |-- FileRetriever (cached JSON -- recommended, in nemo_retriever.evaluation) + | |-- TopKRetriever (live VDB query -- in nv_ingest_harness.utils.qa) + | + |-- llm_clients : dict[str, LLMClient protocol] + | |-- LiteLLMClient (NVIDIA NIM, OpenAI, vLLM, Ollama) + | + |-- judge : AnswerJudge protocol + |-- LLMJudge (1-5 rubric via LLM-as-judge) + +Standalone operators (DataFrame-in/out, usable without the pipeline): + |-- QAGenerationOperator (wraps LiteLLMClient for batch generation) + |-- JudgingOperator (wraps LLMJudge for batch judging) + |-- score_dataframe() (programmatic scoring -- no LLM cost) +``` + +All three interfaces are Python `Protocol` classes defined in `nemo_retriever.evaluation.types`. +Any object that implements the right method signature works -- no inheritance +required. + +### Files (`nemo_retriever.evaluation`) + +The evaluation framework lives in `nemo_retriever/src/nemo_retriever/evaluation/`. The harness `utils/qa/` package only contains `TopKRetriever` (which depends on harness-specific recall utilities). 
+ +| Module | Purpose | +|--------|---------| +| `types.py` | Protocol definitions (`RetrieverStrategy`, `LLMClient`, `AnswerJudge`) and dataclasses | +| `operator.py` | `AbstractOperator` base class (DataFrame-in/out contract, `setup`/`process`/`teardown` lifecycle) | +| `retrievers.py` | `FileRetriever` (cached JSON with normalized query matching) | +| `generators.py` | `LiteLLMClient` -- unified LLM client via litellm (NIM, OpenAI, vLLM, HF) | +| `judges.py` | `LLMJudge` -- 1-5 scoring with key-term anchoring rubric | +| `scoring.py` | Programmatic scoring: `answer_in_context`, `token_f1`, `classify_failure`, `score_dataframe` | +| `orchestrator.py` | `QAEvalPipeline` -- multi-tier orchestrator with `evaluate()` (dict API) and `process()` (DataFrame API) | +| `generation.py` | `QAGenerationOperator` -- standalone DataFrame-in/out answer generation operator | +| `judging.py` | `JudgingOperator` -- standalone DataFrame-in/out LLM-as-judge operator | +| `ground_truth.py` | Dataset loaders: `bo767_infographic`, `vidore/*`, and generic `csv:` loader | +| `text_utils.py` | Shared text processing (`strip_think_tags`) | + +| Harness module | Purpose | +|----------------|---------| +| `nv_ingest_harness.utils.qa.TopKRetriever` | Live VDB retriever (Milvus/LanceDB, depends on harness recall utilities) | + +### Entry Points + +| Entry Point | Use Case | +|-------------|----------| +| `python -m nemo_retriever.examples.batch_pipeline` | Ingest PDFs into LanceDB (extract + embed + VDB upload). Use `--extract-only --save-intermediate` for Parquet-only extraction. 
| +| `build_page_markdown_index.py` | Build full-page markdown index from Parquet | +| `export_retrieval_nemo.py` | Export retrieval from NeMo Retriever LanceDB (supports full-page markdown) | +| `run_qa_eval.py` | Standalone QA eval runner -- reads env vars, no harness CLI needed | +| `retrieve_and_export.py` | Export retrieval via harness stack (Milvus or LanceDB) | +| `cases/qa_eval.py` | Harness CLI integration (`--case=qa_eval`) -- reads `test_configs.yaml` | + +## Configuration + +All QA settings live in the `qa_eval` section of `test_configs.yaml`: + +```yaml +qa_eval: + qa_dataset: csv:data/bo767_annotations.csv # ground-truth dataset (or bo767_infographic, vidore/...) + qa_top_k: 5 # chunks per query + qa_retriever: file # "file" or "topk" + qa_retriever_config: + file_path: data/test_retrieval/bo767_retrieval_fullpage.json + qa_max_workers: 8 # concurrent threads + + qa_llm_configs: + - name: nemotron_super_49b + model: nvidia_nim/nvidia/llama-3.3-nemotron-super-49b-v1.5 + api_key: ${NVIDIA_API_KEY} + - name: qwen3_next_80b + model: nvidia_nim/qwen/qwen3-next-80b-a3b-instruct + api_key: ${NVIDIA_API_KEY} + + qa_judge_config: + model: nvidia_nim/mistralai/mixtral-8x22b-instruct-v0.1 + api_key: ${NVIDIA_API_KEY} +``` + +### Model Strings + +LiteLLM routes by prefix: + +| Prefix | Provider | Example | +|--------|----------|---------| +| `nvidia_nim/` | NVIDIA NIM (build.nvidia.com) | `nvidia_nim/nvidia/llama-3.3-nemotron-super-49b-v1.5` | +| `openai/` | OpenAI or any OpenAI-compatible server | `openai/gpt-4o` | +| `huggingface/` | HuggingFace Inference Endpoints | `huggingface/meta-llama/Llama-3-70b-instruct` | + +For local vLLM/Ollama, use `openai/` with `api_base: http://localhost:8000/v1`. 
+ +### Environment Variables + +| Variable | Used By | Purpose | +|----------|---------|---------| +| `NVIDIA_API_KEY` | Config expansion (`${NVIDIA_API_KEY}`) | API key for NIM models | +| `NVIDIA_NIM_API_KEY` | litellm's `nvidia_nim` provider | Alias -- set to same value as above | + +## Scoring System (Three-Tier Hierarchy) + +Each (query, model) pair is scored by three independent tiers. Each tier tests a different layer of the RAG pipeline: + +| Tier | Name | What it measures | Method | Key question | +|------|------|-----------------|--------|-------------| +| **1** | Retrieval quality | Did the retriever return chunks containing the answer? | Programmatic | "Are the reference answer's content words present in the retrieved context?" | +| **2** | Answer quality (token) | Does the generated answer overlap with the reference at the token level? | Programmatic | "How precisely did the model reproduce the expected facts?" | +| **3** | Answer quality (semantic) | Is the generated answer factually correct given the reference? | LLM-as-judge | "Did the model get the right facts, allowing paraphrasing and numeric equivalence?" | + +### Tier 1: `answer_in_context` (retrieval signal) + +Returns `True` if >= 50% of the reference answer's **content words** (stopwords removed, normalized) appear in the concatenated retrieved chunks. If `False`, the retriever failed to surface the answer -- any generation failure is a retrieval miss, not a model miss. 
+ +### Tier 2: `token_f1` (SQuAD-style token metrics) + +After normalizing both reference and candidate (lowercase, strip punctuation, numeric equivalence like `16.00%` -> `16%`): + +| Field | Formula | Meaning | +|-------|---------|---------| +| `precision` | `common_tokens / candidate_tokens` | Fraction of the model's output tokens that are relevant | +| `recall` | `common_tokens / reference_tokens` | Fraction of the reference's tokens the model captured | +| `f1` | `2 * P * R / (P + R)` | Harmonic mean of precision and recall | +| `exact_match` | `normalized_ref == normalized_candidate` | Strict string equality after normalization | + +### Tier 3: `judge_score` (LLM-as-judge, 1-5 scale) + +An LLM scores the candidate against the reference using a structured rubric: + +| Score | Label | Criteria | +|-------|-------|----------| +| **5** | Fully correct | All required facts present | +| **4** | Nearly correct | Nearly all facts present; one minor trivial difference | +| **3** | Mostly correct | Most facts present but at least one non-trivial fact missing or slightly wrong | +| **2** | Partially correct | Some facts present but core answer incomplete or has a significant error | +| **1** | Incorrect | None/almost none of the required facts; includes wrong answer, irrelevant response, or "context does not contain" when the answer exists | + +
+
+Current judge prompt
+
+**System:**
+
+```
+You are an expert evaluator for factual question answering.
+
+You will receive a QUESTION, a REFERENCE answer, and a CANDIDATE answer.
+
+Step 1 -- Identify required facts:
+  Break the REFERENCE into its key terms: specific numbers, names, dates,
+  percentages, units, or short phrases that constitute the factual core.
+  Example: "16% of adults" -> required facts = ["16%", "adults"].
+
+Step 2 -- Check each required fact in the CANDIDATE:
+  - Allow numeric equivalence: "16.00%" = "16%", "1,000" = "1000".
+  - Allow paraphrasing: "Peers" matches "Peers of those adults".
+  - Allow additional correct detail: extra facts do NOT reduce the score.
+  - Short but correct answers are fine: "Peers" is valid for "Peers".
+
+Step 3 -- Score on a 1-5 scale based on the fraction of required facts present:
+  5 - All required facts present. Answer is fully correct.
+  4 - Nearly all required facts present. One minor fact may differ trivially.
+  3 - Most required facts present but at least one non-trivial fact is missing
+      or slightly wrong.
+  2 - Some required facts present but the core answer is incomplete or has a
+      significant factual error.
+  1 - None or almost none of the required facts present.
+
+Respond ONLY with valid JSON:
+{"score": <1-5>, "reasoning": "<one-sentence explanation>"}
+```
+
+**User:**
+
+```
+Question: {query}
+
+Reference answer: {reference}
+
+Candidate answer: {candidate}
+```
+
+
+ +### Output fields per query + +| Field | Source | Type | Description | +|-------|--------|------|-------------| +| `query` | Input | `str` | The ground-truth question | +| `reference_answer` | Input | `str` | Ground-truth answer from the dataset | +| `context` | Tier 1 | `list[str]` | Retrieved chunks passed to the generator | +| `answer_in_context` | **Tier 1** | `bool` | >= 50% of reference content words found in chunks | +| `answer` | Generation | `str` | The model's generated answer | +| `gen_latency_s` | Generation | `float` | Generation API call latency (seconds) | +| `gen_error` | Generation | `str?` | Error string if generation failed, else `null` | +| `token_f1` | **Tier 2** | `float` | SQuAD-style F1 (0.0-1.0) | +| `exact_match` | **Tier 2** | `bool` | Normalized exact string match | +| `judge_score` | **Tier 3** | `int` | LLM judge score (1-5) | +| `judge_reasoning` | **Tier 3** | `str` | One-sentence explanation from the judge | +| `judge_error` | Tier 3 | `str?` | Error string if judging failed, else `null` | +| `failure_mode` | Derived | `str` | Classification (see below) | + +### Failure mode classification + +Combines Tier 1 + Tier 3 to diagnose **where** a failure occurred: + +| `failure_mode` | Condition | +|----------------|-----------| +| `correct` | `judge_score >= 4` | +| `partial` | `judge_score` 2-3, answer does not say "no context" | +| `no_context` | `judge_score` 1-3, answer says "context does not contain..." | +| `retrieval_miss` | `judge_score <= 1` AND `answer_in_context == False` | +| `generation_miss` | `judge_score <= 1` AND `answer_in_context == True` (retriever had it, model missed) | +| `thinking_truncated` | `gen_error == "thinking_truncated"` or `judge_score` is `null` | + +### Extending the scoring system + +All scoring is applied by `score_dataframe()` in `nemo_retriever.evaluation.scoring`. To add a custom metric: + +1. 
**Add a scoring function** in `scoring.py` (or your own module) that takes a reference answer, candidate answer, and/or context and returns a value: + +```python +def my_custom_metric(reference: str, candidate: str) -> float: + """Example: character-level Levenshtein similarity.""" + # your logic here + return similarity_score +``` + +2. **Wire it into `score_dataframe()`** by appending a new column in the iteration loop, or create your own post-processing function that takes the output DataFrame and adds columns: + +```python +def add_custom_scores(df: pd.DataFrame) -> pd.DataFrame: + df = df.copy() + df["my_metric"] = df.apply( + lambda row: my_custom_metric(row["reference_answer"], row["answer"]), + axis=1, + ) + return df +``` + +3. **Apply after the pipeline** -- `QAEvalPipeline.process()` returns a DataFrame. Chain your scoring on top: + +```python +result_df = pipeline.process(input_df) +result_df = add_custom_scores(result_df) +``` + +The same pattern works for custom failure classifiers, alternative judge prompts (subclass `LLMJudge` and override the system prompt), or entirely new tiers. All components follow the Protocol pattern -- no inheritance or registration required. 
+ +## Adding a New Component + +### Custom Retriever + +```python +from nemo_retriever.evaluation.types import RetrieverStrategy, RetrievalResult + +class MyRetriever: + def retrieve(self, query: str, top_k: int) -> RetrievalResult: + chunks = my_search(query, top_k) + return RetrievalResult(chunks=chunks, metadata=[]) +``` + +### Custom LLM Client + +```python +from nemo_retriever.evaluation.types import LLMClient, GenerationResult + +class MyClient: + def generate(self, query: str, chunks: list[str]) -> GenerationResult: + answer = my_llm(query, chunks) + return GenerationResult(answer=answer, latency_s=0.0, model="my-model") +``` + +### Custom Judge + +```python +from nemo_retriever.evaluation.types import AnswerJudge, JudgeResult + +class MyJudge: + def judge(self, query: str, reference: str, candidate: str) -> JudgeResult: + score = my_scoring_logic(reference, candidate) + return JudgeResult(score=score, reasoning="...") +``` + +No registration step needed -- pass the instance directly to `QAEvalPipeline`. 
+ +## Output Format + +`_test_results.json` structure: + +```json +{ + "dataset": "csv:data/bo767_annotations.csv", + "retrieval_file": "data/test_retrieval/bo767_retrieval_fullpage.json", + "top_k": 5, + "qa_results": { + "summary": { + "total_submitted": 1005, + "total_completed": 1005, + "dropped_queries": 0 + }, + "tier1_retrieval": { + "answer_in_context_rate": 0.8816, + "answer_in_context_count": 886, + "total": 1005 + }, + "tier2_programmatic": { + "generator": { + "mean_exact_match": 0.0, + "mean_token_f1": 0.1196 + } + }, + "tier3_llm_judge": { + "generator": { + "mean_score": 3.74, + "score_distribution": {"1": 251, "2": 44, "3": 28, "4": 26, "5": 621}, + "mean_latency_s": 11.7, + "scored_count": 970, + "error_count": 35 + } + }, + "failure_breakdown": { + "generator": { + "correct": 647, "partial": 65, + "retrieval_miss": 37, "generation_miss": 89, + "thinking_truncated": 35, "no_context": 132 + } + }, + "per_query": [ + { + "query": "How much did Pendleton County spend out of their COVID-19 fund for the month of April 2021?", + "reference_answer": "$205.43", + "retrieved_chunk_count": 2, + "answer_in_context": true, + "token_f1": {"generator": {"exact_match": false, "f1": 0.057, "precision": 0.029, "recall": 1.0}}, + "failure_mode": {"generator": "correct"}, + "retrieved_chunks": ["## Page 2\n\n..."], + "retrieval_metadata": [{"source_id": "1003421.pdf", "page_number": 2, "distance": 1.037}], + "generations": {"generator": {"answer": "...$205.43...", "latency_s": 12.2}}, + "judgements": {"generator": {"score": 5, "reasoning": "The required fact '$205.43' is present..."}} + } + ] + } +} +``` + +## Dataset Limitations + +### bo767_annotations.csv (default) + +The default ground truth (`data/bo767_annotations.csv`) contains 1007 Q&A pairs +across all modalities (text, table, chart, infographic) for 767 bo767 PDFs. + +1. **Short factual answers**: Most reference answers are 1-5 words (e.g., "$205.43", "5"). 
Tier 2 programmatic metrics (exact match, F1) carry strong signal. For open-ended datasets, Tier 3 (LLM judge) becomes primary. + +2. **Retrieval =/= QA quality**: A retrieval method that returns the correct page may still get a low QA score if the extracted text is garbled or incomplete. Always check Tier 1 first -- if `answer_in_context_rate` is low, the problem is retrieval or extraction, not the generator. + +3. **Full-page markdown recommended**: Sub-page chunks may split structured content (tables, charts) across multiple records. The full-page markdown pipeline (step 2 in reproduction) reconstructs complete pages, matching the research team's approach and improving generation accuracy. + +4. **Reasoning model truncation**: Models with extended thinking (e.g., Nemotron Super) may spend their token budget reasoning and never produce a final answer. The pipeline detects this (`thinking_truncated`) and nullifies the score. + +5. **`no_context` failures**: The model sometimes responds "no information found" even when the answer is in the retrieved chunks. This accounts for ~13% of queries in the current run and is a generator behavior issue, not a retrieval problem. + +## Standalone Runner + +For Docker or CI environments without the full harness CLI: + +```bash +RETRIEVAL_FILE=/data/bo767_retrieval.json \ +NVIDIA_API_KEY=nvapi-... \ +python run_qa_eval.py +``` + +All config is via environment variables. See the docstring in `run_qa_eval.py` for +the full list. diff --git a/tools/harness/src/nv_ingest_harness/utils/qa/__init__.py b/tools/harness/src/nv_ingest_harness/utils/qa/__init__.py new file mode 100644 index 000000000..3ff22b943 --- /dev/null +++ b/tools/harness/src/nv_ingest_harness/utils/qa/__init__.py @@ -0,0 +1,26 @@ +""" +QA evaluation utilities for nv-ingest test harness. + +Provides pluggable retrieval, generation, judging, and orchestration +components for measuring LLM answer quality given retrieved context. 
+""" + +from nv_ingest_harness.utils.qa.types import ( + AnswerJudge, + GenerationResult, + JudgeResult, + LLMClient, + QAQueryResult, + RetrievalResult, + RetrieverStrategy, +) + +__all__ = [ + "RetrieverStrategy", + "LLMClient", + "AnswerJudge", + "RetrievalResult", + "GenerationResult", + "JudgeResult", + "QAQueryResult", +] diff --git a/tools/harness/src/nv_ingest_harness/utils/qa/generators.py b/tools/harness/src/nv_ingest_harness/utils/qa/generators.py new file mode 100644 index 000000000..a24a56890 --- /dev/null +++ b/tools/harness/src/nv_ingest_harness/utils/qa/generators.py @@ -0,0 +1,176 @@ +""" +LLM answer generation client for the QA evaluation pipeline. + +LiteLLMClient wraps the litellm library which provides a single interface +for routing to NVIDIA NIM, OpenAI, HuggingFace Inference Endpoints, and +local vLLM / Ollama servers via a model name prefix convention: + + nvidia_nim/nvidia/llama-3.3-nemotron-super-49b-v1.5 -> NVIDIA NIM + openai/gpt-4o -> OpenAI + openai/my-model -> any OpenAI-spec server (+ api_base) + huggingface/meta-llama/Llama-3-70b-instruct -> HF Inference Endpoints + +Provider-specific API keys are read automatically from environment variables +(NVIDIA_API_KEY, OPENAI_API_KEY, etc.). Do not embed keys in config files. +""" + +from __future__ import annotations + +import re +import time +from typing import Any, Optional + +from nv_ingest_harness.utils.qa.types import GenerationResult + + +def strip_think_tags(text: str) -> str: + """Remove ... reasoning blocks from model output. + + Handles both closed tags (...) and unclosed tags where the + model hit the token limit mid-reasoning and never emitted . + Returns empty string if nothing remains after stripping so callers can + detect thinking_truncated. + """ + stripped = re.sub(r".*?", "", text, flags=re.DOTALL) + stripped = re.sub(r".*", "", stripped, flags=re.DOTALL) + return stripped.strip() + + +_RAG_SYSTEM_PROMPT = ( + "You are a precise question-answering assistant. 
" + "Answer the question using ONLY the information provided in the context below. " + "If the context does not contain enough information to answer, say so clearly. " + "Be concise and factual." +) + +_RAG_USER_TEMPLATE = """\ +Context: +{context} + +Question: {query} + +Answer:""" + + +def _build_rag_prompt(query: str, chunks: list[str]) -> list[dict]: + """Build the OpenAI-style messages list for a RAG prompt.""" + context = "\n\n---\n\n".join(chunks) if chunks else "(no context retrieved)" + user_content = _RAG_USER_TEMPLATE.format(context=context, query=query) + return [ + {"role": "system", "content": _RAG_SYSTEM_PROMPT}, + {"role": "user", "content": user_content}, + ] + + +class LiteLLMClient: + """ + Unified LLM client backed by litellm. + + A single model string change routes to any supported provider: + - NVIDIA NIM: nvidia_nim// + - OpenAI: openai/ + - Any OpenAI-compatible server (vLLM, Ollama): openai/ + api_base + - HuggingFace: huggingface// + + Provider API keys are read from environment variables automatically + (NVIDIA_API_KEY, OPENAI_API_KEY, HUGGINGFACE_API_KEY, etc.). + + Args: + model: litellm model string with provider prefix. + api_base: Override endpoint URL for private / local deployments. + api_key: Explicit API key (prefer env vars; only use for non-standard setups). + temperature: Sampling temperature (0.0 = deterministic). + max_tokens: Maximum tokens in the generated response. + extra_params: Additional kwargs forwarded verbatim to litellm.completion. + Use this for provider-specific options such as reasoning mode: + {"thinking": {"type": "enabled", "budget_tokens": 2048}} + num_retries: Number of retry attempts on transient errors. 
+    """
+
+    def __init__(
+        self,
+        model: str,
+        api_base: Optional[str] = None,
+        api_key: Optional[str] = None,
+        temperature: float = 0.0,
+        max_tokens: int = 4096,
+        extra_params: Optional[dict[str, Any]] = None,
+        num_retries: int = 3,
+    ):
+        self.model = model
+        self.api_base = api_base
+        self.api_key = api_key
+        self.temperature = temperature
+        self.max_tokens = max_tokens
+        self.extra_params = extra_params or {}
+        self.num_retries = num_retries
+
+    def complete(self, messages: list[dict], max_tokens: Optional[int] = None) -> tuple[str, float]:
+        """
+        Raw litellm completion call. Returns (content_text, latency_s).
+
+        This is the single place where the litellm API is called. Both
+        generate() and external callers (e.g. LLMJudge) use this method so
+        retry logic, auth, and extra_params stay in one place.
+
+        Args:
+            messages: OpenAI-style messages list.
+            max_tokens: Override max_tokens for this call (uses self.max_tokens if None).
+
+        Returns:
+            Tuple of (response text, wall-clock latency in seconds).
+
+        Raises:
+            Exception: Re-raises litellm errors after exhausting retries.
+        """
+        # Imported lazily so this module can be imported without litellm installed.
+        import litellm
+
+        call_kwargs: dict[str, Any] = {
+            "model": self.model,
+            "messages": messages,
+            "temperature": self.temperature,
+            "max_tokens": max_tokens if max_tokens is not None else self.max_tokens,
+            "num_retries": self.num_retries,
+        }
+        if self.api_base:
+            call_kwargs["api_base"] = self.api_base
+        if self.api_key:
+            call_kwargs["api_key"] = self.api_key
+        # Applied last so extra_params can override any of the defaults above.
+        call_kwargs.update(self.extra_params)
+
+        t0 = time.monotonic()
+        response = litellm.completion(**call_kwargs)
+        latency = time.monotonic() - t0
+        content = (response.choices[0].message.content or "").strip()
+        return content, latency
+
+    def generate(self, query: str, chunks: list[str]) -> GenerationResult:
+        """
+        Generate an answer for the given query using retrieved chunks as context.
+
+        Errors from the LLM call are captured in GenerationResult.error
+        rather than raised.
+
+        Args:
+            query: The question to answer.
+            chunks: Retrieved text chunks providing context.
+
+        Returns:
+            GenerationResult with answer text and wall-clock latency.
+        """
+        messages = _build_rag_prompt(query, chunks)
+        try:
+            raw_answer, latency = self.complete(messages)
+            answer = strip_think_tags(raw_answer)
+            if not answer:
+                # Nothing left after stripping think tags: the model spent its
+                # token budget reasoning and never produced a final answer.
+                return GenerationResult(
+                    answer="",
+                    latency_s=latency,
+                    model=self.model,
+                    error="thinking_truncated",
+                )
+            return GenerationResult(answer=answer, latency_s=latency, model=self.model)
+        except Exception as exc:
+            # Latency is unknown for a failed call; report 0.0 and surface the error.
+            return GenerationResult(
+                answer="",
+                latency_s=0.0,
+                model=self.model,
+                error=str(exc),
+            )
diff --git a/tools/harness/src/nv_ingest_harness/utils/qa/ground_truth.py b/tools/harness/src/nv_ingest_harness/utils/qa/ground_truth.py
new file mode 100644
index 000000000..35deead1d
--- /dev/null
+++ b/tools/harness/src/nv_ingest_harness/utils/qa/ground_truth.py
@@ -0,0 +1,236 @@
+"""
+Ground truth dataset loaders for the QA evaluation pipeline.
+
+Loaders return a uniform list of dicts with at least "query" and "answer" keys.
+Additional metadata columns are preserved for debugging and analysis.
+
+Supported datasets:
+  - bo767_infographic: 369 infographic Q&A pairs from the Digital Corpora bo767 corpus.
+  - vidore/<dataset>: Any ViDoRe v3 dataset hosted on HuggingFace, loaded via
+    the `datasets` library (e.g., "vidore/vidore_v3_finance_en").
+  - csv:/path/to/file.csv: Any CSV with "query" and "answer" columns. All other
+    columns are preserved as metadata.
+
+Registry:
+    get_qa_dataset_loader("bo767_infographic") -> load_infographic_qa
+    get_qa_dataset_loader("vidore/...") -> load_vidore_v3_qa
+    get_qa_dataset_loader("csv:/path/to.csv") -> load_generic_csv (partial)
+
+To add a new dataset, implement a loader function with signature:
+    def my_loader(data_dir: Optional[str] = None) -> list[dict]
+and register it in get_qa_dataset_loader().
"""Ground truth dataset loaders for the QA evaluation pipeline.

Loaders return a uniform list of dicts with at least "query" and "answer"
keys. Additional metadata columns are preserved for debugging and analysis.

Supported datasets:
  - bo767_infographic: 369 infographic Q&A pairs from the Digital Corpora
    bo767 corpus.
  - vidore/<dataset>: any ViDoRe v3 dataset hosted on HuggingFace, loaded via
    the `datasets` library (e.g., "vidore/vidore_v3_finance_en").
  - csv:/path/to/file.csv: any CSV with "query" and "answer" columns. All
    other columns are preserved as metadata.

To add a new dataset, implement a loader function with signature
``def my_loader(data_dir: Optional[str] = None) -> list[dict]`` and register
it in get_qa_dataset_loader().
"""

from __future__ import annotations

import csv
import os
from typing import Callable, Optional


def _read_qa_csv(csv_path: str, error_label: str = "CSV") -> list[dict]:
    """Shared CSV reader for the query/answer loaders.

    Validates that the header contains "query" and "answer", skips rows where
    either value is blank, and preserves every other column as metadata.

    Values are read None-safely: csv.DictReader fills cells that are missing
    from short rows with None (restval), so calling .strip() on row.get(...)
    directly would raise AttributeError. Such rows are skipped instead.

    Args:
        csv_path: Path of the CSV file to parse (must exist).
        error_label: Human-readable dataset label used in error messages.

    Returns:
        List of dicts with at least keys: query, answer.

    Raises:
        ValueError: If required columns are missing from the header.
    """
    required = {"query", "answer"}
    records: list[dict] = []
    with open(csv_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        missing = required - set(reader.fieldnames or [])
        if missing:
            raise ValueError(
                f"{error_label} {csv_path} missing required columns: {missing}. "
                f"Available columns: {reader.fieldnames}"
            )
        for row in reader:
            query = (row.get("query") or "").strip()
            answer = (row.get("answer") or "").strip()
            if query and answer:
                records.append({**row, "query": query, "answer": answer})
    return records


def load_infographic_qa(data_dir: Optional[str] = None) -> list[dict]:
    """
    Load bo767 infographic Q&A pairs from the Digital Corpora CSV.

    The CSV (digital_corpora_infographic_query_answer.csv) contains 369 rows
    with human-authored questions and answers for infographic pages from the
    bo767 PDF corpus.

    Columns: modality, query, answer, pdf, page

    Args:
        data_dir: Directory containing the CSV file.
            Defaults to the repo's tools/harness/data/ directory.

    Returns:
        List of dicts with keys: query, answer, pdf, page, modality.
    """
    if data_dir is None:
        # Lazy import: get_repo_root lives in the heavy harness utils; only
        # needed when data_dir is not explicitly provided (e.g. inside Docker
        # it always is).
        from nv_ingest_harness.utils.cases import get_repo_root

        data_dir = os.path.join(get_repo_root(), "tools", "harness", "data")

    csv_path = os.path.join(data_dir, "digital_corpora_infographic_query_answer.csv")
    if not os.path.exists(csv_path):
        raise FileNotFoundError(
            f"Infographic QA CSV not found at {csv_path}. "
            "Expected at tools/harness/data/digital_corpora_infographic_query_answer.csv"
        )

    return _read_qa_csv(csv_path, error_label="Infographic QA CSV")


def load_vidore_v3_qa(dataset_name: str, cache_dir: Optional[str] = None) -> list[dict]:
    """
    Load Q&A pairs from a ViDoRe v3 dataset hosted on HuggingFace.

    ViDoRe v3 datasets include human-verified reference answers in the
    "queries" split alongside the query text. This loader extracts those
    pairs in the same format as load_infographic_qa.

    The qrels split (relevance judgements) is intentionally not loaded here.
    It is only needed for building a FileRetriever JSON pre-seeded with
    ground-truth pages, which is a separate offline step.

    Args:
        dataset_name: HuggingFace dataset identifier, e.g.
            "vidore/vidore_v3_finance_en".
        cache_dir: Local directory for the HuggingFace datasets cache.
            Defaults to the HuggingFace default (~/.cache/huggingface).

    Returns:
        List of dicts with keys: query, answer, query_id (and any other
        columns present in the queries split).

    Raises:
        ImportError: If the `datasets` library is not installed.
        ValueError: If the dataset's queries split lacks "query" or "answer"
            columns.
    """
    try:
        from datasets import load_dataset
    except ImportError as exc:
        raise ImportError(
            "The 'datasets' library is required for ViDoRe v3 loading. "
            "Install it: pip install datasets>=2.19.0"
        ) from exc

    load_kwargs: dict = {"split": "queries"}
    if cache_dir:
        load_kwargs["cache_dir"] = cache_dir

    ds = load_dataset(dataset_name, **load_kwargs)

    column_names = ds.column_names
    required = ["query", "answer"]
    missing = [c for c in required if c not in column_names]
    if missing:
        raise ValueError(
            f"ViDoRe v3 dataset '{dataset_name}' queries split is missing columns: {missing}. "
            f"Available columns: {column_names}"
        )

    records = []
    skipped = 0
    for row in ds:
        query = str(row["query"]).strip()
        answer = str(row["answer"]).strip()
        if not query or not answer:
            skipped += 1
            continue
        # "query"/"answer" are normalized first; every other column is
        # carried through untouched as metadata.
        record = {"query": query, "answer": answer}
        for col in column_names:
            if col not in record:
                record[col] = row[col]
        records.append(record)

    if skipped:
        print(f"  [ViDoRe loader] Skipped {skipped} rows with empty query or answer")

    return records


def load_generic_csv(csv_path: str) -> list[dict]:
    """
    Load Q&A pairs from any CSV file with 'query' and 'answer' columns.

    All other columns are preserved as metadata in each record. This allows
    researchers to bring their own dataset without writing code -- just point
    the harness at a CSV.

    Args:
        csv_path: Absolute or relative path to the CSV file.

    Returns:
        List of dicts with at least keys: query, answer.

    Raises:
        FileNotFoundError: If csv_path does not exist.
        ValueError: If required columns are missing.
    """
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"CSV file not found: {csv_path}")
    return _read_qa_csv(csv_path, error_label="CSV")


def get_qa_dataset_loader(dataset_name: str) -> Callable[[Optional[str]], list[dict]]:
    """
    Return the loader function for a given dataset name.

    Contract: all returned callables have signature (data_dir: Optional[str] = None)
    so callers can always invoke loader(data_dir=...) uniformly without knowing the
    dataset type. This keeps the caller free of dataset-specific branching.

    Built-in mappings:
        "bo767_infographic" -> load_infographic_qa(data_dir)
        "vidore/<dataset>"  -> load_vidore_v3_qa(dataset_name) -- data_dir
                               accepted but maps to HuggingFace cache_dir
        "csv:/path/to.csv"  -> load_generic_csv(path) -- data_dir is ignored

    Prefixes are matched case-insensitively.

    Args:
        dataset_name: Dataset identifier string. Use "csv:/path/to/file.csv"
            for custom datasets with query and answer columns.

    Returns:
        Callable with signature (data_dir: Optional[str] = None) -> list[dict].

    Raises:
        ValueError: If the dataset name is not recognized.
    """
    lower = dataset_name.lower()

    if lower == "bo767_infographic":
        return load_infographic_qa

    if lower.startswith("vidore/"):
        _ds_name = dataset_name

        def _vidore_loader(data_dir: Optional[str] = None) -> list[dict]:
            return load_vidore_v3_qa(_ds_name, cache_dir=data_dir)

        return _vidore_loader

    # Match the prefix on the lowered string for consistency with the other
    # branches (previously only a lowercase "csv:" literal matched, so
    # "CSV:/path" silently fell through to the error below). The path itself
    # is sliced from the original string to preserve its case.
    if lower.startswith("csv:"):
        _csv_path = dataset_name[4:]

        def _csv_loader(data_dir: Optional[str] = None) -> list[dict]:
            return load_generic_csv(_csv_path)

        return _csv_loader

    raise ValueError(
        f"Unknown QA dataset: '{dataset_name}'. "
        "Built-in datasets: 'bo767_infographic', 'vidore/<dataset>', "
        "'csv:/path/to/file.csv'. "
        "To add a new dataset, implement a loader in utils/qa/ground_truth.py "
        "and register it in get_qa_dataset_loader()."
    )
# Default judge model: nvidia_nim/mistralai/mixtral-8x22b-instruct-v0.1

from __future__ import annotations

import json
import re

from typing import Any, Optional

from nv_ingest_harness.utils.qa.generators import LiteLLMClient
from nv_ingest_harness.utils.qa.types import JudgeResult

_JUDGE_SYSTEM_PROMPT = """\
You are an expert evaluator for factual question answering.

You will receive a QUESTION, a REFERENCE answer, and a CANDIDATE answer.

Step 1 -- Identify required facts:
    Break the REFERENCE into its key terms: specific numbers, names, dates,
    percentages, units, or short phrases that constitute the factual core.
    Example: "16% of adults" -> required facts = ["16%", "adults"].

Step 2 -- Check each required fact in the CANDIDATE:
    - Allow numeric equivalence: "16.00%" = "16%", "1,000" = "1000".
    - Allow paraphrasing: "Peers" matches "Peers of those adults".
    - Allow additional correct detail: extra facts do NOT reduce the score.
    - Short but correct answers are fine: "Peers" is valid for "Peers".

Step 3 -- Score on a 1-5 scale based on the fraction of required facts present:
    5 - All required facts present. Answer is fully correct.
    4 - Nearly all required facts present. One minor fact may differ trivially.
    3 - Most required facts present but at least one non-trivial fact is missing
        or slightly wrong.
    2 - Some required facts present but the core answer is incomplete or has a
        significant factual error.
    1 - None or almost none of the required facts present. Includes: wrong answer,
        irrelevant response, or stating "the context does not contain this
        information" when the reference answer exists.

Respond ONLY with valid JSON:
{"score": <1-5>, "reasoning": "<one short sentence>"}

No text outside the JSON object."""

_JUDGE_USER_TEMPLATE = """\
Question: {query}

Reference answer: {reference}

Candidate answer: {candidate}"""


class LLMJudge:
    """
    LLM-as-judge that scores candidate answers on a 1-5 scale.

    Uses a LiteLLMClient under the hood so the judge model is swappable
    via the same model-string convention as the generator.

    Args:
        model: litellm model string for the judge LLM.
        api_base: Override endpoint URL for private / local deployments.
        api_key: Explicit API key (prefer env vars).
        extra_params: Additional kwargs forwarded to litellm.completion.
    """

    def __init__(
        self,
        model: str = "nvidia_nim/mistralai/mixtral-8x22b-instruct-v0.1",
        api_base: Optional[str] = None,
        api_key: Optional[str] = None,
        extra_params: Optional[dict[str, Any]] = None,
    ):
        # temperature=0.0 and a small token budget: judging should be
        # deterministic and only ever emit one small JSON object.
        self._client = LiteLLMClient(
            model=model,
            api_base=api_base,
            api_key=api_key,
            temperature=0.0,
            max_tokens=256,
            extra_params=extra_params or {},
            num_retries=3,
        )
        self.model = model

    def judge(self, query: str, reference: str, candidate: str) -> JudgeResult:
        """
        Score a candidate answer against the reference answer.

        Uses self._client.complete() so retry logic, auth, and extra_params
        flow through LiteLLMClient -- no litellm calls here directly.

        Args:
            query: The original question.
            reference: Ground-truth reference answer.
            candidate: The LLM-generated answer to evaluate.

        Returns:
            JudgeResult with score (1-5) and one-sentence reasoning.
            Returns score=0 if parsing fails, so the pipeline can continue.
        """
        if not candidate or not candidate.strip():
            return JudgeResult(score=0, reasoning="Candidate answer was empty.", error="empty_candidate")

        user_content = _JUDGE_USER_TEMPLATE.format(
            query=query,
            reference=reference,
            candidate=candidate,
        )
        messages = [
            {"role": "system", "content": _JUDGE_SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ]

        try:
            raw, _ = self._client.complete(messages, max_tokens=256)
            return _parse_judge_response(raw)
        except Exception as exc:
            return JudgeResult(score=0, reasoning="", error=f"judge_api_error: {exc}")


def _parse_judge_response(raw: str) -> JudgeResult:
    """
    Parse the judge's JSON response into a JudgeResult.

    Attempts strict JSON parsing first, then falls back to regex extraction
    to handle models that wrap the JSON in prose or code fences.
    """
    text = raw.strip()

    # Remove markdown code fences if present
    text = re.sub(r"^```(?:json)?\s*", "", text, flags=re.MULTILINE)
    text = re.sub(r"\s*```$", "", text, flags=re.MULTILINE)
    text = text.strip()

    # Strict JSON parse. TypeError is included so that valid-but-unexpected
    # JSON -- a top-level list ("data['score']" -> TypeError) or
    # '"score": null' ("int(None)" -> TypeError) -- falls through to the
    # regex fallback instead of crashing the calling worker thread.
    try:
        data = json.loads(text)
        score = int(data["score"])
        if not (1 <= score <= 5):
            raise ValueError(f"score {score} out of range 1-5")
        return JudgeResult(score=score, reasoning=str(data.get("reasoning", "")))
    except (json.JSONDecodeError, KeyError, ValueError, TypeError):
        pass

    # Fallback: regex extraction
    score_match = re.search(r'"score"\s*:\s*([1-5])', text)
    reasoning_match = re.search(r'"reasoning"\s*:\s*"([^"]*)"', text)
    if score_match:
        score = int(score_match.group(1))
        reasoning = reasoning_match.group(1) if reasoning_match else ""
        return JudgeResult(score=score, reasoning=reasoning)

    return JudgeResult(
        score=0,
        reasoning="",
        error=f"parse_failure: could not extract score from response: {raw[:200]!r}",
    )
"""
QA evaluation pipeline orchestrator.

QAEvalPipeline wires together a retriever, one or more LLM clients, and a
judge to produce per-query and aggregate results across a full Q&A dataset.

Concurrency:
    Queries are processed in parallel using ThreadPoolExecutor. All work is
    I/O-bound (API calls), so thread-based concurrency is appropriate and
    simpler than asyncio. Retrieval is called once per query and the result
    is reused across all LLMs, avoiding redundant VDB calls.

    Default max_workers=8. Set qa_max_workers in test_configs.yaml to tune.
    At 8 workers: 369 queries typically completes in under 15 minutes even
    with slow reasoning models.
"""

from __future__ import annotations

import threading
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional

from nv_ingest_harness.utils.qa.scoring import answer_in_context, classify_failure, token_f1
from nv_ingest_harness.utils.qa.types import (
    AnswerJudge,
    LLMClient,
    QAQueryResult,
    RetrieverStrategy,
)


class QAEvalPipeline:
    """
    Orchestrates retrieval -> generation -> judging for a Q&A dataset.

    Args:
        retriever: Any object satisfying the RetrieverStrategy protocol.
        llm_clients: Mapping of {name: LLMClient}. Each client is run for
            every query; results are keyed by name in the output.
        judge: Any object satisfying the AnswerJudge protocol.
        top_k: Number of chunks to retrieve per query.
        max_workers: Thread pool size for concurrent query processing.
        include_chunks_in_results: Emit retrieved chunk text in per-query
            output for debuggability. Defaults to True.
        chunk_char_limit: Truncate each chunk to this many characters in
            the output JSON to control file size. 0 = no limit.
    """

    def __init__(
        self,
        retriever: RetrieverStrategy,
        llm_clients: dict[str, LLMClient],
        judge: AnswerJudge,
        top_k: int = 5,
        max_workers: int = 8,
        include_chunks_in_results: bool = True,
        chunk_char_limit: int = 500,
    ):
        self.retriever = retriever
        self.llm_clients = llm_clients
        self.judge = judge
        self.top_k = top_k
        self.max_workers = max_workers
        self.include_chunks_in_results = include_chunks_in_results
        self.chunk_char_limit = chunk_char_limit

    def evaluate(self, qa_pairs: list[dict]) -> dict:
        """
        Evaluate all Q&A pairs and return aggregated results.

        Args:
            qa_pairs: List of dicts, each with at least "query" and "answer" keys.

        Returns:
            Dict with keys:
              - "summary": submitted / completed / dropped query counts.
              - "tier1_retrieval": answer-in-context rate over completed queries.
              - "tier2_programmatic": per-model exact-match and token-F1 means.
              - "tier3_llm_judge": per-model judge aggregates (mean_score,
                score_distribution, mean_latency_s, scored_count, error_count).
              - "failure_breakdown": per-model failure-mode counts.
              - "per_query": serialized QAQueryResult dicts.
              - "by_model": the same mapping object as "tier3_llm_judge",
                exposed under both keys.
        """
        total = len(qa_pairs)
        # Pre-sized slot list: results keep dataset order no matter in which
        # order futures complete.
        results: list[Optional[QAQueryResult]] = [None] * total
        counter = {"done": 0}
        lock = threading.Lock()

        def _process(idx: int, pair: dict) -> tuple[int, QAQueryResult]:
            # Runs on a worker thread: one retrieval call, then every
            # configured LLM client against the same chunks.
            query = pair["query"]
            reference = pair["answer"]

            retrieval = self.retriever.retrieve(query, self.top_k)

            query_result = QAQueryResult(
                query=query,
                reference_answer=reference,
                retrieval=retrieval,
            )

            # Tier 1 signal: computed once per query, independent of any LLM.
            ref_in_chunks = answer_in_context(reference, retrieval.chunks)
            query_result.answer_in_context = ref_in_chunks

            for name, client in self.llm_clients.items():
                gen = client.generate(query, retrieval.chunks)
                query_result.generations[name] = gen

                if gen.error == "thinking_truncated":
                    # No usable answer text: skip judging and record zeroed
                    # metrics so aggregation stays shape-consistent.
                    from nv_ingest_harness.utils.qa.types import JudgeResult

                    query_result.judgements[name] = JudgeResult(
                        score=0,
                        reasoning="Skipped: thinking truncated",
                        error="thinking_truncated",
                    )
                    query_result.token_f1[name] = {
                        "exact_match": False,
                        "f1": 0.0,
                        "precision": 0.0,
                        "recall": 0.0,
                    }
                    query_result.failure_mode[name] = "thinking_truncated"
                    continue

                # Errored generations are judged against an empty candidate,
                # which the judge maps to score=0 / "empty_candidate".
                answer_text = gen.answer if not gen.error else ""
                verdict = self.judge.judge(query, reference, answer_text)
                query_result.judgements[name] = verdict

                # Tier 2: programmatic token overlap with the reference.
                query_result.token_f1[name] = token_f1(reference, answer_text)

                judge_score = verdict.score if not verdict.error else None
                query_result.failure_mode[name] = classify_failure(
                    ref_in_chunks=ref_in_chunks,
                    judge_score=judge_score,
                    gen_error=gen.error,
                    candidate=answer_text,
                )

            return idx, query_result

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(_process, i, pair): i for i, pair in enumerate(qa_pairs)}

            for future in as_completed(futures):
                try:
                    idx, query_result = future.result()
                    results[idx] = query_result
                except Exception as exc:
                    # A failed query is logged and dropped; its slot stays
                    # None and is counted in "dropped_queries".
                    idx = futures[future]
                    pair = qa_pairs[idx]
                    print(f"  ERROR processing query [{idx}]: {pair.get('query', '')!r}: {exc}")

                # NOTE(review): this loop runs only on the main thread, so the
                # lock around the progress counter is belt-and-braces.
                with lock:
                    counter["done"] += 1
                    done = counter["done"]
                    if done % 10 == 0 or done == total:
                        print(f"  Progress: {done}/{total} queries completed")

        return self._aggregate(results)

    def _aggregate(self, results: list[Optional[QAQueryResult]]) -> dict:
        """Compute per-model aggregate statistics from per-query results."""
        valid = [r for r in results if r is not None]
        total_submitted = len(results)
        dropped = total_submitted - len(valid)

        scores_by_model: dict[str, list[int]] = defaultdict(list)
        latencies_by_model: dict[str, list[float]] = defaultdict(list)
        errors_by_model: dict[str, int] = defaultdict(int)
        f1_by_model: dict[str, list[float]] = defaultdict(list)
        exact_by_model: dict[str, list[bool]] = defaultdict(list)
        failures_by_model: dict[str, Counter] = defaultdict(Counter)

        aic_count = 0

        for qr in valid:
            if qr.answer_in_context:
                aic_count += 1

            for name, verdict in qr.judgements.items():
                # Errored judgements (score 0) are excluded from the mean and
                # counted separately so they cannot drag scores down silently.
                if verdict.error and verdict.score == 0:
                    errors_by_model[name] += 1
                else:
                    scores_by_model[name].append(verdict.score)

            for name, gen in qr.generations.items():
                # Only successful generations contribute to mean latency.
                if not gen.error:
                    latencies_by_model[name].append(gen.latency_s)

            for name, tf1 in qr.token_f1.items():
                f1_by_model[name].append(tf1.get("f1", 0.0))
                exact_by_model[name].append(tf1.get("exact_match", False))

            for name, fm in qr.failure_mode.items():
                failures_by_model[name][fm] += 1

        by_model: dict[str, dict] = {}
        for name in self.llm_clients:
            scores = scores_by_model[name]
            latencies = latencies_by_model[name]

            # Distribution keys are strings "1".."5" for JSON friendliness.
            dist: dict[str, int] = {str(k): 0 for k in range(1, 6)}
            for s in scores:
                dist[str(s)] = dist.get(str(s), 0) + 1

            by_model[name] = {
                "mean_score": round(sum(scores) / len(scores), 4) if scores else 0.0,
                "score_distribution": dist,
                "mean_latency_s": round(sum(latencies) / len(latencies), 3) if latencies else 0.0,
                "scored_count": len(scores),
                "error_count": errors_by_model[name],
            }

        tier2: dict[str, dict] = {}
        for name in self.llm_clients:
            f1s = f1_by_model[name]
            exacts = exact_by_model[name]
            tier2[name] = {
                "mean_exact_match": round(sum(exacts) / len(exacts), 4) if exacts else 0.0,
                "mean_token_f1": round(sum(f1s) / len(f1s), 4) if f1s else 0.0,
            }

        failure_breakdown: dict[str, dict[str, int]] = {}
        for name in self.llm_clients:
            failure_breakdown[name] = dict(failures_by_model[name])

        return {
            "summary": {
                "total_submitted": total_submitted,
                "total_completed": len(valid),
                "dropped_queries": dropped,
            },
            "tier1_retrieval": {
                "answer_in_context_rate": round(aic_count / len(valid), 4) if valid else 0.0,
                "answer_in_context_count": aic_count,
                "total": len(valid),
            },
            "tier2_programmatic": tier2,
            "tier3_llm_judge": by_model,
            "failure_breakdown": failure_breakdown,
            "per_query": [
                _query_result_to_dict(
                    r,
                    include_chunks=self.include_chunks_in_results,
                    chunk_char_limit=self.chunk_char_limit,
                )
                for r in valid
            ],
            # Same dict object as "tier3_llm_judge", published under both keys.
            "by_model": by_model,
        }
limit: int) -> str: + if limit <= 0 or len(text) <= limit: + return text + return text[:limit] + "..." + + +def _query_result_to_dict( + qr: QAQueryResult, + include_chunks: bool = True, + chunk_char_limit: int = 500, +) -> dict: + """Serialize a QAQueryResult to a plain dict for JSON output.""" + result: dict = { + "query": qr.query, + "reference_answer": qr.reference_answer, + "retrieved_chunk_count": len(qr.retrieval.chunks), + "answer_in_context": qr.answer_in_context, + } + + if qr.token_f1: + result["token_f1"] = qr.token_f1 + if qr.failure_mode: + result["failure_mode"] = qr.failure_mode + + if include_chunks: + result["retrieved_chunks"] = [_truncate(c, chunk_char_limit) for c in qr.retrieval.chunks] + result["retrieval_metadata"] = qr.retrieval.metadata + + result["generations"] = { + name: { + "answer": gen.answer, + "latency_s": round(gen.latency_s, 3), + "model": gen.model, + "error": gen.error, + } + for name, gen in qr.generations.items() + } + result["judgements"] = { + name: { + "score": j.score, + "reasoning": j.reasoning, + "error": j.error, + } + for name, j in qr.judgements.items() + } + + return result diff --git a/tools/harness/src/nv_ingest_harness/utils/qa/retrievers.py b/tools/harness/src/nv_ingest_harness/utils/qa/retrievers.py new file mode 100644 index 000000000..669aac18c --- /dev/null +++ b/tools/harness/src/nv_ingest_harness/utils/qa/retrievers.py @@ -0,0 +1,297 @@ +""" +Retriever strategy implementations for the QA evaluation pipeline. + +TopKRetriever: queries an existing Milvus or LanceDB collection at eval time. +FileRetriever: reads pre-computed retrieval results from a JSON file. + +FileRetriever is the primary integration point. Any retrieval method -- vector +search, agentic retrieval, hybrid, reranked, BM25, or a completely custom +pipeline -- can plug into the QA eval harness by writing a single JSON file. +See the FileRetriever class docstring for the minimal required format. 
"""

from __future__ import annotations

import json
import os
import re
import unicodedata
from typing import Optional

from nv_ingest_harness.utils.qa.types import RetrievalResult


def _normalize_query(text: str) -> str:
    """Canonical form for query matching: NFKC unicode, stripped, case-folded,
    collapsed whitespace. This makes lookup resilient to trivial formatting
    differences between the ground-truth CSV and the retrieval JSON."""
    text = unicodedata.normalize("NFKC", text)
    text = text.strip().casefold()
    text = re.sub(r"\s+", " ", text)
    return text


class TopKRetriever:
    """
    Retriever that queries an existing Milvus or LanceDB collection.

    Requires the collection to exist before construction -- run an e2e
    ingestion pass (or e2e_qa_eval) first.

    Args:
        collection_name: VDB collection / LanceDB table name.
        hostname: Service hostname used for embedding and Milvus endpoints.
        model_name: Embedding model name passed to the embedding service.
        sparse: Enable hybrid sparse-dense retrieval (Milvus only).
        gpu_search: Use GPU acceleration for Milvus search.
        vdb_backend: "milvus" or "lancedb".
        table_path: Path to LanceDB database directory (required for lancedb).
        hybrid: Enable LanceDB hybrid (FTS + vector) retrieval.
    """

    def __init__(
        self,
        collection_name: str,
        hostname: str = "localhost",
        model_name: Optional[str] = None,
        sparse: bool = False,
        gpu_search: bool = False,
        vdb_backend: str = "milvus",
        table_path: Optional[str] = None,
        hybrid: bool = False,
    ):
        self.collection_name = collection_name
        self.hostname = hostname
        self.model_name = model_name
        self.sparse = sparse
        self.gpu_search = gpu_search
        self.vdb_backend = vdb_backend
        self.table_path = table_path
        self.hybrid = hybrid
        # Embedding NIM endpoint; port 8012 matches the harness service stack.
        self.embedding_endpoint = f"http://{hostname}:8012/v1"

        # Lazy import: nv_ingest_client / recall.py are not available in the
        # minimal Docker image used for FileRetriever-only smoke tests.
        from nv_ingest_harness.utils.recall import get_retrieval_func

        # The returned callable is pre-bound to the backend/table arguments,
        # which is why retrieve() below does not pass them again for LanceDB.
        self._retrieval_func = get_retrieval_func(
            vdb_backend=vdb_backend,
            table_path=table_path,
            table_name=collection_name,
            hybrid=hybrid,
        )

        self._validate_collection()

    def _validate_collection(self) -> None:
        """
        Verify the target collection exists before the pipeline starts.

        Raises RuntimeError with an actionable message if not found,
        rather than letting callers hit a cryptic error mid-run.
        """
        if self.vdb_backend == "milvus":
            try:
                from pymilvus import MilvusClient

                client = MilvusClient(uri=f"http://{self.hostname}:19530")
                existing = client.list_collections()
                if self.collection_name not in existing:
                    raise RuntimeError(
                        f"Milvus collection '{self.collection_name}' not found. "
                        f"Run e2e or e2e_qa_eval first to ingest documents. "
                        f"Available collections: {existing}"
                    )
            except ImportError:
                # pymilvus not installed: skip validation; only ImportError is
                # swallowed here, so the RuntimeError above (and any Milvus
                # connection error) still propagates to the caller.
                pass
        elif self.vdb_backend == "lancedb":
            if self.table_path and not os.path.exists(self.table_path):
                raise RuntimeError(
                    f"LanceDB path '{self.table_path}' does not exist. "
                    f"Run e2e or e2e_qa_eval first to ingest documents."
                )

    def retrieve(self, query: str, top_k: int) -> RetrievalResult:
        """
        Retrieve the top-k most relevant chunks for a query.

        Extracts text from entity.text and stores source/page metadata.
        """
        # LanceDB: table identity was bound into _retrieval_func at
        # construction time; Milvus: the collection name is passed per call.
        if self.vdb_backend == "lancedb":
            raw = self._retrieval_func(
                [query],
                embedding_endpoint=self.embedding_endpoint,
                model_name=self.model_name,
                top_k=top_k,
            )
        else:
            raw = self._retrieval_func(
                [query],
                self.collection_name,
                hybrid=self.sparse,
                embedding_endpoint=self.embedding_endpoint,
                model_name=self.model_name,
                top_k=top_k,
                gpu_search=self.gpu_search,
            )

        # One query in, so only the first (and only) result list matters.
        hits = raw[0] if raw else []
        chunks: list[str] = []
        metadata: list[dict] = []

        for hit in hits:
            entity = hit.get("entity", {})
            text = entity.get("text", "")
            chunks.append(text)

            source = entity.get("source", {})
            content_meta = entity.get("content_metadata", {})
            metadata.append(
                {
                    "source_id": source.get("source_id", ""),
                    "page_number": content_meta.get("page_number", ""),
                    "distance": hit.get("distance"),
                }
            )

        return RetrievalResult(chunks=chunks, metadata=metadata)


class FileRetriever:
    """
    Retriever that reads pre-computed results from a JSON file.

    This is the integration point for **any** retrieval method. Vector search,
    agentic retrieval, hybrid pipelines, BM25, rerankers, or a completely
    custom system -- as long as it produces a JSON file in the format below,
    the QA eval harness will generate answers and judge them identically.
    This makes comparisons across retrieval strategies apples-to-apples.

    Minimal required JSON format
    ----------------------------
    Only ``"queries"`` with ``"chunks"`` is required. Everything else is optional.

    ::

        {
          "queries": {
            "What is the range of the 767?": {
              "chunks": ["First retrieved chunk text...", "Second chunk..."]
            }
          }
        }

    Full format (with optional fields)
    ----------------------------------
    ::

        {
          "metadata": {                      # optional -- ignored by FileRetriever
            "retrieval_method": "agentic",   # free-form; useful for your records
            "model": "...",
            "top_k": 5
          },
          "queries": {
            "What is the range of the 767?": {
              "chunks": [                    # REQUIRED: list of text strings
                "The 767 has a range of..."  # (one per retrieved chunk/passage)
              ],
              "metadata": [                  # optional: per-chunk provenance
                {"source_id": "bo767.pdf", "page_number": 24, "distance": 0.42}
              ]
            }
          }
        }

    Rules:
      - The ``"queries"`` key must be a dict mapping query strings to objects.
      - Each object must have ``"chunks"``: a list of plain-text strings.
      - ``"metadata"`` per-chunk is optional; if present it is carried through
        to the results JSON for traceability but is not used for scoring.
      - Query matching is normalized (NFKC unicode, case-folded, whitespace-
        collapsed) so trivial formatting differences between the ground-truth
        CSV and the retrieval JSON do not cause silent misses.
      - The harness respects ``top_k`` at eval time: if your JSON has 10 chunks
        per query but the eval is configured with ``qa_top_k=5``, only the
        first 5 are used. Order matters -- put the best chunks first.

    Args:
        file_path: Path to the JSON file with pre-computed retrieval results.
    """

    def __init__(self, file_path: str):
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"FileRetriever: retrieval results file not found: {file_path}")

        with open(file_path) as f:
            data = json.load(f)

        raw_index: dict[str, dict] = data.get("queries", {})
        if not raw_index:
            raise ValueError(
                f"FileRetriever: no 'queries' key found in {file_path}. "
                'Expected format: {"queries": {"query text": {"chunks": [...], "metadata": [...]}}}'
            )

        # Index by normalized query; keep the raw key alongside for tracing.
        # NOTE(review): if two raw keys normalize to the same string the later
        # one wins silently.
        self._norm_index: dict[str, dict] = {}
        self._raw_keys: dict[str, str] = {}
        for raw_key, value in raw_index.items():
            norm = _normalize_query(raw_key)
            self._norm_index[norm] = value
            self._raw_keys[norm] = raw_key

        # Counts lookup misses during retrieve() so warning spam can be capped.
        self._miss_count = 0

    def check_coverage(self, qa_pairs: list[dict]) -> float:
        """Validate retrieval file covers the ground-truth queries.

        Logs a per-query miss list and returns the coverage fraction.
        Intended to be called once before the pipeline starts so data
        quality issues surface early rather than as hundreds of silent
        warnings mid-run.
        """
        total = len(qa_pairs)
        if total == 0:
            return 1.0

        misses: list[str] = []
        for pair in qa_pairs:
            norm = _normalize_query(pair.get("query", ""))
            if norm not in self._norm_index:
                misses.append(pair.get("query", "")[:80])

        coverage = (total - len(misses)) / total
        if misses:
            print(f"  [FileRetriever] Coverage: {coverage:.1%} " f"({total - len(misses)}/{total} queries matched)")
            # Show at most 10 concrete misses so the log stays readable.
            for q in misses[:10]:
                print(f"    MISS: {q!r}")
            if len(misses) > 10:
                print(f"    ... and {len(misses) - 10} more")
        else:
            print(f"  [FileRetriever] Coverage: 100% ({total}/{total} queries matched)")

        return coverage

    def retrieve(self, query: str, top_k: int) -> RetrievalResult:
        """
        Look up pre-computed chunks for a query string.

        Uses normalized matching (case-folded, whitespace-collapsed) so
        trivial formatting differences don't cause misses. Falls back to
        an empty result if the query is truly absent so the pipeline can
        continue and record the miss.
        """
        norm = _normalize_query(query)
        entry = self._norm_index.get(norm)

        if entry is None:
            self._miss_count += 1
            # Cap warning output at 20 misses; check_coverage() is the
            # intended tool for a full miss report.
            if self._miss_count <= 20:
                print(f"  [FileRetriever] WARNING: query not found in retrieval file: {query!r}")
            elif self._miss_count == 21:
                print("  [FileRetriever] WARNING: suppressing further miss warnings (>20)")
            return RetrievalResult(chunks=[], metadata=[])

        chunks = entry.get("chunks", [])[:top_k]
        metadata = entry.get("metadata", [])[:top_k]
        return RetrievalResult(chunks=chunks, metadata=metadata)
+ token_f1 -- Tier 2: SQuAD-style token precision/recall/F1 + classify_failure -- Per-query failure mode classification +""" + +from __future__ import annotations + +import re +import string +from typing import Optional + +_STOP_WORDS = frozenset( + { + "a", + "an", + "the", + "is", + "are", + "was", + "were", + "be", + "been", + "being", + "have", + "has", + "had", + "do", + "does", + "did", + "will", + "would", + "could", + "should", + "may", + "might", + "shall", + "can", + "need", + "dare", + "ought", + "used", + "to", + "of", + "in", + "for", + "on", + "with", + "at", + "by", + "from", + "as", + "into", + "through", + "during", + "before", + "after", + "above", + "below", + "between", + "out", + "off", + "over", + "under", + "again", + "further", + "then", + "once", + "here", + "there", + "when", + "where", + "why", + "how", + "all", + "each", + "every", + "both", + "few", + "more", + "most", + "other", + "some", + "such", + "no", + "not", + "only", + "own", + "same", + "so", + "than", + "too", + "very", + "just", + "because", + "but", + "and", + "or", + "if", + "while", + "about", + "up", + "it", + "its", + "that", + "this", + "these", + "those", + "what", + "which", + "who", + "whom", + } +) + +_NO_CONTEXT_PATTERNS = [ + r"does not contain", + r"do not contain", + r"no information", + r"not enough information", + r"cannot be determined", + r"not mentioned", + r"not provided", + r"context does not", + r"unable to find", + r"no relevant", +] +_NO_CONTEXT_RE = re.compile("|".join(_NO_CONTEXT_PATTERNS), re.IGNORECASE) + + +def _normalize(text: str) -> str: + """Lowercase, strip articles/punctuation, collapse whitespace.""" + text = text.lower() + text = re.sub(r"\b(a|an|the)\b", " ", text) + text = text.translate(str.maketrans("", "", string.punctuation)) + return " ".join(text.split()) + + +def _normalize_numeric(text: str) -> str: + """Normalize numeric formats so '16.00%' matches '16%', '1,000' matches '1000'.""" + text = re.sub(r",(\d{3})", r"\1", text) + 
text = re.sub(r"(\d+)\.0+(%?)", r"\1\2", text) + return text + + +def _content_words(text: str) -> list[str]: + """Extract content words (non-stopwords) from normalized text.""" + normalized = _normalize(_normalize_numeric(text)) + return [w for w in normalized.split() if w not in _STOP_WORDS] + + +def answer_in_context(reference: str, chunks: list[str]) -> bool: + """ + Tier-1 retrieval quality check. + + Returns True if >= 50% of the content words in the reference answer + appear in the concatenated chunk text. Case-insensitive with numeric + normalization (e.g. '16.00%' matches '16%'). + """ + ref_words = _content_words(reference) + if not ref_words: + return True + + chunk_text = _normalize(_normalize_numeric(" ".join(chunks))) + found = sum(1 for w in ref_words if w in chunk_text) + return found / len(ref_words) >= 0.5 + + +def token_f1(reference: str, candidate: str) -> dict: + """ + Tier-2 SQuAD-style token-level scoring after normalization. + + Strips tags from candidate before scoring. 
Returns a dict with: + exact_match: bool -- perfect string match after normalization + f1: float -- harmonic mean of precision and recall + precision: float -- fraction of candidate tokens in reference + recall: float -- fraction of reference tokens in candidate + """ + from nv_ingest_harness.utils.qa.generators import strip_think_tags + + candidate = strip_think_tags(candidate) + + norm_ref = _normalize(_normalize_numeric(reference)) + norm_cand = _normalize(_normalize_numeric(candidate)) + + exact = norm_ref == norm_cand + + ref_tokens = norm_ref.split() + cand_tokens = norm_cand.split() + + if not ref_tokens and not cand_tokens: + return {"exact_match": True, "f1": 1.0, "precision": 1.0, "recall": 1.0} + if not ref_tokens or not cand_tokens: + return {"exact_match": exact, "f1": 0.0, "precision": 0.0, "recall": 0.0} + + common = set(ref_tokens) & set(cand_tokens) + num_common = sum(min(ref_tokens.count(t), cand_tokens.count(t)) for t in common) + + if num_common == 0: + return {"exact_match": exact, "f1": 0.0, "precision": 0.0, "recall": 0.0} + + precision = num_common / len(cand_tokens) + recall = num_common / len(ref_tokens) + f1 = 2 * precision * recall / (precision + recall) + + return { + "exact_match": exact, + "f1": round(f1, 4), + "precision": round(precision, 4), + "recall": round(recall, 4), + } + + +def classify_failure( + ref_in_chunks: bool, + judge_score: Optional[int], + gen_error: Optional[str], + candidate: str, +) -> str: + """ + Classify the failure mode for a single query + model combination. 
+
+    Returns one of:
+        "correct"            -- judge score >= 4
+        "partial"            -- judge score 2-3
+        "retrieval_miss"     -- reference not in chunks AND score <= 1
+        "generation_miss"    -- reference in chunks but score <= 1
+        "thinking_truncated" -- truncated reasoning, or judge returned no score
+        "no_context"         -- candidate says 'no information' AND score <= 3
+    """
+    if gen_error == "thinking_truncated":
+        return "thinking_truncated"
+
+    if judge_score is None:
+        return "thinking_truncated"
+
+    if judge_score >= 4:
+        return "correct"
+
+    if 2 <= judge_score <= 3:
+        if _NO_CONTEXT_RE.search(candidate):
+            return "no_context"
+        return "partial"
+
+    if judge_score <= 1:
+        if _NO_CONTEXT_RE.search(candidate):
+            return "no_context"
+        if not ref_in_chunks:
+            return "retrieval_miss"
+        return "generation_miss"
+
+    return "partial"
diff --git a/tools/harness/src/nv_ingest_harness/utils/qa/types.py b/tools/harness/src/nv_ingest_harness/utils/qa/types.py
new file mode 100644
index 000000000..cf3117e83
--- /dev/null
+++ b/tools/harness/src/nv_ingest_harness/utils/qa/types.py
@@ -0,0 +1,117 @@
+"""
+Protocol definitions and dataclasses for the QA evaluation pipeline.
+
+These abstractions allow retrieval strategies, LLM clients, and judges
+to be swapped independently without modifying the orchestrator.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import Any, Optional, Protocol, runtime_checkable
+
+
+@runtime_checkable
+class RetrieverStrategy(Protocol):
+    """Pluggable retrieval strategy interface."""
+
+    def retrieve(self, query: str, top_k: int) -> "RetrievalResult":
+        """
+        Retrieve the top-k most relevant chunks for a query.
+
+        Args:
+            query: Natural language query string.
+            top_k: Maximum number of chunks to return.
+
+        Returns:
+            RetrievalResult with chunks and optional metadata.
+        """
+        ...
+ + +@runtime_checkable +class LLMClient(Protocol): + """Pluggable LLM answer generation interface.""" + + def generate(self, query: str, chunks: list[str]) -> "GenerationResult": + """ + Generate an answer given a query and retrieved context chunks. + + Args: + query: The user question. + chunks: Retrieved text chunks to use as context. + + Returns: + GenerationResult with the generated answer and latency. + """ + ... + + +@runtime_checkable +class AnswerJudge(Protocol): + """Pluggable answer scoring interface.""" + + def judge(self, query: str, reference: str, candidate: str) -> "JudgeResult": + """ + Score a candidate answer against a reference answer. + + Args: + query: The original question. + reference: Ground-truth reference answer. + candidate: LLM-generated candidate answer to evaluate. + + Returns: + JudgeResult with a 1-5 score and reasoning. + """ + ... + + +@dataclass +class RetrievalResult: + """Result from a retrieval operation.""" + + chunks: list[str] + metadata: list[dict[str, Any]] = field(default_factory=list) + + +@dataclass +class GenerationResult: + """Result from a single LLM generation call.""" + + answer: str + latency_s: float + model: str + error: Optional[str] = None + + +@dataclass +class JudgeResult: + """Result from a single judge evaluation.""" + + score: int + reasoning: str + error: Optional[str] = None + + +@dataclass +class QAQueryResult: + """ + Aggregated per-query results across all configured LLMs. + + Each key in `generations` and `judgements` maps to a named LLM + from qa_llm_configs (e.g., "nemotron_super_49b"). + + Multi-tier fields (populated by orchestrator after generation/judging): + answer_in_context: Tier-1 retrieval quality flag. + token_f1: Tier-2 programmatic answer quality per model. + failure_mode: Per-model failure classification. 
+ """ + + query: str + reference_answer: str + retrieval: RetrievalResult + generations: dict[str, GenerationResult] = field(default_factory=dict) + judgements: dict[str, JudgeResult] = field(default_factory=dict) + answer_in_context: Optional[bool] = None + token_f1: dict[str, dict] = field(default_factory=dict) + failure_mode: dict[str, str] = field(default_factory=dict) diff --git a/tools/harness/src/nv_ingest_harness/utils/recall.py b/tools/harness/src/nv_ingest_harness/utils/recall.py index c22f79813..a1028bae4 100644 --- a/tools/harness/src/nv_ingest_harness/utils/recall.py +++ b/tools/harness/src/nv_ingest_harness/utils/recall.py @@ -17,7 +17,7 @@ from nv_ingest_harness.utils.cases import get_repo_root -def _get_retrieval_func( +def get_retrieval_func( vdb_backend: str, table_path: Optional[str] = None, table_name: Optional[str] = None, @@ -29,6 +29,9 @@ def _get_retrieval_func( For LanceDB, returns a partial of lancedb_retrieval with table_path pre-filled. For Milvus, returns nvingest_retrieval directly. + Shared by both recall evaluation (utils/recall.py) and QA evaluation + (utils/qa/retrievers.py) so VDB routing logic lives in one place. 
+ Args: vdb_backend: "milvus" or "lancedb" table_path: Path to LanceDB database directory (required for lancedb) @@ -102,7 +105,7 @@ def get_recall_scores( # Create retrieval function once, outside the batch loop if vdb_backend == "lancedb": - retrieval_func = _get_retrieval_func( + retrieval_func = get_retrieval_func( "lancedb", table_path, table_name=collection_name, @@ -221,7 +224,7 @@ def get_recall_scores_pdf_only( # Create retrieval function once, outside the batch loop if vdb_backend == "lancedb": - retrieval_func = _get_retrieval_func( + retrieval_func = get_retrieval_func( "lancedb", table_path, table_name=collection_name, diff --git a/tools/harness/test_configs.yaml b/tools/harness/test_configs.yaml index 27000c873..17976614b 100644 --- a/tools/harness/test_configs.yaml +++ b/tools/harness/test_configs.yaml @@ -101,6 +101,57 @@ recall: recall_top_k: 10 ground_truth_dir: null +# QA evaluation configuration (only used when running qa_eval or e2e_qa_eval tests) +# These settings measure how well an LLM can answer questions given retrieved context. +# +# API keys are read from environment variables automatically by litellm: +# NVIDIA_API_KEY -> nvidia_nim/ prefixed models +# OPENAI_API_KEY -> openai/ prefixed models +# Do NOT put API keys in this file. +# +# To use a local vLLM/Ollama server instead of NIM: +# model: openai/ # openai/ prefix = any OpenAI-spec server +# api_base: http://localhost:8000/v1 +# (remove api_base to use NIM defaults) +# +# Legacy dataset: set qa_dataset to "bo767_infographic" to use the older +# infographic-only ground truth (368 queries) instead of the full annotations. +qa_eval: + qa_dataset: csv:data/bo767_annotations.csv + qa_top_k: 5 + qa_retriever: file # "topk" (queries existing VDB collection) | "file" (reads JSON) + qa_retriever_config: + file_path: data/test_retrieval/bo767_retrieval_fullpage.json + qa_max_workers: 8 + + # qa_llm_configs: list of LLMs to evaluate in a single run. 
+ # Each entry produces its own mean_score / latency entry in results.json. + # model uses litellm provider-prefix routing. + # extra_params: forwarded as **kwargs to litellm.completion (e.g. reasoning mode). + qa_llm_configs: + - name: nemotron_super_49b # best accuracy preset (reasoning off for speed) + model: nvidia_nim/nvidia/llama-3.3-nemotron-super-49b-v1.5 + api_key: ${NVIDIA_API_KEY} + - name: qwen3_next_80b # fast + good accuracy preset + model: nvidia_nim/qwen/qwen3-next-80b-a3b-instruct + api_key: ${NVIDIA_API_KEY} + + # qa_judge_config: judge LLM used for scoring answers 1-5. + # Default: Mixtral-8x22B (strong general-purpose judge). + qa_judge_config: + model: nvidia_nim/mistralai/mixtral-8x22b-instruct-v0.1 + api_key: ${NVIDIA_API_KEY} + + # To restore TopK retrieval (VDB must be ingested first), replace the above with: + # qa_retriever: topk + # qa_retriever_config: null + + # Example: use ViDoRe v3 dataset instead of bo767_infographic + # qa_dataset: vidore/vidore_v3_finance_en + # qa_retriever: file + # qa_retriever_config: + # file_path: /path/to/vidore_retrieval_results.json + # Pre-configured datasets # Each dataset includes path, extraction settings, and recall evaluator # Use: uv run nv-ingest-harness-run --case=e2e --dataset=bo767 @@ -113,6 +164,7 @@ datasets: extract_images: false extract_infographics: false recall_dataset: bo767 + qa_dataset: csv:data/bo767_annotations.csv earnings: path: /path/to/earnings_consulting