From 1a352554d3b5b09d7d1a0ed02616461162e0a71f Mon Sep 17 00:00:00 2001 From: Silas Pignotti Date: Tue, 21 Apr 2026 12:36:45 +0200 Subject: [PATCH 1/2] chore(opencode): consolidate project state file naming --- .opencode/PROJECT_STATE.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 .opencode/PROJECT_STATE.md diff --git a/.opencode/PROJECT_STATE.md b/.opencode/PROJECT_STATE.md new file mode 100644 index 0000000..e39e82e --- /dev/null +++ b/.opencode/PROJECT_STATE.md @@ -0,0 +1,23 @@ +# Project State + +## Current Focus +- Validate installed `litresearch` behavior with low-cost end-to-end smoke runs. + +## Active Work +- None. + +## Key Decisions +- [2026-04-11] v1.0.0 release blockers were resolved (security fixes, CLI fix, settings wiring, rate limiting, tests added). +- [2026-04-20] Use a temporary workspace with constrained runtime settings (`max_results_per_query=5`, `screening_selection_mode=top_k`, `screening_top_k=3`, `top_n=5`) for low-cost verification before broader test campaigns. + +## Lessons +- [2026-04-20] `--stop-after-screening` pause + `resume` currently re-runs full screening in `analysis` stage instead of resuming post-screening, which increases LLM cost and time for pause/resume workflows. +- [2026-04-20] Resume writes a new `metrics.json` run timeline from the resume invocation, so pre-pause stage metrics are not preserved in the final metrics artifact. + +## Next Steps +- Add a dedicated post-screening checkpoint state so `resume` can skip re-screening. +- Preserve/merge metrics across paused and resumed segments. +- Add a configurable hard cap for total generated queries or total candidates to keep cost predictable. + +## Blocked +- None. From d92fb1c65cb1c063a55ed70dda0c41320113edcc Mon Sep 17 00:00:00 2001 From: Silas Pignotti Date: Tue, 21 Apr 2026 13:08:59 +0200 Subject: [PATCH 2/2] fix(pipeline): improve pause resume checkpoint behavior --- src/litresearch/cli.py | 7 +- src/litresearch/models.py | 1 + src/litresearch/pipeline.py | 21 ++++-- src/litresearch/stages/analysis.py | 64 +++++++++++------ tests/unit/test_analysis.py | 108 ++++++++++++++++++++++++++++- tests/unit/test_models.py | 2 + tests/unit/test_pipeline.py | 78 +++++++++++++++++++++ 7 files changed, 254 insertions(+), 27 deletions(-) diff --git a/src/litresearch/cli.py b/src/litresearch/cli.py index ab34ef2..56e886c 100644 --- a/src/litresearch/cli.py +++ b/src/litresearch/cli.py @@ -103,7 +103,12 @@ def run( inject_pdfs_dir=inject_pdfs, stop_after_screening=stop_after_screening, ) - console.print(f"[green]Run complete.[/green] Output: {state.output_dir}") + if state.screened_papers_completed and not state.analyses: + console.print( + f"[yellow]Pipeline paused at screening checkpoint.[/yellow] Output: {state.output_dir}" + ) + else: + console.print(f"[green]Run complete.[/green] Output: {state.output_dir}") @app.command() diff --git a/src/litresearch/models.py b/src/litresearch/models.py index 6e2e589..f4192a1 100644 --- a/src/litresearch/models.py +++ b/src/litresearch/models.py @@ -159,6 +159,7 @@ class PipelineState(BaseModel): screening_results: list[ScreeningResult] = Field(default_factory=list) analyses: list[AnalysisResult] = Field(default_factory=list) ranked_paper_ids: list[str] = Field(default_factory=list) + screened_papers_completed: bool = False current_stage: str output_dir: str created_at: str diff --git a/src/litresearch/pipeline.py b/src/litresearch/pipeline.py index 5f9d04a..622c1aa 100644 --- a/src/litresearch/pipeline.py +++ 
b/src/litresearch/pipeline.py @@ -126,7 +126,17 @@ def run_pipeline( output_dir.mkdir(parents=True, exist_ok=True) state_path = output_dir / "state.json" - metrics = RunMetrics(run_id=f"run-{uuid.uuid4().hex[:12]}", started_at=started_at) + metrics_path = output_dir / "metrics.json" + + # Preserve existing metrics when resuming + if resume_path is not None and metrics_path.exists(): + try: + metrics = RunMetrics.model_validate_json(metrics_path.read_text(encoding="utf-8")) + except Exception: # noqa: BLE001 + metrics = RunMetrics(run_id=f"run-{uuid.uuid4().hex[:12]}", started_at=started_at) + else: + metrics = RunMetrics(run_id=f"run-{uuid.uuid4().hex[:12]}", started_at=started_at) + effective_inject_pdfs_dir = inject_pdfs_dir if effective_inject_pdfs_dir is None and settings.inject_pdf_dir: effective_inject_pdfs_dir = Path(settings.inject_pdf_dir) @@ -164,18 +174,21 @@ def run_pipeline( state.save(state_path) metrics = _populate_aggregate_metrics(metrics, state) _write_metrics(output_dir, metrics) - except PauseForPDFsError: + except PauseForPDFsError as pause_exc: # Not a failure - user chose to pause for manual PDF injection - # Save state at screening checkpoint + # Preserve screening results and mark screening as completed checkpoint_state = state.model_copy( update={ + "screening_results": pause_exc.screening_results, + "screened_papers_completed": True, "updated_at": _timestamp(), } ) checkpoint_state.save(state_path) console.print("\n[bold yellow]Pipeline paused at screening checkpoint.[/bold yellow]") console.print(f"State saved to: {state_path}") - return state # Return current state + state = checkpoint_state + return state except Exception as exc: # noqa: BLE001 stage_metrics = stage_metrics.model_copy( update={ diff --git a/src/litresearch/stages/analysis.py b/src/litresearch/stages/analysis.py index d236ffa..02ca7f8 100644 --- a/src/litresearch/stages/analysis.py +++ b/src/litresearch/stages/analysis.py @@ -289,9 +289,15 @@ def _analyze_paper( class PauseForPDFsError(Exception): """Raised when pipeline should pause after screening for manual PDF injection.""" - def __init__(self, papers_needing_pdfs: list[Paper], state_path: str) -> None: + def __init__( + self, + papers_needing_pdfs: list[Paper], + state_path: str, + screening_results: list[ScreeningResult] | None = None, + ) -> None: self.papers_needing_pdfs = papers_needing_pdfs self.state_path = state_path + self.screening_results = screening_results or [] super().__init__(f"{len(papers_needing_pdfs)} papers need manual PDFs") @@ -314,26 +320,43 @@ def run( papers_by_id = {paper.paper_id: paper for paper in state.candidates} - screening_results: list[ScreeningResult] = [] - screened_papers: list[tuple[Paper, ScreeningResult, int]] = [] - for index, paper in enumerate(track(state.candidates, description="Screening papers")): - pdf_excerpt = None - if not paper.abstract: - pdf_excerpt = _screening_pdf_excerpt(paper, state.questions, settings, inject_pdfs_dir) - - screening_result = _screen_paper( - paper, - state.questions, - settings, - screening_prompt, - screening_fallback_prompt, - pdf_excerpt=pdf_excerpt, + if state.screened_papers_completed and state.screening_results: + screening_results = state.screening_results + screened_papers = [ + ( + papers_by_id[result.paper_id], + result, + idx, + ) + for idx, result in enumerate(screening_results) + if result.paper_id in papers_by_id + ] + console.print( + f"[dim]Screening already completed ({len(screening_results)} papers). 
Skipping.[/dim]" ) - if screening_result is None: - continue + else: + screening_results: list[ScreeningResult] = [] + screened_papers: list[tuple[Paper, ScreeningResult, int]] = [] + for index, paper in enumerate(track(state.candidates, description="Screening papers")): + pdf_excerpt = None + if not paper.abstract: + pdf_excerpt = _screening_pdf_excerpt( + paper, state.questions, settings, inject_pdfs_dir + ) + + screening_result = _screen_paper( + paper, + state.questions, + settings, + screening_prompt, + screening_fallback_prompt, + pdf_excerpt=pdf_excerpt, + ) + if screening_result is None: + continue - screening_results.append(screening_result) - screened_papers.append((paper, screening_result, index)) + screening_results.append(screening_result) + screened_papers.append((paper, screening_result, index)) passed_papers = _select_papers_for_analysis(screened_papers, settings) @@ -370,7 +393,7 @@ def run( console.print(" 2. Continue without PDFs (analysis will use abstracts only):") console.print(f" litresearch resume {state.output_dir}/state.json\n") - raise PauseForPDFsError(papers_needing_pdfs, state.output_dir) + raise PauseForPDFsError(papers_needing_pdfs, state.output_dir, screening_results) analyses: list[AnalysisResult] = [] for paper in track(passed_papers, description="Analyzing papers"): @@ -392,6 +415,7 @@ def run( update={ "candidates": updated_candidates, "screening_results": screening_results, + "screened_papers_completed": True, "analyses": analyses, "current_stage": "analysis", } diff --git a/tests/unit/test_analysis.py b/tests/unit/test_analysis.py index f46dc99..2cd0fd6 100644 --- a/tests/unit/test_analysis.py +++ b/tests/unit/test_analysis.py @@ -1,8 +1,21 @@ import json from litresearch.config import Settings -from litresearch.models import Paper, PipelineState, ScreeningResult -from litresearch.stages.analysis import _injected_pdf_path, run +from litresearch.models import AnalysisResult, Paper, PipelineState, ScreeningResult +from litresearch.stages.analysis import PauseForPDFsError, _injected_pdf_path, run + + +def _make_state(tmp_path, *, papers=None, screening_results=None, screened_papers_completed=False): + return PipelineState( + questions=["q"], + candidates=papers or [], + screening_results=screening_results or [], + screened_papers_completed=screened_papers_completed, + current_stage="enrichment", + output_dir=str(tmp_path), + created_at="2026-03-09T16:00:00Z", + updated_at="2026-03-09T16:00:00Z", + ) def test_injected_pdf_path_rejects_path_traversal(tmp_path) -> None: @@ -110,3 +123,94 @@ def test_analysis_saves_pdf_and_marks_candidate_downloaded(tmp_path, monkeypatch assert "p1.pdf" in updated_state.candidates[0].pdf_path assert (tmp_path / "papers" / "p1.pdf").read_bytes() == b"%PDF-1.0" assert len(updated_state.analyses) == 1 + + +def test_analysis_skips_screening_when_already_completed(tmp_path, monkeypatch) -> None: + """When screened_papers_completed is True and screening_results exist, screening is skipped.""" + import litresearch.stages.analysis as analysis_stage + + papers = [ + Paper(paper_id="p1", title="P1", abstract="a", citation_count=10), + Paper(paper_id="p2", title="P2", abstract="a", citation_count=5), + ] + existing_results = [ + ScreeningResult(paper_id="p1", relevance_score=90, rationale="fit"), + ScreeningResult(paper_id="p2", relevance_score=80, rationale="fit"), + ] + state = _make_state( + tmp_path, + papers=papers, + screening_results=existing_results, + screened_papers_completed=True, + ) + + screening_call_count = 0 + + def 
_fail_if_called(*args, **kwargs):
+        nonlocal screening_call_count
+        screening_call_count += 1
+        raise AssertionError("Screening should not be called when already completed")
+
+    monkeypatch.setattr(analysis_stage, "load_prompt", lambda _name: "prompt")
+    monkeypatch.setattr(analysis_stage, "_screen_paper", _fail_if_called)
+    monkeypatch.setattr(
+        analysis_stage,
+        "_analyze_paper",
+        lambda paper, questions, settings, prompt, output_dir, inject_pdfs_dir=None: (
+            AnalysisResult(
+                paper_id=paper.paper_id,
+                summary="summary",
+                key_findings=["finding"],
+                methodology="experiment",
+                relevance_score=90,
+                relevance_rationale="fit",
+            ),
+            paper,
+        ),
+    )
+    # With screening stubbed to fail on invocation and analysis stubbed to a
+    # canned AnalysisResult, the resumed run below must reuse the stored
+    # screening results rather than re-screening any candidate.
+
+    settings = Settings(screening_selection_mode="top_k", screening_top_k=2)
+    updated = run(state, settings)
+
+    assert screening_call_count == 0
+    assert len(updated.analyses) == 2
+    assert updated.screened_papers_completed is True
+
+
+def test_pause_for_pdfs_carries_screening_results(tmp_path, monkeypatch) -> None:
+    """PauseForPDFsError includes screening_results for checkpoint state."""
+    import litresearch.stages.analysis as analysis_stage
+
+    papers = [
+        Paper(
+            paper_id="p1",
+            title="P1",
+            abstract="a",
+            citation_count=10,
+            pdf_status="not_attempted",
+        ),
+    ]
+    state = _make_state(tmp_path, papers=papers)
+    settings = Settings(screening_selection_mode="top_k", screening_top_k=1)
+
+    monkeypatch.setattr(analysis_stage, "load_prompt", lambda _name: "prompt")
+    monkeypatch.setattr(
+        analysis_stage,
+        "_screen_paper",
+        lambda paper, questions, settings, prompt, fb_prompt, pdf_excerpt=None: ScreeningResult(
+            paper_id=paper.paper_id,
+            relevance_score=90,
+            rationale="fit",
+        ),
+    )
+
+    try:
+        run(state, settings, stop_after_screening=True)
+    except PauseForPDFsError as exc:
+        assert len(exc.screening_results) == 1
+        assert exc.screening_results[0].paper_id == "p1"
+    else:
+        raise AssertionError("Expected PauseForPDFsError")
diff --git a/tests/unit/test_models.py b/tests/unit/test_models.py
index fc76bde..0cd1e89 100644
--- a/tests/unit/test_models.py
+++ b/tests/unit/test_models.py
@@ -14,6 +14,7 @@ def test_pipeline_state_save_load_roundtrip(tmp_path: Path) -> None:
         output_dir="output/run-1",
         created_at="2026-03-09T16:00:00Z",
         updated_at="2026-03-09T16:05:00Z",
+        screened_papers_completed=True,
     )
 
     path = tmp_path / "state.json"
@@ -22,6 +23,7 @@
     loaded = PipelineState.load(path)
 
     assert loaded == state
+    assert loaded.screened_papers_completed is True
 
 
 def test_paper_from_s2_normalizes_fields() -> None:
diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py
index 5482e1d..161143c 100644
--- a/tests/unit/test_pipeline.py
+++ b/tests/unit/test_pipeline.py
@@ -2,6 +2,7 @@
 
 from litresearch import pipeline
 from litresearch.config import Settings
+from litresearch.models import PipelineState, RunMetrics, StageMetrics
 
 
 def test_run_pipeline_auto_increments_non_empty_output_dir(tmp_path: Path, monkeypatch) -> None:
@@ -48,3 +49,80 @@
     )
 
     assert state.output_dir == str(base_output)
+
+
+def test_resume_preserves_existing_metrics(tmp_path: Path, monkeypatch) -> None:
+    """When resuming, existing metrics.json is loaded and extended instead of replaced."""
+    monkeypatch.setattr(pipeline, "STAGE_ORDER", [])
+
+    output_dir = tmp_path / "output"
+    output_dir.mkdir()
+    state_path = output_dir / "state.json"
+
+    pre_pause_state = PipelineState(
+        questions=["q"],
+        current_stage="start",
+        output_dir=str(output_dir),
+        created_at="2026-04-20T10:00:00Z",
+        updated_at="2026-04-20T10:05:00Z",
+    )
+    pre_pause_state.save(state_path)
+
+    pre_pause_metrics = RunMetrics(
+        run_id="run-test123",
+        started_at="2026-04-20T10:00:00Z",
+        stages=[
+            StageMetrics(
+                name="query_gen",
+                started_at="2026-04-20T10:00:00Z",
+                completed_at="2026-04-20T10:01:00Z",
+                duration_seconds=60.0,
+            ),
+        ],
+    )
+    (output_dir / "metrics.json").write_text(
+        pre_pause_metrics.model_dump_json(indent=2), encoding="utf-8"
+    )
+
+    pipeline.run_pipeline(
+        questions=[],
+        settings=Settings(output_dir=str(output_dir)),
+        resume_path=state_path,
+    )
+
+    metrics_data = (output_dir / "metrics.json").read_text(encoding="utf-8")
+    metrics = RunMetrics.model_validate_json(metrics_data)
+
+    assert metrics.run_id == "run-test123"
+    assert len(metrics.stages) == 1
+    assert metrics.stages[0].name == "query_gen"
+
+
+def test_resume_creates_new_metrics_when_none_exist(tmp_path: Path, monkeypatch) -> None:
+    """When resuming without existing metrics, a fresh RunMetrics is created."""
+    monkeypatch.setattr(pipeline, "STAGE_ORDER", [])
+
+    output_dir = tmp_path / "output"
+    output_dir.mkdir()
+    state_path = output_dir / "state.json"
+
+    state = PipelineState(
+        questions=["q"],
+        current_stage="start",
+        output_dir=str(output_dir),
+        created_at="2026-04-20T10:00:00Z",
+        updated_at="2026-04-20T10:05:00Z",
+    )
+    state.save(state_path)
+
+    pipeline.run_pipeline(
+        questions=[],
+        settings=Settings(output_dir=str(output_dir)),
+        resume_path=state_path,
+    )
+
+    assert (output_dir / "metrics.json").exists()
+    metrics = RunMetrics.model_validate_json(
+        (output_dir / "metrics.json").read_text(encoding="utf-8")
+    )
+    assert metrics.run_id.startswith("run-")