From a960f4f517d1b9d0e08dd18db7eef2e84b9d7d4e Mon Sep 17 00:00:00 2001 From: Ivan Kuznetsov Date: Tue, 3 Mar 2026 20:17:46 +0100 Subject: [PATCH 1/9] Restructure data_analysis_agent for parallel downloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace single AgentExecutor with 4-step pipeline: filter_node → planner_node → download_node → analysis_node. Downloads (ERA5 + DestinE) now run in parallel via ThreadPoolExecutor instead of sequential LLM tool calls. Analysis agent receives pre-fetched data, freeing full tool budget for REPL/plotting. Add @traceable decorators for LangSmith visibility. --- PULL_REQUEST.md | 39 -- src/climsight/data_analysis_agent.py | 960 ++++++++++++++++++--------- 2 files changed, 628 insertions(+), 371 deletions(-) delete mode 100644 PULL_REQUEST.md diff --git a/PULL_REQUEST.md b/PULL_REQUEST.md deleted file mode 100644 index 4c48479..0000000 --- a/PULL_REQUEST.md +++ /dev/null @@ -1,39 +0,0 @@ -# FIRST ACCEPT PREVIOUS PR ;) - -**This PR is based on PR #197 (analysis modes) and must be merged after it.** - ---- - -## Data Tab: Downloadable Datasets - -Adds a new **Data** tab to the UI where users can download all datasets generated during a session. Also renames "Additional information" → "Figures". - -### What's new - -- **`downloadable_datasets` tracking** — a new field on `AgentState` that accumulates dataset entries (`{label, path, source}`) as they're created throughout the pipeline -- **Climate model CSVs** — tracked after `write_climate_data_manifest()` in `data_agent` -- **ERA5 climatology JSON** — tracked in `prepare_predefined_data()` after extraction -- **ERA5 time series Zarr** — tracked in `data_analysis_agent` after `retrieve_era5_data` tool execution -- **DestinE time series Zarr** — tracked in `data_analysis_agent` after `retrieve_destine_data` tool execution -- **Data tab in UI** — lists all tracked datasets with download buttons; Zarr directories are zipped on the fly, JSON/CSV files download directly -- **Tab rename** — "Additional information" → "Figures" -- **Data tab always visible** — shown regardless of whether figures are available - -### Pipeline fix - -Each agent node now **returns** `downloadable_datasets` in its return dict so LangGraph properly merges state across stages (in-place mutation alone is not enough). - -### Files changed - -| File | Change | -|------|--------| -| `climsight_classes.py` | Add `downloadable_datasets: list = []` to `AgentState` | -| `climsight_engine.py` | Track datasets in `data_agent`, `prepare_predefined_data`, pass through `combine_agent` | -| `data_analysis_agent.py` | Track ERA5/DestinE Zarr outputs from tool intermediate steps | -| `streamlit_interface.py` | Rename tab, add Data tab with download buttons | - -### Works in all modes - -- **fast** — climate model CSVs + ERA5 climatology JSON -- **smart** — above + ERA5 time series Zarr -- **deep** — above + DestinE time series Zarr diff --git a/src/climsight/data_analysis_agent.py b/src/climsight/data_analysis_agent.py index 712bc8b..32a48bf 100644 --- a/src/climsight/data_analysis_agent.py +++ b/src/climsight/data_analysis_agent.py @@ -1,15 +1,18 @@ """ Data analysis agent for Climsight. -This agent mirrors PangaeaGPT's oceanographer/visualization style while operating -on local climatology. It filters context, then uses tools to extract or analyze -climate data, saving outputs into the sandbox. +Restructured as a LangGraph sub-graph with three stages: +1. filter_node — LLM filters context into an analysis brief +2. planner_node — LLM decides which ERA5/DestinE variables to download +3. download_node — ThreadPoolExecutor downloads ALL variables in parallel +4. analysis_node — AgentExecutor with Python REPL for analysis/visualization """ import json import logging import os -from typing import Any, Dict, List +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any, Dict, List, Optional from climsight_classes import AgentState @@ -19,8 +22,13 @@ from .utils import make_json_serializable from sandbox_utils import ensure_thread_id, ensure_sandbox_dirs, get_sandbox_paths from agent_helpers import create_standard_agent_executor -from tools.era5_retrieval_tool import create_era5_retrieval_tool -from tools.destine_retrieval_tool import create_destine_search_tool, create_destine_retrieval_tool +from tools.era5_retrieval_tool import create_era5_retrieval_tool, retrieve_era5_data +from tools.destine_retrieval_tool import ( + create_destine_search_tool, + create_destine_retrieval_tool, + retrieve_destine_data, + _search_destine_parameters, +) from tools.python_repl import CustomPythonREPLTool from tools.image_viewer import create_image_viewer_tool from tools.reflection_tools import reflect_tool @@ -30,6 +38,7 @@ ) from langchain_core.prompts import ChatPromptTemplate +from langsmith import traceable logger = logging.getLogger(__name__) @@ -170,20 +179,112 @@ def _build_filter_prompt() -> str: ) -def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon: float = None, - has_climate_data: bool = False, has_hazard_data: bool = False, - has_population_data: bool = False, has_era5_data: bool = False, - mode_config: dict = None) -> str: - """System prompt for tool-driven analysis - dynamically built based on config and mode. +# ========================================================================== +# Planner prompt — tells the LLM what downloads are available and asks it +# to output a structured JSON plan +# ========================================================================== + +def _create_planner_prompt( + has_era5_download: bool, + has_destine: bool, + lat: float = None, + lon: float = None, +) -> str: + """Build the system prompt for the download planner LLM.""" + sections = [ + "You are a data download planner for ClimSight's climate analysis agent.\n" + "Given an analysis brief, decide which ERA5 and/or DestinE variables to download.\n" + "You MUST output ONLY valid JSON — no commentary, no markdown fences.\n\n" + ] - NOTE: This agent only runs when python_REPL is enabled (use_powerful_data_analysis=True). - ERA5 climatology and predefined plots are already generated by prepare_predefined_data. - """ + if has_era5_download: + sections.append( + "## ERA5 Time Series (2015-2024, year-by-year)\n" + "Available variables:\n" + "- `t2` — 2m air temperature (K → °C)\n" + "- `cp` — convective precipitation (m)\n" + "- `lsp` — large-scale precipitation (m)\n" + " NOTE: `tp` does NOT exist. For total precipitation, download BOTH `cp` and `lsp`.\n" + "- `u10` — 10m U-wind component (m/s)\n" + "- `v10` — 10m V-wind component (m/s)\n" + "- `mslp` — mean sea level pressure (Pa)\n" + "- `sst` — sea surface temperature (only for coastal/ocean points)\n" + "- `sp` — surface pressure (Pa)\n" + "- `tcc` — total cloud cover (0-1)\n" + "- `sd` — snow depth (m)\n" + "- `skt` — skin temperature (K)\n" + "- `d2` — 2m dewpoint temperature (K)\n\n" + "Default date range: 2015-01-01 to 2024-12-31 (10 years).\n" + "Only download variables needed for the analysis — don't download everything.\n" + "Common patterns:\n" + "- Temperature analysis → t2\n" + "- Precipitation analysis → cp + lsp (always both)\n" + "- Wind analysis → u10 + v10\n" + "- General climate overview → t2, cp, lsp\n\n" + ) + + if has_destine: + sections.append( + "## DestinE Climate Projections (SSP3-7.0, 2020-2039)\n" + "High-resolution IFS-NEMO model. 82 parameters available.\n" + "You specify search queries (natural language), and the system will find the right param_ids.\n" + "Default date range: 20200101-20391231 (full 20 years).\n" + "Common searches:\n" + "- 'temperature at 2 meters'\n" + "- 'total precipitation'\n" + "- 'wind speed at 10 meters' (returns u10 and v10 separately)\n" + "- 'sea surface temperature'\n\n" + ) + + coord_text = "" + if lat is not None and lon is not None: + coord_text = f"Coordinates: lat={lat}, lon={lon}\n" + + sections.append( + "## Output Format\n" + f"{coord_text}" + "Return a JSON object with this structure:\n" + "{{\n" + ' "era5_downloads": [\n' + ' {{"variable_id": "t2", "start_date": "2015-01-01", "end_date": "2024-12-31"}}\n' + " ],\n" + ' "destine_searches": ["temperature at 2 meters", "total precipitation"],\n' + ' "destine_date_range": {{"start": "20200101", "end": "20391231"}},\n' + ' "reasoning": "Brief explanation of why these variables are needed"\n' + "}}\n\n" + "Rules:\n" + "- If ERA5 download is not available, set era5_downloads to []\n" + "- If DestinE is not available, set destine_searches to []\n" + "- For precipitation, ALWAYS include BOTH cp and lsp (never just one)\n" + "- For wind, include BOTH u10 and v10\n" + "- If the analysis only needs climatology comparison (no trends), set all to [] — " + "existing data is sufficient\n" + ) + + return "".join(sections) + + +# ========================================================================== +# Analysis prompt — same as the old _create_tool_prompt but WITHOUT download +# tool sections. Download results are injected as pre-available data. +# ========================================================================== + +def _create_analysis_prompt( + datasets_text: str, + config: dict, + lat: float = None, + lon: float = None, + has_climate_data: bool = False, + has_hazard_data: bool = False, + has_population_data: bool = False, + has_era5_data: bool = False, + mode_config: dict = None, + downloaded_data_text: str = "", +) -> str: + """System prompt for the analysis agent — no download tools, all data pre-fetched.""" if mode_config is None: mode_config = resolve_analysis_config(config) - has_era5_download = mode_config.get("use_era5_data", config.get("use_era5_data", False)) - has_destine = mode_config.get("use_destine_data", config.get("use_destine_data", False)) has_python_repl = mode_config.get("use_powerful_data_analysis", True) hard_limit = mode_config.get("hard_tool_limit", 30) @@ -191,7 +292,10 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon max_per_resp = mode_config.get("max_per_response", 4) max_reflect = mode_config.get("max_reflect", 2) - # --- Build prompt without f-strings for code blocks to avoid brace escaping --- + # Reduce budget since downloads are already done + # Estimate: downloads would have used ~3-6 tool calls + adjusted_limit = max(hard_limit - 6, 10) + sections = [] # ── ROLE ────────────────────────────────────────────────────────────── @@ -201,9 +305,12 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon ] if has_python_repl: role_lines.append( - "You have a persistent Python REPL, pre-extracted data files, and optional ERA5 download access.\n\n" + "You have a persistent Python REPL, pre-extracted data files, and pre-downloaded " + "ERA5/DestinE time series (if any).\n" + "ALL data downloads have already been completed — you do NOT have download tools.\n" + "Focus entirely on analysis and visualization.\n\n" "CRITICAL EFFICIENCY RULES:\n" - f"- HARD LIMIT: {hard_limit} tool calls total for the entire session.\n" + f"- HARD LIMIT: {adjusted_limit} tool calls total for the entire session.\n" f"- MAX {max_per_resp} tool calls per response. Never fire {max_per_resp + 1}+ tools in a single response.\n" "- Write focused Python scripts — each one should accomplish a meaningful chunk of work.\n" "- SEQUENTIAL ordering: first Python_REPL to generate plots, THEN reflect_on_image in a LATER response.\n" @@ -214,7 +321,7 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon role_lines.append( "You have pre-extracted data files and predefined plots.\n\n" "CRITICAL EFFICIENCY RULES:\n" - f"- HARD LIMIT: {hard_limit} tool calls total for the entire session.\n" + f"- HARD LIMIT: {adjusted_limit} tool calls total for the entire session.\n" f"- MAX {max_per_resp} tool calls per response.\n" f"- Ideal session: {ideal_calls} tool calls total.\n" ) @@ -241,6 +348,13 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon "- Columns: Month, mean2t (°C), cp (convective precip, m), lsp (large-scale precip, m), wind_u, wind_v, wind_speed, wind_direction\n" ) + # ── 1b. DOWNLOADED TIME SERIES (from planner/download stage) ────────── + if downloaded_data_text: + sections.append( + "### Downloaded Time Series (pre-fetched by download planner)\n" + + downloaded_data_text + "\n" + ) + # ── 2. PRE-GENERATED PLOTS ──────────────────────────────────────────── predefined_plots = [] if has_climate_data: @@ -260,76 +374,9 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon else: sections.append("## 2. PRE-GENERATED PLOTS\n\nNone yet — you may create any visualizations needed.\n") - # ── 3. ERA5 TIME SERIES (optional download) ────────────────────────── - if has_era5_download: - sections.append( - "## 3. ERA5 TIME SERIES DOWNLOAD (year-by-year data)\n\n" - "Use `retrieve_era5_data` to download full annual time series (2015-2024) from Earthmover.\n" - "This gives you YEAR-BY-YEAR values — far richer than the 10-year climatology average above.\n\n" - "When to download:\n" - "- You need to detect warming/drying TRENDS → download t2, cp, and lsp (sum cp+lsp for total precip)\n" - "- You need interannual variability or extreme-year identification → download the relevant variables\n" - "- You only need monthly climatology for comparison → skip, use era5_climatology.json\n\n" - "**PARALLEL DOWNLOADS**: Call `retrieve_era5_data` for ALL needed variables in a SINGLE response.\n" - "Example: if you need t2, cp, and lsp — call all three retrieve_era5_data in ONE response, not sequentially.\n\n" - "Tool parameters:\n" - "- Variable codes: `t2` (temperature), `cp` (convective precip), `lsp` (large-scale precip), `u10`/`v10` (wind), `mslp` (pressure)\n" - "- NOTE: `tp` (total precipitation) is NOT available. Use `cp` + `lsp` and sum them for total precipitation.\n" - "- Always pass `work_dir='.'`\n" - "- Output: Zarr store saved to `era5_data/` folder\n\n" - "Loading ERA5 Zarr in Python_REPL:\n" - ) - # Code example WITHOUT f-string to avoid brace escaping - sections.append( - "```python\n" - "import xarray as xr, glob\n" - "era5_files = glob.glob('era5_data/*.zarr')\n" - "print(era5_files)\n" - "ds = xr.open_dataset(era5_files[0], engine='zarr', chunks={{}})\n" - "data = ds['t2'].to_series()\n" - "```\n" - ) - - # ── 3b. DESTINE CLIMATE PROJECTIONS ───────────────────────────────── - if has_destine: - sections.append( - "## 3b. DESTINE CLIMATE PROJECTIONS (SSP3-7.0, 82 parameters)\n\n" - "You have access to the DestinE Climate DT — high-resolution projections (IFS-NEMO, 2020-2039).\n" - "Use a TWO-STEP workflow:\n\n" - "**Step 1: Search for ALL needed parameters FIRST**\n" - "Before downloading anything, decide which variables you need (temperature, precipitation, wind, etc.).\n" - "Call `search_destine_parameters` for each query to collect all param_ids and levtypes.\n" - "Example: search_destine_parameters('temperature at 2 meters') → param_id='167', levtype='sfc'\n\n" - "**Step 2: Download ALL variables IN PARALLEL**\n" - "Once you have all param_ids, call `retrieve_destine_data` for ALL of them in a SINGLE response.\n" - "The tools support parallel execution — multiple retrieve calls in one response run concurrently.\n" - "This is MUCH faster than downloading one variable at a time sequentially.\n\n" - "Example: if you need temperature (167) and precipitation (228), call BOTH retrieve_destine_data\n" - "in the SAME response, not one after the other in separate responses.\n\n" - "- Dates: YYYYMMDD format, range 20200101-20391231\n" - "- **By default request the FULL period**: start_date=20200101, end_date=20391231 (20 years of projections)\n" - "- Only use a shorter range if the user explicitly asks for a specific period\n" - "- Output: Zarr store saved to `destine_data/` folder\n\n" - "Loading DestinE data in Python_REPL:\n" - ) - sections.append( - "```python\n" - "import xarray as xr, glob\n" - "destine_files = glob.glob('destine_data/*.zarr')\n" - "print(destine_files)\n" - "ds = xr.open_dataset(destine_files[0], engine='zarr', chunks={{}})\n" - "print(ds)\n" - "```\n" - ) - - # ── 4. TOOLS ────────────────────────────────────────────────────────── - sections.append("## 4. AVAILABLE TOOLS\n") + # ── 3. AVAILABLE TOOLS ──────────────────────────────────────────────── + sections.append("## 3. AVAILABLE TOOLS\n") tools_list = [] - if has_era5_download: - tools_list.append("- **retrieve_era5_data** — download ERA5 year-by-year time series (see section 3)") - if has_destine: - tools_list.append("- **search_destine_parameters** — find DestinE parameters via RAG search (see section 3b)") - tools_list.append("- **retrieve_destine_data** — download DestinE time series (see section 3b)") if has_python_repl: tools_list.append( "- **Python_REPL** — execute Python code in a sandboxed environment.\n" @@ -357,9 +404,9 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon tools_list.append("- **wise_agent** — ask for visualization strategy advice before coding") sections.append("\n".join(tools_list) + "\n") - # ── 5. WORKFLOW ─────────────────────────────────────────────────────── + # ── 4. WORKFLOW ─────────────────────────────────────────────────────── step = 1 - sections.append("## 5. REQUIRED WORKFLOW\n") + sections.append("## 4. REQUIRED WORKFLOW\n") sections.append( f"**Step {step} — Explore and load data:**\n" @@ -409,13 +456,20 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon ) step += 1 - if has_era5_download: + if downloaded_data_text: + sections.append( + f"**Step {step} — Load downloaded time series:**\n" + "ERA5/DestinE data was pre-downloaded by the planner. Load the Zarr files:\n\n" + ) sections.append( - f"**Step {step} — (Optional) Download ERA5/DestinE data — ALL IN PARALLEL:**\n" - "Decide which variables you need, then call ALL download tools in a SINGLE response.\n" - "ERA5: call `retrieve_era5_data` for t2, cp, lsp simultaneously.\n" - "DestinE: call `search_destine_parameters` first, then call `retrieve_destine_data` for all param_ids in one response.\n" - "Load the resulting Zarr files in Python_REPL (see sections 3/3b for loading patterns).\n" + "```python\n" + "import xarray as xr, glob\n" + "# ERA5 time series\n" + "era5_files = glob.glob('era5_data/*.zarr')\n" + "for f in era5_files:\n" + " ds = xr.open_dataset(f, engine='zarr', chunks={{}})\n" + " print(f, list(ds.data_vars))\n" + "```\n\n" ) step += 1 @@ -449,9 +503,9 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon "- Do NOT give up or skip re-plotting — the fixes are usually simple (font sizes, legend position)\n" ) - # ── 6. PLOTTING CONVENTIONS ─────────────────────────────────────────── + # ── 5. PLOTTING CONVENTIONS ─────────────────────────────────────────── sections.append( - "\n## 6. PLOTTING CONVENTIONS\n\n" + "\n## 5. PLOTTING CONVENTIONS\n\n" "Every plot you create MUST follow these rules:\n" "- `plt.figure(figsize=(12, 6))` — wide-format for readability\n" "- `plt.savefig('results/filename.png', dpi=150, bbox_inches='tight')`\n" @@ -465,9 +519,9 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon "- Use `plt.tight_layout()` or `bbox_inches='tight'` to prevent label clipping\n" ) - # ── 7. ERROR RECOVERY ───────────────────────────────────────────────── + # ── 6. ERROR RECOVERY ───────────────────────────────────────────────── sections.append( - "\n## 7. ERROR RECOVERY\n\n" + "\n## 6. ERROR RECOVERY\n\n" "MOST COMMON ERRORS (fix these FIRST):\n" "- `ValueError: invalid literal for int()` on Month → Month column has string names like 'January'.\n" " FIX: Use month_map dict to convert (see Step 1 code example).\n" @@ -483,14 +537,14 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon "- Empty DataFrame? → Print `df.head()` and `df.columns.tolist()` to inspect structure.\n" ) - # ── 8. SANDBOX PATHS ────────────────────────────────────────────────── + # ── 7. SANDBOX PATHS ────────────────────────────────────────────────── sections.append( - f"\n## 8. SANDBOX PATHS AND DATA\n\n{datasets_text}\n" + f"\n## 7. SANDBOX PATHS AND DATA\n\n{datasets_text}\n" ) - # ── 9. PROACTIVE ANALYSIS ───────────────────────────────────────────── + # ── 8. PROACTIVE ANALYSIS ───────────────────────────────────────────── sections.append( - "\n## 9. PROACTIVE ANALYSIS\n\n" + "\n## 8. PROACTIVE ANALYSIS\n\n" "Even if the user's query is vague, you SHOULD proactively:\n" "- Create a temperature trend visualization (all decades + ERA5 baseline)\n" "- Create a precipitation comparison chart\n" @@ -498,9 +552,9 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon "- Identify potential climate risks relevant to the query\n" ) - # ── 10. OUTPUT FORMAT ───────────────────────────────────────────────── + # ── 9. OUTPUT FORMAT ───────────────────────────────────────────────── sections.append( - "\n## 10. OUTPUT FORMAT\n\n" + "\n## 9. OUTPUT FORMAT\n\n" "Your final response MUST include:\n" ) if has_era5_data: @@ -523,16 +577,11 @@ def _create_tool_prompt(datasets_text: str, config: dict, lat: float = None, lon # ── TOOL BUDGET ─────────────────────────────────────────────────────── budget_lines = [ - f"\n## TOOL BUDGET (HARD LIMIT: {hard_limit} tool calls total, max {max_per_resp} per response)\n\n" - f"Plan your session carefully — you have at most {hard_limit} tool calls:\n" + f"\n## TOOL BUDGET (HARD LIMIT: {adjusted_limit} tool calls total, max {max_per_resp} per response)\n\n" + f"Plan your session carefully — you have at most {adjusted_limit} tool calls:\n" ] if has_python_repl: budget_lines.append("- Python_REPL: a few calls, each focused on ONE logical task\n") - if has_era5_download: - budget_lines.append("- retrieve_era5_data: 0-3 calls (one per variable: t2, cp, lsp)\n") - if has_destine: - budget_lines.append("- search_destine_parameters: 1-2 calls (find param_ids before downloading)\n") - budget_lines.append("- retrieve_destine_data: 0-3 calls (use full 2020-2039 range by default)\n") if max_reflect > 0: budget_lines.append(f"- reflect_on_image: one call per plot, max {max_reflect} total — QA ALL generated plots\n") budget_lines.append("- list_plotting_data_files / image_viewer: 0-2 calls\n") @@ -576,6 +625,79 @@ def _normalize_tool_observation(observation: Any) -> Any: return observation +# ========================================================================== +# Helper: process download results +# ========================================================================== + +def _process_download_results(download_results: list) -> dict: + """Process download results into references, downloadable_datasets, tool responses, and prompt text.""" + references = [] + downloadable_datasets = [] + era5_tool_response = "" + destine_tool_response = "" + era5_results = [] + destine_results = [] + downloaded_data_lines = [] + + for entry in download_results: + source = entry.get("source", "") + result = entry.get("result", {}) + if not isinstance(result, dict): + continue + + if result.get("reference"): + if result["reference"] not in references: + references.append(result["reference"]) + + zarr_path = result.get("output_path_zarr", "") + variable = result.get("variable", entry.get("params", {}).get("variable_id", "unknown")) + + if source == "era5": + era5_tool_response = str(result) + era5_results.append(result) + if zarr_path: + downloadable_datasets.append({ + "label": f"ERA5 Time Series: {variable}", + "path": zarr_path, + "source": "ERA5", + }) + downloaded_data_lines.append( + f"- ERA5 `{variable}`: `{zarr_path}` — load with " + f"`xr.open_dataset('{zarr_path}', engine='zarr', chunks={{{{}}}})`" + ) + + elif source == "destine": + destine_tool_response = str(result) + destine_results.append(result) + param_id = result.get("param_id", variable) + if zarr_path: + downloadable_datasets.append({ + "label": f"DestinE Time Series: {param_id}", + "path": zarr_path, + "source": "DestinE", + }) + downloaded_data_lines.append( + f"- DestinE param `{param_id}`: `{zarr_path}` — load with " + f"`xr.open_dataset('{zarr_path}', engine='zarr', chunks={{{{}}}})`" + ) + + downloaded_data_text = "\n".join(downloaded_data_lines) if downloaded_data_lines else "" + + return { + "references": references, + "downloadable_datasets": downloadable_datasets, + "era5_tool_response": era5_tool_response, + "destine_tool_response": destine_tool_response, + "era5_results": era5_results, + "destine_results": destine_results, + "downloaded_data_text": downloaded_data_text, + } + + +# ========================================================================== +# Main entry point — builds and runs the sub-graph +# ========================================================================== + def data_analysis_agent( state: AgentState, config: dict, @@ -584,7 +706,11 @@ def data_analysis_agent( stream_handler, llm_dataanalysis_agent=None, ): - """Run filtered analysis + tool-based climatology extraction.""" + """Run the data analysis sub-graph: filter → plan → download → analyze. + + Node functions are defined as closures to capture infrastructure objects + (LLM, stream_handler, config, etc.) without passing them through LangGraph state. + """ stream_handler.update_progress("Data analysis: preparing sandbox...") # Ensure sandbox paths are available. @@ -622,68 +748,7 @@ def data_analysis_agent( analysis_context = "\n\n".join(context_sections) - # Check if filter step is enabled (configurable) - use_filter_step = config.get("llm_dataanalysis", {}).get("use_filter_step", True) - - if use_filter_step and llm_dataanalysis_agent is not None: - stream_handler.update_progress("Data analysis: filtering context...") - filter_prompt = ChatPromptTemplate.from_messages( - [ - ("system", _build_filter_prompt()), - ("user", "{context}"), - ] - ) - result = llm_dataanalysis_agent.invoke(filter_prompt.format_messages(context=analysis_context)) - filtered_context = result.content if hasattr(result, "content") else str(result) - - # CRITICAL: Always preserve the user's original question - location_str = state.input_params.get('location_str', 'Unknown location') - analysis_brief = f"""USER QUESTION: {state.user} - -Location: {location_str} -Coordinates: {state.input_params.get('lat', '')}, {state.input_params.get('lon', '')} - -ANALYSIS REQUIREMENTS: -{filtered_context} -""" - else: - # Skip filter step - pass essential context directly - stream_handler.update_progress("Data analysis: preparing context (no filter)...") - location_str = state.input_params.get('location_str', 'Unknown location') - analysis_brief = f"""USER QUESTION: {state.user} - -Location: {location_str} -Coordinates: {state.input_params.get('lat', '')}, {state.input_params.get('lon', '')} - -Available climatology: -{climate_summary} - -Required analysis: -- Extract Temperature and Precipitation data -- Compare historical vs future projections -- Create visualizations if Python_REPL is available -""" - - state.data_analysis_prompt_text = analysis_brief - - brief_path = os.path.join(state.uuid_main_dir, "analysis_brief.txt") - with open(brief_path, "w", encoding="utf-8") as f: - f.write(analysis_brief) - - # Build simplified datasets_text for prompt - datasets_text = _build_datasets_text(state) - - # Build datasets dict for Python REPL - datasets = { - "uuid_main_dir": state.uuid_main_dir, - "results_dir": state.results_dir, - } - if state.climate_data_dir: - datasets["climate_data_dir"] = state.climate_data_dir - if state.era5_data_dir: - datasets["era5_data_dir"] = state.era5_data_dir - - # Get coordinates for prompt + # Get coordinates lat = state.input_params.get('lat') lon = state.input_params.get('lon') try: @@ -692,185 +757,416 @@ def data_analysis_agent( except (ValueError, TypeError): lat, lon = None, None - # Resolve effective config from analysis mode + overrides + # Resolve effective config effective = resolve_analysis_config(config) analysis_mode = config.get("analysis_mode", "smart") logger.info(f"data_analysis_agent: mode={analysis_mode}, effective={effective}") - # Tool setup for data_analysis_agent - # ERA5 climatology and predefined plots are already generated by prepare_predefined_data - tools = [] + # Check if filter step is enabled + use_filter_step = config.get("llm_dataanalysis", {}).get("use_filter_step", True) + + # Build datasets text + datasets_text = _build_datasets_text(state) - has_era5_data = config.get("era5_climatology", {}).get("enabled", True) - has_python_repl = effective.get("use_powerful_data_analysis", True) + # Initialize LLM + if llm_dataanalysis_agent is None: + from langchain_openai import ChatOpenAI + llm_dataanalysis_agent = ChatOpenAI( + openai_api_key=api_key, + model_name=config.get("llm_combine", {}).get("model_name", "gpt-4.1-nano"), + ) - # NOTE: ERA5 climatology and predefined plots are ALREADY generated by prepare_predefined_data - # The agent should use the pre-extracted data, not re-extract it logger.info(f"data_analysis_agent starting - predefined plots: {state.predefined_plots}") logger.info(f"ERA5 climatology available: {bool(state.era5_climatology_response)}") - # 3. ERA5 time series retrieval (if enabled) - if effective.get("use_era5_data", False): - arraylake_api_key = config.get("arraylake_api_key", "") - if arraylake_api_key: - tools.append(create_era5_retrieval_tool(arraylake_api_key)) + # ================================================================== + # Node functions as closures — capture llm, stream_handler, config, + # effective, api_key so they don't need to be in graph state. + # ================================================================== + + @traceable(name="filter_node") + def filter_node(gstate: dict) -> dict: + """Node 1: Filter context into an analysis brief via LLM.""" + if use_filter_step and llm_dataanalysis_agent is not None: + stream_handler.update_progress("Data analysis: filtering context...") + filter_prompt = ChatPromptTemplate.from_messages( + [ + ("system", _build_filter_prompt()), + ("user", "{context}"), + ] + ) + result = llm_dataanalysis_agent.invoke( + filter_prompt.format_messages(context=gstate["analysis_context"]) + ) + filtered_context = result.content if hasattr(result, "content") else str(result) + + analysis_brief = ( + f"USER QUESTION: {gstate['user_query']}\n\n" + f"Location: {gstate['location_str']}\n" + f"Coordinates: {gstate['lat']}, {gstate['lon']}\n\n" + f"ANALYSIS REQUIREMENTS:\n{filtered_context}\n" + ) else: - logger.warning("ERA5 data enabled but no arraylake_api_key in config. ERA5 retrieval tool not added.") + stream_handler.update_progress("Data analysis: preparing context (no filter)...") + analysis_brief = ( + f"USER QUESTION: {gstate['user_query']}\n\n" + f"Location: {gstate['location_str']}\n" + f"Coordinates: {gstate['lat']}, {gstate['lon']}\n\n" + f"Available climatology:\n{gstate.get('climate_summary', '')}\n\n" + "Required analysis:\n" + "- Extract Temperature and Precipitation data\n" + "- Compare historical vs future projections\n" + "- Create visualizations if Python_REPL is available\n" + ) + + return {"analysis_brief": analysis_brief} + + @traceable(name="planner_node") + def planner_node(gstate: dict) -> dict: + """Node 2: LLM decides which ERA5/DestinE variables to download.""" + has_era5_download = effective.get("use_era5_data", False) + has_destine = effective.get("use_destine_data", False) + + # If no download sources are available, skip planning + if not has_era5_download and not has_destine: + logger.info("planner_node: no download sources available, skipping") + return {"download_plan": {"era5": [], "destine": []}} + + g_lat = gstate["lat"] + g_lon = gstate["lon"] + analysis_brief = gstate["analysis_brief"] + + stream_handler.update_progress("Data analysis: planning downloads...") + + planner_prompt = ChatPromptTemplate.from_messages( + [ + ("system", _create_planner_prompt(has_era5_download, has_destine, g_lat, g_lon)), + ("user", "{brief}"), + ] + ) - # 3b. DestinE parameter search + data retrieval (if enabled) - if effective.get("use_destine_data", False): - tools.append(create_destine_search_tool(config)) - tools.append(create_destine_retrieval_tool()) + result = llm_dataanalysis_agent.invoke( + planner_prompt.format_messages(brief=analysis_brief) + ) + raw_text = result.content if hasattr(result, "content") else str(result) - # 4. Python REPL for analysis/visualization (if enabled) - if has_python_repl: - repl_tool = CustomPythonREPLTool( - datasets=datasets, - results_dir=state.results_dir, - session_key=thread_id, + # Parse the JSON output from the planner + try: + cleaned = raw_text.strip() + if cleaned.startswith("```"): + lines = cleaned.split("\n") + cleaned = "\n".join(lines[1:-1]) + plan = json.loads(cleaned) + except json.JSONDecodeError: + logger.warning(f"planner_node: failed to parse LLM output as JSON: {raw_text[:200]}") + plan = {"era5_downloads": [], "destine_searches": [], + "reasoning": "Parse error — skipping downloads"} + + logger.info(f"planner_node plan: {plan}") + + # Build the download plan + download_plan = {"era5": [], "destine": []} + + # ERA5 downloads + if has_era5_download: + arraylake_api_key = config.get("arraylake_api_key", "") + for item in plan.get("era5_downloads", []): + download_plan["era5"].append({ + "variable_id": item["variable_id"], + "start_date": item.get("start_date", "2015-01-01"), + "end_date": item.get("end_date", "2024-12-31"), + "min_latitude": g_lat if g_lat is not None else -90.0, + "max_latitude": g_lat if g_lat is not None else 90.0, + "min_longitude": g_lon if g_lon is not None else 0.0, + "max_longitude": g_lon if g_lon is not None else 359.75, + "work_dir": ".", + "arraylake_api_key": arraylake_api_key, + }) + + # DestinE: resolve search queries to param_ids, then build download tasks + if has_destine: + destine_settings = config.get("destine_settings", {}) + chroma_db_path = destine_settings.get("chroma_db_path", "data/destine/chroma_db") + collection_name = destine_settings.get( + "collection_name", "climate_parameters_with_usage_notes" + ) + dest_api_key = (config.get("openai_api_key", "") + or os.environ.get("OPENAI_API_KEY", "")) + + destine_date_range = plan.get( + "destine_date_range", {"start": "20200101", "end": "20391231"} + ) + + for query in plan.get("destine_searches", []): + stream_handler.update_progress( + f"Data analysis: searching DestinE for '{query}'..." + ) + search_result = _search_destine_parameters( + query=query, + k=3, + chroma_db_path=chroma_db_path, + collection_name=collection_name, + openai_api_key=dest_api_key, + ) + if search_result.get("success") and search_result.get("candidates"): + top = search_result["candidates"][0] + download_plan["destine"].append({ + "param_id": top["param_id"], + "levtype": top["levtype"], + "start_date": destine_date_range.get("start", "20200101"), + "end_date": destine_date_range.get("end", "20391231"), + "latitude": g_lat, + "longitude": g_lon, + "work_dir": ".", + }) + logger.info( + f"DestinE search '{query}' → param_id={top['param_id']} " + f"({top.get('name', '')})" + ) + else: + logger.warning(f"DestinE search failed for '{query}': {search_result}") + + total_downloads = len(download_plan["era5"]) + len(download_plan["destine"]) + logger.info(f"planner_node: {total_downloads} downloads planned " + f"(ERA5: {len(download_plan['era5'])}, DestinE: {len(download_plan['destine'])})") + + return {"download_plan": download_plan} + + @traceable(name="download_node") + def download_node(gstate: dict) -> dict: + """Node 3: Execute ALL downloads in parallel using ThreadPoolExecutor.""" + plan = gstate["download_plan"] + + tasks = [] + for item in plan.get("era5", []): + tasks.append(("era5", item)) + for item in plan.get("destine", []): + tasks.append(("destine", item)) + + if not tasks: + return {"download_results": []} + + total = len(tasks) + stream_handler.update_progress( + f"Data analysis: downloading {total} datasets in parallel..." + ) + logger.info(f"download_node: starting {total} parallel downloads") + + results = [] + max_workers = min(total, 6) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = {} + for source, params in tasks: + if source == "era5": + fut = executor.submit(retrieve_era5_data, **params) + else: + fut = executor.submit(retrieve_destine_data, **params) + futures[fut] = (source, params) + + for fut in as_completed(futures): + source, params = futures[fut] + try: + dl_result = fut.result() + results.append({"source": source, "params": params, "result": dl_result}) + var_label = params.get("variable_id", params.get("param_id", "?")) + if dl_result.get("success"): + logger.info(f"download_node: {source} {var_label} succeeded") + else: + logger.warning( + f"download_node: {source} {var_label} returned error: " + f"{dl_result.get('error', '')}" + ) + except Exception as e: + logger.error(f"download_node: {source} download failed: {e}") + results.append({"source": source, "params": params, "error": str(e)}) + + stream_handler.update_progress(f"Data analysis: {len(results)} downloads completed.") + return {"download_results": results} + + @traceable(name="analysis_node") + def analysis_node(gstate: dict) -> dict: + """Node 4: Run the analysis AgentExecutor (Python REPL, image_viewer, reflect).""" + analysis_brief = gstate["analysis_brief"] + downloaded_data_text = gstate.get("downloaded_data_text", "") + + has_python_repl = effective.get("use_powerful_data_analysis", True) + max_reflect = effective.get("max_reflect", 2) + + # Build tool list (NO download tools) + tools = [] + + if has_python_repl: + datasets = { + "uuid_main_dir": gstate["uuid_main_dir"], + "results_dir": gstate["results_dir"], + } + if gstate.get("climate_data_dir"): + datasets["climate_data_dir"] = gstate["climate_data_dir"] + if gstate.get("era5_data_dir"): + datasets["era5_data_dir"] = gstate["era5_data_dir"] + + repl_tool = CustomPythonREPLTool( + datasets=datasets, + results_dir=gstate["results_dir"], + session_key=gstate["thread_id"], + ) + tools.append(repl_tool) + + tools.append(list_plotting_data_files_tool) + + vision_model = config.get("llm_combine", {}).get("model_name", "gpt-4o") + image_viewer_tool = create_image_viewer_tool( + openai_api_key=api_key, + model_name=vision_model, + sandbox_path=gstate["results_dir"], + ) + tools.append(image_viewer_tool) + + if has_python_repl: + if max_reflect > 0: + tools.append(reflect_tool) + tools.append(wise_agent_tool) + + stream_handler.update_progress("Data analysis: running analysis agent...") + + tool_prompt = _create_analysis_prompt( + gstate["datasets_text"], + config, + lat=gstate["lat"], + lon=gstate["lon"], + has_climate_data=gstate.get("has_climate_data", False), + has_hazard_data=gstate.get("has_hazard_data", False), + has_population_data=gstate.get("has_population_data", False), + has_era5_data=gstate.get("has_era5_data", False), + mode_config=effective, + downloaded_data_text=downloaded_data_text, ) - tools.append(repl_tool) - # 5. Helper tools - tools.append(list_plotting_data_files_tool) + max_iterations = effective.get("max_iterations", 20) + agent_executor = create_standard_agent_executor( + llm_dataanalysis_agent, + tools, + tool_prompt, + max_iterations=max_iterations, + ) - # 6. Image viewer - ALWAYS available (plots are saved to results/ folder) - vision_model = config.get("llm_combine", {}).get("model_name", "gpt-4o") - image_viewer_tool = create_image_viewer_tool( - openai_api_key=api_key, - model_name=vision_model, - sandbox_path=state.results_dir - ) - tools.append(image_viewer_tool) + agent_input = { + "input": analysis_brief or gstate["user_query"], + "messages": gstate.get("messages", []), + } + + result = agent_executor(agent_input) + + # Process intermediate steps for plot images + plot_images: List[str] = [] + for action, observation in result.get("intermediate_steps", []): + if action.tool in ("Python_REPL", "python_repl"): + obs = _normalize_tool_observation(observation) + if isinstance(obs, dict): + plot_images.extend(obs.get("plot_images", [])) + + analysis_text = result.get("output", "") + + return { + "analysis_text": analysis_text, + "plot_images": plot_images, + } + + # ================================================================== + # Run the pipeline: filter → plan → download (parallel) → analyze + # Sequential calls — each step is a closure with access to outer scope. + # Parallelism happens INSIDE download_node via ThreadPoolExecutor. + # ================================================================== + + # Shared mutable state dict for the pipeline + gstate = { + # Context for filter_node + "analysis_context": analysis_context, + "climate_summary": climate_summary, + "user_query": state.user, + "location_str": state.input_params.get("location_str", ""), + "lat": lat, + "lon": lon, + + # Sandbox paths + "uuid_main_dir": state.uuid_main_dir, + "results_dir": state.results_dir, + "climate_data_dir": state.climate_data_dir, + "era5_data_dir": state.era5_data_dir, + "destine_data_dir": state.destine_data_dir, + "thread_id": thread_id, + + # Data availability flags for analysis prompt + "has_climate_data": bool(state.df_list), + "has_hazard_data": state.hazard_data is not None, + "has_population_data": bool(state.population_config), + "has_era5_data": bool(state.era5_climatology_response), + + # For analysis_node tools + "datasets_text": datasets_text, + "messages": state.messages, - # 7. Image reflection and wise_agent - ONLY when Python REPL is enabled - if has_python_repl: - tools.append(reflect_tool) - tools.append(wise_agent_tool) - - stream_handler.update_progress("Data analysis: running tools...") - tool_prompt = _create_tool_prompt( - datasets_text, config, lat=lat, lon=lon, - has_climate_data=bool(state.df_list), - has_hazard_data=state.hazard_data is not None, - has_population_data=bool(state.population_config), - has_era5_data=bool(state.era5_climatology_response), - mode_config=effective, - ) + # Will be populated by nodes + "analysis_brief": "", + "download_plan": {"era5": [], "destine": []}, + "download_results": [], + "downloaded_data_text": "", + "analysis_text": "", + "plot_images": [], + } - if llm_dataanalysis_agent is None: - from langchain_openai import ChatOpenAI + # Step 1: Filter context + gstate.update(filter_node(gstate)) - llm_dataanalysis_agent = ChatOpenAI( - openai_api_key=api_key, - model_name=config.get("llm_combine", {}).get("model_name", "gpt-4.1-nano"), - ) + # Step 2: Plan downloads + gstate.update(planner_node(gstate)) - max_iterations = effective.get("max_iterations", 20) - agent_executor = create_standard_agent_executor( - llm_dataanalysis_agent, - tools, - tool_prompt, - max_iterations=max_iterations, - ) + # Step 3: Download (parallel) — only if there's something to download + if gstate["download_plan"].get("era5") or gstate["download_plan"].get("destine"): + gstate.update(download_node(gstate)) - agent_input = { - "input": analysis_brief or state.user, - "messages": state.messages, - } + # Step 3b: Process download results into prompt text + download_processed = _process_download_results(gstate.get("download_results", [])) + gstate["downloaded_data_text"] = download_processed["downloaded_data_text"] - result = agent_executor(agent_input) - - data_components_outputs = [] - plot_images: List[str] = [] - era5_climatology_output = None - agent_references: List[str] = [] # Collect references from agent tools - - for action, observation in result.get("intermediate_steps", []): - if action.tool == "get_era5_climatology": - obs = _normalize_tool_observation(observation) - if isinstance(obs, dict) and "error" not in obs: - era5_climatology_output = obs - state.era5_climatology_response = obs - # Collect reference from ERA5 climatology - if "reference" in obs: - agent_references.append(obs["reference"]) - if action.tool == "get_data_components": - obs = _normalize_tool_observation(observation) - data_components_outputs.append(obs) - # Collect references from get_data_components - if isinstance(obs, dict): - if "reference" in obs: - agent_references.append(obs["reference"]) - if "references" in obs: - agent_references.extend(obs["references"]) - if action.tool in ("Python_REPL", "python_repl"): - obs = _normalize_tool_observation(observation) - if isinstance(obs, dict): - plot_images.extend(obs.get("plot_images", [])) - if action.tool == "retrieve_era5_data": - # Handle ERA5 retrieval tool output - obs = _normalize_tool_observation(observation) - if isinstance(obs, dict): - era5_output = str(obs) - # Collect reference from ERA5 retrieval - if "reference" in obs: - agent_references.append(obs["reference"]) - # Track downloaded Zarr for Data tab - if "output_path_zarr" in obs: - variable = obs.get("variable", "unknown") - state.downloadable_datasets.append({ - "label": f"ERA5 Time Series: {variable}", - "path": obs["output_path_zarr"], - "source": "ERA5", - }) - elif hasattr(obs, 'content'): - era5_output = obs.content - else: - era5_output = str(obs) - # Store in state - state.era5_tool_response = era5_output - state.input_params.setdefault("era5_results", []).append(obs) - if action.tool == "retrieve_destine_data": - obs = _normalize_tool_observation(observation) - if isinstance(obs, dict): - if "reference" in obs: - agent_references.append(obs["reference"]) - # Track downloaded Zarr for Data tab - if "output_path_zarr" in obs: - variable = obs.get("variable", obs.get("parameter", "unknown")) - state.downloadable_datasets.append({ - "label": f"DestinE Time Series: {variable}", - "path": obs["output_path_zarr"], - "source": "DestinE", - }) - state.destine_tool_response = str(obs) - state.input_params.setdefault("destine_results", []).append(obs) + # Step 4: Run analysis agent + gstate.update(analysis_node(gstate)) + + # ── Save analysis brief ────────────────────────────────────────────── + analysis_brief = gstate.get("analysis_brief", "") + brief_path = os.path.join(state.uuid_main_dir, "analysis_brief.txt") + try: + with open(brief_path, "w", encoding="utf-8") as f: + f.write(analysis_brief) + except Exception as e: + logger.warning(f"Could not save analysis brief: {e}") - # Add agent-collected references to state.references (deduplicate) - for ref in agent_references: + state.data_analysis_prompt_text = analysis_brief + + # Update state with download results + state.downloadable_datasets.extend(download_processed["downloadable_datasets"]) + for ref in download_processed["references"]: if ref and ref not in state.references: state.references.append(ref) - analysis_text = result.get("output", "") - - # Append ERA5 climatology summary if available - if era5_climatology_output: - analysis_text += "\n\n### ERA5 Observational Baseline (2015-2025)\n" - analysis_text += f"Location: {era5_climatology_output.get('extracted_location', {})}\n" - if "variables" in era5_climatology_output: - for var_name, var_data in era5_climatology_output["variables"].items(): - analysis_text += f"\n**{var_data.get('full_name', var_name)}** ({var_data.get('units', '')}):\n" - monthly = var_data.get("monthly_values", {}) - # Show a few key months - for month in ["January", "April", "July", "October"]: - if month in monthly: - analysis_text += f" {month}: {monthly[month]}\n" - - if data_components_outputs: - analysis_text += "\n\n### Climate Model Extracts:\n" - for item in data_components_outputs: - analysis_text += json.dumps(item, indent=2) + "\n" + if download_processed["era5_tool_response"]: + state.era5_tool_response = download_processed["era5_tool_response"] + if download_processed["destine_tool_response"]: + state.destine_tool_response = download_processed["destine_tool_response"] + for r in download_processed.get("era5_results", []): + state.input_params.setdefault("era5_results", []).append(r) + for r in download_processed.get("destine_results", []): + state.input_params.setdefault("destine_results", []).append(r) + + # Inject downloaded data text into analysis results + if download_processed["downloaded_data_text"]: + # This was already used in analysis_node prompt; also store for reference + state.input_params["downloaded_data_text"] = download_processed["downloaded_data_text"] + + # Get analysis results + analysis_text = gstate.get("analysis_text", "") + plot_images = gstate.get("plot_images", []) # Combine all plot images (pre-generated + agent-generated) all_plot_images = state.predefined_plots + plot_images @@ -888,6 +1184,6 @@ def data_analysis_agent( "era5_climatology_response": state.era5_climatology_response, "era5_tool_response": getattr(state, 'era5_tool_response', None), "destine_tool_response": getattr(state, 'destine_tool_response', None), - "references": state.references, # Propagate collected references + "references": state.references, "downloadable_datasets": state.downloadable_datasets, } From 048d1805dd23c54193cd2bc145e660a9b5671613 Mon Sep 17 00:00:00 2001 From: Ivan Kuznetsov Date: Tue, 3 Mar 2026 20:57:43 +0100 Subject: [PATCH 2/9] Add download progress messages, stop leaking Arraylake API key - Show per-variable start/done/fail progress via stream_handler - Remove arraylake_api_key from config dict; set env var instead - Add @traceable(hide_inputs=True) on pipeline nodes for LangSmith --- src/climsight/data_analysis_agent.py | 44 ++++++++++++++++++++-------- src/climsight/streamlit_interface.py | 8 ++--- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/src/climsight/data_analysis_agent.py b/src/climsight/data_analysis_agent.py index 32a48bf..31d9187 100644 --- a/src/climsight/data_analysis_agent.py +++ b/src/climsight/data_analysis_agent.py @@ -784,7 +784,7 @@ def data_analysis_agent( # effective, api_key so they don't need to be in graph state. # ================================================================== - @traceable(name="filter_node") + @traceable(name="filter_node", hide_inputs=True) def filter_node(gstate: dict) -> dict: """Node 1: Filter context into an analysis brief via LLM.""" if use_filter_step and llm_dataanalysis_agent is not None: @@ -821,7 +821,7 @@ def filter_node(gstate: dict) -> dict: return {"analysis_brief": analysis_brief} - @traceable(name="planner_node") + @traceable(name="planner_node", hide_inputs=True) def planner_node(gstate: dict) -> dict: """Node 2: LLM decides which ERA5/DestinE variables to download.""" has_era5_download = effective.get("use_era5_data", False) @@ -869,7 +869,6 @@ def planner_node(gstate: dict) -> dict: # ERA5 downloads if has_era5_download: - arraylake_api_key = config.get("arraylake_api_key", "") for item in plan.get("era5_downloads", []): download_plan["era5"].append({ "variable_id": item["variable_id"], @@ -880,7 +879,6 @@ def planner_node(gstate: dict) -> dict: "min_longitude": g_lon if g_lon is not None else 0.0, "max_longitude": g_lon if g_lon is not None else 359.75, "work_dir": ".", - "arraylake_api_key": arraylake_api_key, }) # DestinE: resolve search queries to param_ids, then build download tasks @@ -932,7 +930,7 @@ def planner_node(gstate: dict) -> dict: return {"download_plan": download_plan} - @traceable(name="download_node") + @traceable(name="download_node", hide_inputs=True) def download_node(gstate: dict) -> dict: """Node 3: Execute ALL downloads in parallel using ThreadPoolExecutor.""" plan = gstate["download_plan"] @@ -953,12 +951,22 @@ def download_node(gstate: dict) -> dict: logger.info(f"download_node: starting {total} parallel downloads") results = [] + completed = 0 max_workers = min(total, 6) + # Build human-readable labels for progress messages + def _label(source, params): + if source == "era5": + return f"ERA5 {params.get('variable_id', '?')}" + return f"DestinE {params.get('param_id', '?')}" + with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {} for source, params in tasks: + lbl = _label(source, params) + stream_handler.update_progress(f" ↳ starting download: {lbl}") if source == "era5": + # API key read from ARRAYLAKE_API_KEY env var inside retrieve_era5_data fut = executor.submit(retrieve_era5_data, **params) else: fut = executor.submit(retrieve_destine_data, **params) @@ -966,25 +974,35 @@ def download_node(gstate: dict) -> dict: for fut in as_completed(futures): source, params = futures[fut] + lbl = _label(source, params) try: dl_result = fut.result() results.append({"source": source, "params": params, "result": dl_result}) - var_label = params.get("variable_id", params.get("param_id", "?")) if dl_result.get("success"): - logger.info(f"download_node: {source} {var_label} succeeded") + completed += 1 + stream_handler.update_progress( + f" ✓ {lbl} done ({completed}/{total})" + ) + logger.info(f"download_node: {lbl} succeeded") else: - logger.warning( - f"download_node: {source} {var_label} returned error: " - f"{dl_result.get('error', '')}" + completed += 1 + err = dl_result.get('error', '') + stream_handler.update_progress( + f" ✗ {lbl} failed ({completed}/{total}): {err[:80]}" ) + logger.warning(f"download_node: {lbl} returned error: {err}") except Exception as e: - logger.error(f"download_node: {source} download failed: {e}") + completed += 1 + stream_handler.update_progress( + f" ✗ {lbl} error ({completed}/{total}): {str(e)[:80]}" + ) + logger.error(f"download_node: {lbl} failed: {e}") results.append({"source": source, "params": params, "error": str(e)}) - stream_handler.update_progress(f"Data analysis: {len(results)} downloads completed.") + stream_handler.update_progress(f"Data analysis: all {total} downloads finished.") return {"download_results": results} - @traceable(name="analysis_node") + @traceable(name="analysis_node", hide_inputs=True) def analysis_node(gstate: dict) -> dict: """Node 4: Run the analysis AgentExecutor (Python REPL, image_viewer, reflect).""" analysis_brief = gstate["analysis_brief"] diff --git a/src/climsight/streamlit_interface.py b/src/climsight/streamlit_interface.py index 8b181f3..89ca3b0 100644 --- a/src/climsight/streamlit_interface.py +++ b/src/climsight/streamlit_interface.py @@ -280,8 +280,8 @@ def _on_mode_change(): if not arraylake_api_key: st.error("Please provide an Arraylake API key to use ERA5 data retrieval.") st.stop() - # Store in config so data_analysis_agent can pass it to the tool - config["arraylake_api_key"] = arraylake_api_key + # Set env var so era5_retrieval_tool picks it up directly + os.environ["ARRAYLAKE_API_KEY"] = arraylake_api_key # Check DestinE token file if use_destine_data: @@ -315,8 +315,8 @@ def _on_mode_change(): if not arraylake_api_key: st.error("Please provide an Arraylake API key to use ERA5 data retrieval.") st.stop() - # Store in config so data_analysis_agent can pass it to the tool - config["arraylake_api_key"] = arraylake_api_key + # Set env var so era5_retrieval_tool picks it up directly + os.environ["ARRAYLAKE_API_KEY"] = arraylake_api_key # Check DestinE token file if use_destine_data: From df44b17b012a23feb947e399b63762cb800d71b9 Mon Sep 17 00:00:00 2001 From: Ivan Kuznetsov Date: Tue, 3 Mar 2026 21:09:18 +0100 Subject: [PATCH 3/9] rm md --- DATA_ANALYSIS_AGENT_PROMPT.md | 427 ---------------------------------- 1 file changed, 427 deletions(-) delete mode 100644 DATA_ANALYSIS_AGENT_PROMPT.md diff --git a/DATA_ANALYSIS_AGENT_PROMPT.md b/DATA_ANALYSIS_AGENT_PROMPT.md deleted file mode 100644 index 6b0bdfd..0000000 --- a/DATA_ANALYSIS_AGENT_PROMPT.md +++ /dev/null @@ -1,427 +0,0 @@ -# Data Analysis Agent — Full Prompt Overview - -This document shows the complete prompt sent to the data analysis agent LLM. -The prompt is dynamically assembled in `data_analysis_agent.py` from multiple sections. - -## How the Prompt is Assembled - -The agent uses `create_standard_agent_executor()` from `agent_helpers.py`, which creates a `ChatPromptTemplate` with: - -``` -[system] → tool_prompt (built by _create_tool_prompt()) -[user] → analysis_brief (filtered context from upstream agents) -[messages] → conversation history -[agent_scratchpad] → tool call/response pairs (managed by LangChain) -``` - ---- - -## Part 1: System Prompt (`_create_tool_prompt()`) - -Built dynamically based on config flags. Below is the **full prompt with all features enabled**. - ---- - -### ROLE - -``` -You are ClimSight's data analysis agent. -Your job: provide ADDITIONAL quantitative climate analysis beyond the standard plots. -You have a persistent Python REPL, pre-extracted data files, and optional ERA5 download access. - -CRITICAL EFFICIENCY RULES: -- HARD LIMIT: 30 tool calls total for the entire session. -- MAX 3-4 tool calls per response. Never fire 5+ tools in a single response. -- Write focused Python scripts — each one should accomplish a meaningful chunk of work. -- SEQUENTIAL ordering: first Python_REPL to generate plots, THEN reflect_on_image in a LATER response. - Never call reflect_on_image in the same response as Python_REPL. -- Ideal session: 3-4 REPL calls → 1-2 reflect calls → final answer. That is 6-7 tool calls total. -``` - -### 1. DATA ALREADY IN THE SANDBOX (do not re-extract) - -#### ERA5 Climatology (observational ground truth) -*Included when: ERA5 climatology is available* - -``` -- File: `era5_climatology.json` in sandbox root -- Content: monthly averages of t2m (°C), cp+lsp (precipitation, m), u10/v10 (m/s) — period 2015-2025 -- Load: `era5 = json.load(open('era5_climatology.json'))` -- Role: treat as GROUND TRUTH for validating model data. -``` - -#### Climate Model Data -*Included when: climate model data exists in sandbox* - -``` -- Manifest: `climate_data/climate_data_manifest.json` — READ FIRST to discover all simulations -- Data files: `climate_data/simulation_N.csv` + `simulation_N_meta.json` -- Shortcut: `climate_data/data.csv` = baseline simulation only -- Columns: Month, mean2t (°C), cp (convective precip, m), lsp (large-scale precip, m), - wind_u, wind_v, wind_speed, wind_direction -``` - -### 2. PRE-GENERATED PLOTS (already created — DO NOT recreate) - -*Included when: predefined plots exist* - -``` -- `results/climate_*.png` — temperature, precipitation, wind comparison with ERA5 overlay -- `results/disaster_counts.png` — historical disaster events by type -- `results/population_projection.png` — population trends - -Analyze the underlying DATA directly for your own insights. -Only use `image_viewer` on plots YOU create, not on these predefined ones. -``` - -### 3. ERA5 TIME SERIES DOWNLOAD (year-by-year data) - -*Included when: `use_era5_data: true`* - -``` -Use `retrieve_era5_data` to download full annual time series (2015-2024) from Earthmover. -This gives you YEAR-BY-YEAR values — far richer than the 10-year climatology average above. - -When to download: -- You need to detect warming/drying TRENDS → download t2, cp, and lsp (sum cp+lsp for total precip) -- You need interannual variability or extreme-year identification → download the relevant variables -- You only need monthly climatology for comparison → skip, use era5_climatology.json - -Tool parameters: -- Variable codes: `t2` (temperature), `cp` (convective precip), `lsp` (large-scale precip), - `u10`/`v10` (wind), `mslp` (pressure) -- NOTE: `tp` (total precipitation) is NOT available. Use `cp` + `lsp` and sum them. -- Always pass `work_dir='.'` -- Output: Zarr store saved to `era5_data/` folder - -Loading ERA5 Zarr in Python_REPL: -```python -import xarray as xr, glob -era5_files = glob.glob('era5_data/*.zarr') -print(era5_files) -ds = xr.open_dataset(era5_files[0], engine='zarr', chunks={}) -data = ds['t2'].to_series() -``` - -### 3b. DESTINE CLIMATE PROJECTIONS (SSP3-7.0, 82 parameters) - -*Included when: `use_destine_data: true`* - -``` -You have access to the DestinE Climate DT — high-resolution projections (IFS-NEMO, 2020-2039). -Use a TWO-STEP workflow: - -**Step 1: Search for parameters** -Call `search_destine_parameters` with a natural language query to find relevant parameters. -Example: search_destine_parameters('temperature at 2 meters') → returns candidates with param_id, levtype. - -**Step 2: Download data** -Call `retrieve_destine_data` with param_id and levtype from search results. -- Dates: YYYYMMDD format, range 20200101-20391231 -- **By default request the FULL period**: start_date=20200101, end_date=20391231 (20 years of projections) -- Only use a shorter range if the user explicitly asks for a specific period -- Output: Zarr store saved to `destine_data/` folder - -Loading DestinE data in Python_REPL: -```python -import xarray as xr, glob -destine_files = glob.glob('destine_data/*.zarr') -print(destine_files) -ds = xr.open_dataset(destine_files[0], engine='zarr', chunks={}) -print(ds) -``` - -### 4. AVAILABLE TOOLS - -``` -- **retrieve_era5_data** — download ERA5 year-by-year time series (see section 3) -- **search_destine_parameters** — find DestinE parameters via RAG search (see section 3b) -- **retrieve_destine_data** — download DestinE time series (see section 3b) -- **Python_REPL** — execute Python code in a sandboxed environment. - All files are relative to the sandbox root. - The `results/` directory is pre-created for saving plots. - Datasets are pre-loaded into the sandbox (see paths below). - STRATEGY: DIVIDE AND CONQUER. Split your work into a few focused scripts, - each tackling ONE logical task (e.g., load+explore, then analyze+plot-set-1, - then analyze+plot-set-2). This avoids cascading errors from monolithic scripts. - But don't go overboard with tiny one-liner calls either — find a reasonable balance. - Each script should be self-contained: import what it needs, do meaningful work, print results. -- **list_plotting_data_files** — discover files in sandbox directories -- **image_viewer** — view and analyze plots in `results/` (use relative paths) -- **reflect_on_image** — get quality feedback on a plot you created. - Call once per plot — reflect on ALL generated plots, not just one. - MUST be called in a SEPARATE response AFTER the Python_REPL that created the plots. - Always verify the file exists (via os.path.exists in REPL) BEFORE calling this tool. - MINIMUM ACCEPTABLE SCORE: 7/10. If score < 7, you MUST re-plot with fixes applied. - Read the fix suggestions from the reviewer and apply them in your next REPL call. -- **wise_agent** — ask for visualization strategy advice before coding -``` - -### 5. REQUIRED WORKFLOW - -``` -**Step 1 — Explore and load data:** -MANDATORY FIRST STEP: Load data files AND print their structure before any analysis. -This prevents cascading errors from wrong column names or data formats. - -CRITICAL DATA FORMAT WARNINGS: -- Month column may contain STRING NAMES ('January', 'February') — convert before using as int -- CSV paths in manifest are FILENAMES ONLY — always prepend 'climate_data/' -- Precipitation column may be 'tp' (total precip in mm) OR separate 'cp'/'lsp' (in meters) -- Always print df.columns.tolist() and df.head(2) BEFORE writing analysis code -``` - -```python -import os, json -import pandas as pd - -# 1. Load manifest and print structure -manifest = json.load(open('climate_data/climate_data_manifest.json')) -for e in manifest['entries']: - csv_path = os.path.join('climate_data', os.path.basename(e['csv'])) - print(e['years_of_averaging'], csv_path, '(baseline)' if e.get('main') else '') - -# 2. Load one CSV and inspect columns/dtypes -csv_path = os.path.join('climate_data', os.path.basename(manifest['entries'][0]['csv'])) -df = pd.read_csv(csv_path) -print('Columns:', df.columns.tolist()) -print('Dtypes:', df.dtypes.to_dict()) -print(df.head(2)) - -# 3. Convert Month column (handles both 'January' strings and integers) -month_map = {name: i+1 for i, name in enumerate( - ['January','February','March','April','May','June', - 'July','August','September','October','November','December'])} -if not pd.api.types.is_numeric_dtype(df['Month']): - df['Month'] = df['Month'].map(month_map) -df['Month'] = df['Month'].astype(int) -``` - -```python -# ERA5 observations (ground truth) -era5 = json.load(open('era5_climatology.json')) -era5_temp = era5['variables']['t2m']['monthly_values'] # dict: month_name → value -``` - -``` -**Step 2 — (Optional) Download ERA5 time series:** -Call `retrieve_era5_data` for `t2`, `cp`, and/or `lsp` if year-by-year analysis is needed. -Load the resulting Zarr files in Python_REPL (see section 3 for loading pattern). - -**Step 3 — Climatology analysis + comparison plots:** -Load ALL climate model CSVs from Step 1. Use the EXACT column names you printed in Step 1. -REMINDER: prepend 'climate_data/' to CSV filenames from manifest. -REMINDER: Convert Month strings ('January'→1) if needed (see Step 1 code). -REMINDER: Precipitation may be column 'tp' (already in mm) or 'cp'/'lsp' (in meters, multiply by 1000). -Compute monthly means, deltas between decades. -Create 2-3 comparison plots (temperature, precipitation, wind) saved to `results/`. -Print a concise summary of baseline values and projected changes. - -**Step 4 — Threshold & risk analysis + additional plots:** -If ERA5 time series were downloaded: compute threshold exceedances (heat days, frost days, -dry spells, wind extremes). Create 2-3 threshold/risk plots saved to `results/`. -Print quantitative risk metrics. If no ERA5 time series, skip this step. - -**Step 5 — Verify ALL plots (SEPARATE response, after plots exist):** -In a NEW response (never in the same response as Python_REPL), call `reflect_on_image` -once per generated plot — QA ALL of them, not just one. -MINIMUM SCORE: 7/10. If any plot scores below 7: -- Read the reviewer's fix suggestions carefully -- Write a NEW Python_REPL script applying those exact fixes -- Do NOT give up or skip re-plotting — the fixes are usually simple (font sizes, legend position) -``` - -### 6. PLOTTING CONVENTIONS - -``` -Every plot you create MUST follow these rules: -- `plt.figure(figsize=(12, 6))` — wide-format for readability -- `plt.savefig('results/filename.png', dpi=150, bbox_inches='tight')` -- `plt.close()` — ALWAYS close to prevent memory leaks -- Font sizes: title 14pt, axis labels 12pt, tick labels 10pt, legend 10pt -- Color palette: use scientific defaults — blue=#2196F3 cold, red=#F44336 hot, - green=#4CAF50 precipitation; use 'tab10' for multi-series -- ERA5 observations: always plot as BLACK solid line with circle markers ('k-o') -- Model projections: colored dashed lines, labeled by decade -- Include units on EVERY axis (°C, mm/month, m/s) -- Use `plt.tight_layout()` or `bbox_inches='tight'` to prevent label clipping -``` - -### 7. ERROR RECOVERY - -``` -MOST COMMON ERRORS (fix these FIRST): -- `ValueError: invalid literal for int()` on Month → Month column has string names like 'January'. - FIX: Use month_map dict to convert (see Step 1 code example). -- `FileNotFoundError: simulation_1.csv` → Manifest paths are filenames only. - FIX: Prepend 'climate_data/': `os.path.join('climate_data', os.path.basename(e['csv']))` -- `KeyError: 'cp'` or `'lsp'` → CSV column is 'tp' (total precip in mm), not cp/lsp. - FIX: Check df.columns.tolist() first, use whatever precipitation column exists. - -Other errors: -- File not found? → Run `list_plotting_data_files` to see available files and adapt paths. -- Zarr load fails? → Check `era5_data/` contents with `glob.glob('era5_data/*')`. -- Plot save fails? → Ensure `results/` dir exists: `os.makedirs('results', exist_ok=True)`. -- JSON parse error? → Print the file contents first, then fix the loading code. -- Empty DataFrame? → Print `df.head()` and `df.columns.tolist()` to inspect structure. -``` - -### 8. SANDBOX PATHS AND DATA - -*Dynamically generated from `_build_datasets_text(state)`:* - -``` -Available data directories: -- Climate data: 'climate_data/' -- ERA5 data: 'era5_data/' -- DestinE data: 'destine_data/' - -## Climate Data Files Available (in 'climate_data/' folder) -Files: data.csv, simulation_1.csv, simulation_1_meta.json, ... -Note: Load with `pd.read_csv('climate_data/data.csv')` -``` - -### 9. PROACTIVE ANALYSIS - -``` -Even if the user's query is vague, you SHOULD proactively: -- Create a temperature trend visualization (all decades + ERA5 baseline) -- Create a precipitation comparison chart -- Highlight the 3 months with the largest projected changes -- Identify potential climate risks relevant to the query -``` - -### 10. OUTPUT FORMAT - -``` -Your final response MUST include: -1. **Observed Climate** — current conditions from ERA5 (2015-2025 baseline) -2. **Model Performance** — how well projections match ERA5 observations -3. **Projected Changes** — future vs baseline, with magnitude and timing -4. **Critical Months** — months with largest changes or highest risk -5. **Visualizations** — list of created plot files in `results/` -6. **Implications** — interpretation relevant to the user's query -``` - -### TOOL BUDGET (HARD LIMIT: 30 tool calls total, max 3-4 per response) - -``` -Plan your session carefully — you have at most 30 tool calls: -- Python_REPL: a few calls, each focused on ONE logical task -- retrieve_era5_data: 0-3 calls (one per variable: t2, cp, lsp) -- search_destine_parameters: 1-2 calls (find param_ids before downloading) -- retrieve_destine_data: 0-3 calls (use full 2020-2039 range by default) -- reflect_on_image: one call per plot — QA ALL generated plots, not just one -- list_plotting_data_files / image_viewer: 0-2 calls -- wise_agent: 0-1 calls - -DIVIDE AND CONQUER — Python_REPL strategy: -- Script 1: Load ALL data, explore structure, print column names and shapes -- Script 2: Climatology analysis + comparison plots (temp, precip, wind) -- Script 3: Threshold/risk analysis + additional plots (if ERA5 time series available) -- Script 4 (if needed): Fix any errors from previous scripts, create missing plots - -WHY: One massive all-in-one script causes cascading errors — one bug kills everything. -Splitting into reasonable chunks lets you catch and fix errors between steps. - -ANTI-SPAM RULES: -- Never call more than 3-4 tools in a single response. -- Never call reflect_on_image in the same response as Python_REPL. -- Never call reflect_on_image more than twice total. -- Don't spam tiny one-liner REPL calls — each script should do meaningful work. -``` - ---- - -## Part 2: User Message (`analysis_brief`) - -The user message sent to the agent is either: - -### With filter LLM (default) - -A two-step process: -1. All upstream agent outputs are concatenated and sent to a filter LLM -2. The filter extracts actionable analysis requirements - -Filter prompt (`_build_filter_prompt()`): - -``` -You are a context filter for ClimSight's data analysis agent. -Your output will be consumed by an agent that has Python REPL, ERA5 data access, -and climate model data. Focus on what it should COMPUTE and PLOT. - -Extract ONLY actionable analysis requirements as concise bullets: -- Target variables with units (e.g., 'Temperature (°C)', 'Precipitation (mm/month)') -- Quantitative thresholds or criteria (e.g., 'days above 35°C', 'monthly rainfall < 50mm') -- Time ranges or scenario labels (e.g., '2020-2029 vs 2040-2049', 'SSP5-8.5') -- Spatial specifics (location name, coordinates, search radius) -- Requested analyses (trend detection, seasonal comparison, anomaly identification, custom plots) -- Mentioned crops, infrastructure, or decision topics (e.g., 'wheat cultivation', 'solar panel siting') - -Rules: -- Do NOT include raw climate data values or lengthy text passages. -- Do NOT include RAG or Wikipedia excerpts — only summarize their KEY requirements. -- Omit vague statements that cannot be translated into a computation or plot. -- If no specific analysis is requested, default to: temperature trends, precipitation comparison, - wind assessment, and a climate change signal summary. -``` - -The resulting filtered context is wrapped as: - -``` -USER QUESTION: {user's original question} - -Location: {location name} -Coordinates: {lat}, {lon} - -ANALYSIS REQUIREMENTS: -{filtered bullets from filter LLM} -``` - -### Without filter (fallback) - -``` -USER QUESTION: {user's original question} - -Location: {location name} -Coordinates: {lat}, {lon} - -Available climatology: -{ERA5 climatology summary} - -Required analysis: -- Extract Temperature and Precipitation data -- Compare historical vs future projections -- Create visualizations if Python_REPL is available -``` - ---- - -## Part 3: Agent Execution - -- **LLM**: `ChatOpenAI` with model from `config["llm_combine"]["model_name"]` (default: `gpt-4.1-nano`) -- **Agent type**: `create_openai_tools_agent` (OpenAI tools/function calling) -- **Max iterations**: 20 (each iteration = one LLM call + tool execution) -- **Tool calls per response**: LLM can call multiple tools in parallel (OpenAI native feature) - -### Registered Tools (when all features enabled) - -| Tool | Source | Purpose | -|------|--------|---------| -| `retrieve_era5_data` | `era5_retrieval_tool.py` | Download ERA5 time series via Arraylake | -| `search_destine_parameters` | `destine_retrieval_tool.py` | RAG search over 82 DestinE parameters | -| `retrieve_destine_data` | `destine_retrieval_tool.py` | Download DestinE projections via polytope | -| `Python_REPL` | `python_repl.py` | Sandboxed Python execution (Jupyter kernel) | -| `list_plotting_data_files` | `visualization_tools.py` | List files in sandbox directories | -| `image_viewer` | `image_viewer.py` | View and analyze plot images | -| `reflect_on_image` | `reflection_tools.py` | Quality feedback on generated plots | -| `wise_agent` | `visualization_tools.py` | Visualization strategy advice | - -### Post-Processing - -After agent execution, intermediate steps are scanned for: -- ERA5 climatology outputs → stored in state -- Data component outputs → collected with references -- Python REPL outputs → plot images collected -- ERA5/DestinE retrieval outputs → references collected -- All collected references → added to final state for combine_agent From 2178a3a87e9dbd9ce28cc08909a66104ecd38d73 Mon Sep 17 00:00:00 2001 From: Ivan Kuznetsov Date: Tue, 3 Mar 2026 21:24:29 +0100 Subject: [PATCH 4/9] update --- test/plot_destine_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/plot_destine_data.py b/test/plot_destine_data.py index 87911dd..55423ea 100644 --- a/test/plot_destine_data.py +++ b/test/plot_destine_data.py @@ -3,7 +3,7 @@ import xarray as xr import matplotlib.pyplot as plt -zarr_path = "/Users/ikuznets/work/projects/climsight/code/climsight/tmp/sandbox/38c864498d174b8a90ebb24ac67cf70e/destine_data/destine_167_sfc_20200101_20211231.zarr" +zarr_path = "era5_cp_temporal_20200101_20291231.zarr" ds = xr.open_dataset(zarr_path, engine="zarr") From 5cd291fd46fa5f56d5a0ceb2648aa4363d652a85 Mon Sep 17 00:00:00 2001 From: Ivan Kuznetsov Date: Tue, 3 Mar 2026 21:33:38 +0100 Subject: [PATCH 5/9] Set ERA5 default range to 1975-2024, fix arraylake client cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Planner prompt now defaults to 1975-01-01–2024-12-31 for ERA5 - Explicitly close arraylake Client in finally block to avoid async finalizer error when running in ThreadPoolExecutor threads --- src/climsight/data_analysis_agent.py | 12 +++++++----- src/climsight/tools/era5_retrieval_tool.py | 6 ++++++ 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/climsight/data_analysis_agent.py b/src/climsight/data_analysis_agent.py index 31d9187..d17f0b5 100644 --- a/src/climsight/data_analysis_agent.py +++ b/src/climsight/data_analysis_agent.py @@ -199,7 +199,7 @@ def _create_planner_prompt( if has_era5_download: sections.append( - "## ERA5 Time Series (2015-2024, year-by-year)\n" + "## ERA5 Reanalysis Time Series (1979-2024)\n" "Available variables:\n" "- `t2` — 2m air temperature (K → °C)\n" "- `cp` — convective precipitation (m)\n" @@ -214,8 +214,10 @@ def _create_planner_prompt( "- `sd` — snow depth (m)\n" "- `skt` — skin temperature (K)\n" "- `d2` — 2m dewpoint temperature (K)\n\n" - "Default date range: 2015-01-01 to 2024-12-31 (10 years).\n" - "Only download variables needed for the analysis — don't download everything.\n" + "Default date range: 1975-01-01 to 2024-12-31 (50 years of observations).\n" + "ALWAYS use the full default range unless the user explicitly asks for a shorter period.\n" + "ERA5 downloads are fast, so prefer downloading the full range.\n" + "Only download variables relevant to the analysis — don't download everything.\n" "Common patterns:\n" "- Temperature analysis → t2\n" "- Precipitation analysis → cp + lsp (always both)\n" @@ -246,7 +248,7 @@ def _create_planner_prompt( "Return a JSON object with this structure:\n" "{{\n" ' "era5_downloads": [\n' - ' {{"variable_id": "t2", "start_date": "2015-01-01", "end_date": "2024-12-31"}}\n' + ' {{"variable_id": "t2", "start_date": "1975-01-01", "end_date": "2024-12-31"}}\n' " ],\n" ' "destine_searches": ["temperature at 2 meters", "total precipitation"],\n' ' "destine_date_range": {{"start": "20200101", "end": "20391231"}},\n' @@ -872,7 +874,7 @@ def planner_node(gstate: dict) -> dict: for item in plan.get("era5_downloads", []): download_plan["era5"].append({ "variable_id": item["variable_id"], - "start_date": item.get("start_date", "2015-01-01"), + "start_date": item.get("start_date", "1975-01-01"), "end_date": item.get("end_date", "2024-12-31"), "min_latitude": g_lat if g_lat is not None else -90.0, "max_latitude": g_lat if g_lat is not None else 90.0, diff --git a/src/climsight/tools/era5_retrieval_tool.py b/src/climsight/tools/era5_retrieval_tool.py index 36c6c92..464b253 100644 --- a/src/climsight/tools/era5_retrieval_tool.py +++ b/src/climsight/tools/era5_retrieval_tool.py @@ -144,6 +144,7 @@ def retrieve_era5_data( ARRAYLAKE_API_KEY environment variable. """ ds = None + client = None local_zarr_path = None query_type = "temporal" # Hardcoded for Climsight point-data focus @@ -325,6 +326,11 @@ def retrieve_era5_data( finally: if ds is not None: ds.close() + if client is not None: + try: + client.close() + except Exception: + pass # Suppress async loop conflicts during cleanup def create_era5_retrieval_tool(arraylake_api_key: str): """ From 73534aefdbb353ecfe6e4c71f480b299a22db065 Mon Sep 17 00:00:00 2001 From: Ivan Kuznetsov Date: Tue, 3 Mar 2026 21:36:54 +0100 Subject: [PATCH 6/9] Add per-download and per-source timing to download progress --- src/climsight/data_analysis_agent.py | 38 ++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/src/climsight/data_analysis_agent.py b/src/climsight/data_analysis_agent.py index d17f0b5..e236465 100644 --- a/src/climsight/data_analysis_agent.py +++ b/src/climsight/data_analysis_agent.py @@ -952,9 +952,13 @@ def download_node(gstate: dict) -> dict: ) logger.info(f"download_node: starting {total} parallel downloads") + import time as _time + results = [] completed = 0 max_workers = min(total, 6) + # Per-source elapsed times for summary + timings = {"era5": [], "destine": []} # Build human-readable labels for progress messages def _label(source, params): @@ -962,46 +966,64 @@ def _label(source, params): return f"ERA5 {params.get('variable_id', '?')}" return f"DestinE {params.get('param_id', '?')}" + start_times = {} + t_wall_start = _time.time() + with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {} for source, params in tasks: lbl = _label(source, params) stream_handler.update_progress(f" ↳ starting download: {lbl}") if source == "era5": - # API key read from ARRAYLAKE_API_KEY env var inside retrieve_era5_data fut = executor.submit(retrieve_era5_data, **params) else: fut = executor.submit(retrieve_destine_data, **params) futures[fut] = (source, params) + start_times[fut] = _time.time() for fut in as_completed(futures): source, params = futures[fut] lbl = _label(source, params) + elapsed = _time.time() - start_times[fut] + timings[source].append(elapsed) try: dl_result = fut.result() results.append({"source": source, "params": params, "result": dl_result}) if dl_result.get("success"): completed += 1 stream_handler.update_progress( - f" ✓ {lbl} done ({completed}/{total})" + f" ✓ {lbl} done ({completed}/{total}) — {elapsed:.1f}s" ) - logger.info(f"download_node: {lbl} succeeded") + logger.info(f"download_node: {lbl} succeeded in {elapsed:.1f}s") else: completed += 1 err = dl_result.get('error', '') stream_handler.update_progress( - f" ✗ {lbl} failed ({completed}/{total}): {err[:80]}" + f" ✗ {lbl} failed ({completed}/{total}) — {elapsed:.1f}s: {err[:80]}" ) - logger.warning(f"download_node: {lbl} returned error: {err}") + logger.warning(f"download_node: {lbl} error in {elapsed:.1f}s: {err}") except Exception as e: completed += 1 stream_handler.update_progress( - f" ✗ {lbl} error ({completed}/{total}): {str(e)[:80]}" + f" ✗ {lbl} error ({completed}/{total}) — {elapsed:.1f}s: {str(e)[:80]}" ) - logger.error(f"download_node: {lbl} failed: {e}") + logger.error(f"download_node: {lbl} failed in {elapsed:.1f}s: {e}") results.append({"source": source, "params": params, "error": str(e)}) - stream_handler.update_progress(f"Data analysis: all {total} downloads finished.") + t_wall = _time.time() - t_wall_start + + # Per-source timing summary + parts = [] + if timings["era5"]: + t_max = max(timings["era5"]) + parts.append(f"ERA5 {len(timings['era5'])} vars in {t_max:.1f}s") + if timings["destine"]: + t_max = max(timings["destine"]) + parts.append(f"DestinE {len(timings['destine'])} vars in {t_max:.1f}s") + summary = f"all {total} downloads finished — wall time {t_wall:.1f}s" + if parts: + summary += f" ({', '.join(parts)})" + stream_handler.update_progress(f"Data analysis: {summary}") return {"download_results": results} @traceable(name="analysis_node", hide_inputs=True) From 1ec72dbaad4966491854b6d01a070512c2ab607a Mon Sep 17 00:00:00 2001 From: Ivan Kuznetsov Date: Wed, 4 Mar 2026 11:18:12 +0100 Subject: [PATCH 7/9] Improve analysis prompt: full time range, deeper statistics, no path leaks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Instruct agent to analyze full ERA5 (1975-2024) and DestinE (2020-2039) ranges independently, not just the overlap period - Expand workflow from 2-3 plots to 4-6+ with trend analysis, extremes, distributions, seasonal decomposition, and anomaly plots - Clarify predefined plots already cover basic climatology — do not recreate - Strengthen output format: all computed numbers must appear in final text since raw REPL stdout is not forwarded to combine agent - Remove file paths from output format to prevent sandbox path leakage --- src/climsight/data_analysis_agent.py | 123 ++++++++++++++++++++------- 1 file changed, 92 insertions(+), 31 deletions(-) diff --git a/src/climsight/data_analysis_agent.py b/src/climsight/data_analysis_agent.py index e236465..852b06a 100644 --- a/src/climsight/data_analysis_agent.py +++ b/src/climsight/data_analysis_agent.py @@ -370,7 +370,10 @@ def _create_analysis_prompt( sections.append( "## 2. PRE-GENERATED PLOTS (already created — DO NOT recreate)\n\n" + "\n".join(predefined_plots) + "\n\n" - "Analyze the underlying DATA directly for your own insights.\n" + "These plots already cover basic monthly climatology comparisons (model decades vs ERA5).\n" + "DO NOT recreate similar plots — they are already done.\n" + "Instead, use the same underlying data for DEEPER analysis: trends, anomalies, extremes,\n" + "distributions, seasonal decomposition — things the predefined plots do NOT cover.\n" "Only use `image_viewer` on plots YOU create, not on these predefined ones.\n" ) else: @@ -462,36 +465,71 @@ def _create_analysis_prompt( sections.append( f"**Step {step} — Load downloaded time series:**\n" "ERA5/DestinE data was pre-downloaded by the planner. Load the Zarr files:\n\n" + "CRITICAL — analyze the FULL time range of each dataset:\n" + "- ERA5 covers ~1975–2024 (up to 50 years of observations). This is your primary record.\n" + " Compute long-term trends, decadal averages, anomalies from the 30-year climatological mean.\n" + "- DestinE covers 2020–2039 (20 years of high-resolution projections under SSP3-7.0).\n" + " Analyze the full projected range — trends, seasonal shifts, comparison to ERA5 baseline.\n" + "- Do NOT limit analysis to the overlap period (2020–2024). Each dataset tells its own story:\n" + " ERA5 = what happened over decades, DestinE = what is projected for the next two decades.\n" + "- Use the overlap (2020–2024) ONLY for validating DestinE against ERA5 observations.\n\n" ) sections.append( "```python\n" "import xarray as xr, glob\n" - "# ERA5 time series\n" + "# ERA5 time series (full historical record)\n" "era5_files = glob.glob('era5_data/*.zarr')\n" "for f in era5_files:\n" " ds = xr.open_dataset(f, engine='zarr', chunks={{}})\n" - " print(f, list(ds.data_vars))\n" + " print(f, list(ds.data_vars), 'time range:', str(ds.time.values[0])[:10], 'to', str(ds.time.values[-1])[:10])\n" + "# DestinE projections (if available)\n" + "destine_files = glob.glob('destine_data/*.zarr')\n" + "for f in destine_files:\n" + " ds = xr.open_dataset(f, engine='zarr', chunks={{}})\n" + " print(f, list(ds.data_vars), 'time range:', str(ds.time.values[0])[:10], 'to', str(ds.time.values[-1])[:10])\n" "```\n\n" ) step += 1 sections.append( - f"**Step {step} — Climatology analysis + comparison plots:**\n" + f"**Step {step} — Deeper climatology analysis (DO NOT re-plot predefined figures):**\n" + "IMPORTANT: Monthly climatology comparison plots (temperature, precipitation, wind vs ERA5)\n" + "are ALREADY generated as predefined plots in `results/climate_*.png`.\n" + "Do NOT recreate them. Instead, use the same underlying data for DEEPER analysis:\n\n" "Load ALL climate model CSVs from Step 1. Use the EXACT column names you printed in Step 1.\n" "REMINDER: prepend 'climate_data/' to CSV filenames from manifest.\n" "REMINDER: Convert Month strings ('January'→1) if needed (see Step 1 code).\n" - "REMINDER: Precipitation may be column 'tp' (already in mm) or 'cp'/'lsp' (in meters, multiply by 1000).\n" - "Compute monthly means, deltas between decades.\n" - "Create 2-3 comparison plots (temperature, precipitation, wind) saved to `results/`.\n" - "Print a concise summary of baseline values and projected changes.\n" + "REMINDER: Precipitation may be column 'tp' (already in mm) or 'cp'/'lsp' (in meters, multiply by 1000).\n\n" + "Focus on analysis that goes BEYOND the predefined plots:\n" + "- Decadal change analysis: compute and plot differences between earliest and latest decades\n" + "- Seasonal cycle shifts: how does the seasonal pattern change across decades?\n" + "- If ERA5 time series downloaded: annual mean time series over the full record (1975-2024)\n" + " with linear trend line, annotate slope (°C/decade or mm/decade)\n" + "- Multi-variable correlation: do temperature and precipitation changes correlate?\n" + "- Anomaly plots: departure from long-term mean per year or month\n" + "Print a concise summary of baseline values, projected changes, and their magnitudes.\n" ) step += 1 sections.append( - f"**Step {step} — Threshold & risk analysis + additional plots:**\n" - "If ERA5 time series were downloaded: compute threshold exceedances (heat days, frost days,\n" - "dry spells, wind extremes). Create 2-3 threshold/risk plots saved to `results/`.\n" - "Print quantitative risk metrics. If no ERA5 time series, skip this step.\n" + f"**Step {step} — Statistical analysis, trends, and extremes:**\n" + "Perform deep quantitative analysis on downloaded time series (4-6 plots minimum):\n\n" + "Trend analysis:\n" + "- Linear regression on annual means for each variable (scipy.stats.linregress)\n" + "- Report slope (per decade), R², and p-value for statistical significance\n" + "- Plot time series with trend line and confidence interval\n\n" + "Extreme value analysis:\n" + "- Threshold exceedances: heat days (>30°C), frost days (<0°C), heavy precip events, dry spells\n" + "- Compute annual counts of threshold exceedances and plot their trend over time\n" + "- Percentile analysis: 90th, 95th, 99th percentiles per decade — are extremes intensifying?\n\n" + "Distribution analysis:\n" + "- Compare distributions across decades (histograms or KDE plots)\n" + "- Seasonal decomposition: how do trends differ by season (DJF, MAM, JJA, SON)?\n\n" + "If DestinE projections available:\n" + "- Compare projected extremes (2020-2039) against ERA5 historical baseline (1975-2024)\n" + "- Quantify projected changes in threshold exceedances\n\n" + "Print ALL computed metrics — trends, p-values, exceedance counts, percentile shifts.\n" + "If no ERA5 time series, skip this step.\n" ) step += 1 @@ -547,33 +585,52 @@ def _create_analysis_prompt( # ── 8. PROACTIVE ANALYSIS ───────────────────────────────────────────── sections.append( "\n## 8. PROACTIVE ANALYSIS\n\n" - "Even if the user's query is vague, you SHOULD proactively:\n" - "- Create a temperature trend visualization (all decades + ERA5 baseline)\n" - "- Create a precipitation comparison chart\n" + "You have a generous tool budget — USE IT. More analysis is always better than less.\n" + "Think like a climate scientist writing a technical report. The user needs quantitative evidence.\n\n" + "Even if the user's query is vague, you MUST proactively:\n" + "- For EVERY downloaded variable: produce (a) full time series plot, (b) trend analysis with slope,\n" + " (c) anomaly plot relative to long-term mean, (d) seasonal breakdown\n" + "- Compute and report: trend significance (p-value), standard deviations, correlation between variables\n" "- Highlight the 3 months with the largest projected changes\n" "- Identify potential climate risks relevant to the query\n" + "- If both ERA5 and DestinE are available: create a combined timeline plot showing\n" + " historical observations transitioning into projections\n\n" + "Even if a finding won't make the final report, the analysis informs a better answer.\n" + "Do NOT stop after 2-3 plots — exploit your full tool budget for thorough analysis.\n" ) # ── 9. OUTPUT FORMAT ───────────────────────────────────────────────── sections.append( - "\n## 9. OUTPUT FORMAT\n\n" + "\n## 9. OUTPUT FORMAT (CRITICAL)\n\n" + "Your final text response is the ONLY information passed to the downstream combine agent.\n" + "Raw Python REPL output (print statements, statistics) is NOT forwarded — only YOUR written response.\n" + "Therefore you MUST include ALL quantitative results in your final response:\n" + "- Every computed number: trend slopes (°C/decade), p-values, R² values, percentiles\n" + "- Threshold exceedance counts, mean values, standard deviations\n" + "- For EVERY plot: describe what it shows and the key finding (do NOT include file paths in your text)\n\n" "Your final response MUST include:\n" ) if has_era5_data: sections.append( - "1. **Observed Climate** — current conditions from ERA5 (2015-2025 baseline)\n" - "2. **Model Performance** — how well projections match ERA5 observations\n" - "3. **Projected Changes** — future vs baseline, with magnitude and timing\n" - "4. **Critical Months** — months with largest changes or highest risk\n" - "5. **Visualizations** — list of created plot files in `results/`\n" - "6. **Implications** — interpretation relevant to the user's query\n" + "1. **Observed Climate** — current conditions from ERA5 with specific values\n" + " (e.g., 'Mean annual temperature: 10.3°C, warmest month: July at 21.5°C')\n" + "2. **Long-term Trends** — trend slope, significance, time period analyzed\n" + " (e.g., 'Temperature trend: +0.35°C/decade (p<0.01) over 1975-2024')\n" + "3. **Model Performance** — how well projections match ERA5 observations, with bias values\n" + "4. **Projected Changes** — future vs baseline with magnitude and timing\n" + " (e.g., 'Temperature +2.1°C by 2040s relative to 1995-2014 baseline')\n" + "5. **Extremes** — threshold exceedances, percentile shifts, with counts\n" + " (e.g., 'Heat days (>30°C): increased from 5/year (1975-1984) to 18/year (2015-2024)')\n" + "6. **Critical Months** — months with largest changes or highest risk, with values\n" + "7. **Visualizations** — for each plot: one-sentence description of what it shows and key finding\n" + "8. **Implications** — interpretation relevant to the user's query\n" ) else: sections.append( - "1. **Key Climate Values** — temperature, precipitation, wind summary\n" - "2. **Climate Change Signal** — future vs historical differences\n" - "3. **Critical Months** — months with largest changes\n" - "4. **Visualizations** — list of created plot files\n" + "1. **Key Climate Values** — temperature, precipitation, wind with specific numbers\n" + "2. **Climate Change Signal** — future vs historical differences with magnitudes\n" + "3. **Critical Months** — months with largest changes, with values\n" + "4. **Visualizations** — for each plot: one-sentence description of what it shows and key finding\n" "5. **Implications** — interpretation relevant to the user's query\n" ) @@ -592,12 +649,16 @@ def _create_analysis_prompt( if has_python_repl: budget_lines.append( "DIVIDE AND CONQUER — Python_REPL strategy:\n" - "- Script 1: Load ALL data, explore structure, print column names and shapes\n" - "- Script 2: Climatology analysis + comparison plots (temp, precip, wind)\n" - "- Script 3: Threshold/risk analysis + additional plots (if ERA5 time series available)\n" - "- Script 4 (if needed): Fix any errors from previous scripts, create missing plots\n\n" + "- Script 1: Load ALL data, explore structure, print column names, shapes, and time ranges\n" + "- Script 2: ERA5 long-term time series analysis + trend plots (full historical record)\n" + "- Script 3: Climatology comparison — model decades vs ERA5 baseline (monthly plots)\n" + "- Script 4: Extreme/threshold analysis + distribution plots (if time series available)\n" + "- Script 5: DestinE projection analysis — trends, comparison to ERA5 (if DestinE available)\n" + "- Script 6: Statistical summary — print all computed metrics for the final report\n" + "- Additional scripts: fix errors, re-plot after reflection feedback\n\n" "WHY: One massive all-in-one script causes cascading errors — one bug kills everything.\n" - "Splitting into reasonable chunks lets you catch and fix errors between steps.\n\n" + "Splitting into reasonable chunks lets you catch and fix errors between steps.\n" + "You have a large budget — use more scripts for deeper analysis, not fewer.\n\n" ) budget_lines.append( From eb0f1cad90af0569f784d84345240f4fc8d9c161 Mon Sep 17 00:00:00 2001 From: Ivan Kuznetsov Date: Wed, 4 Mar 2026 11:53:19 +0100 Subject: [PATCH 8/9] Add retry on ERA5 download failure (1 retry after 10s) --- src/climsight/tools/era5_retrieval_tool.py | 46 +++++++++++++--------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/src/climsight/tools/era5_retrieval_tool.py b/src/climsight/tools/era5_retrieval_tool.py index 464b253..6cbb3d9 100644 --- a/src/climsight/tools/era5_retrieval_tool.py +++ b/src/climsight/tools/era5_retrieval_tool.py @@ -278,35 +278,43 @@ def retrieve_era5_data( if subset.sizes.get('time', 0) == 0: return {"success": False, "error": "Empty time slice.", "message": "No data found for the requested dates."} - # --- 4. Save to Zarr (Atomic Write) --- + # --- 4. Save to Zarr (Atomic Write, with 1 retry) --- logging.info(f"Downloading data to {local_zarr_path}...") ds_out = subset.to_dataset(name=short_var) - + # Clear encoding for var in ds_out.variables: ds_out[var].encoding = {} # Atomic write: write to temp dir first, then rename on success temp_zarr_path = local_zarr_path + ".tmp" - - # Clean up any previous failed temp directory - if os.path.exists(temp_zarr_path): - shutil.rmtree(temp_zarr_path) - - try: - ds_out.to_zarr(temp_zarr_path, mode="w", consolidated=True, compute=True) - - # Atomic replace: remove old cache (if exists) and rename temp to final - if os.path.exists(local_zarr_path): - shutil.rmtree(local_zarr_path) - os.rename(temp_zarr_path, local_zarr_path) - logging.info("✅ Download complete.") - except Exception as write_error: - # Clean up failed temp directory + max_attempts = 2 + + for attempt in range(1, max_attempts + 1): + # Clean up any previous failed temp directory if os.path.exists(temp_zarr_path): - shutil.rmtree(temp_zarr_path, ignore_errors=True) - raise write_error + shutil.rmtree(temp_zarr_path) + + try: + ds_out.to_zarr(temp_zarr_path, mode="w", consolidated=True, compute=True) + + # Atomic replace: remove old cache (if exists) and rename temp to final + if os.path.exists(local_zarr_path): + shutil.rmtree(local_zarr_path) + os.rename(temp_zarr_path, local_zarr_path) + logging.info("✅ Download complete.") + break # success + except Exception as write_error: + # Clean up failed temp directory + if os.path.exists(temp_zarr_path): + shutil.rmtree(temp_zarr_path, ignore_errors=True) + if attempt < max_attempts: + import time as _time + logging.warning(f"ERA5 {short_var}: download failed (attempt {attempt}/{max_attempts}), retrying in 10s... Error: {write_error}") + _time.sleep(10) + else: + raise write_error return { "success": True, From 28445ef6904bcd805a61182587ba9dbbc1f5be6f Mon Sep 17 00:00:00 2001 From: dmpantiu Date: Wed, 4 Mar 2026 16:03:10 +0100 Subject: [PATCH 9/9] fix: prioritize UI api_key for DestinE parameter search The dest_api_key in planner_node now checks the api_key argument (passed from Streamlit UI) before falling back to config/env. Without this, users entering their OpenAI key via the UI would get DestinE search failures. --- src/climsight/data_analysis_agent.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/climsight/data_analysis_agent.py b/src/climsight/data_analysis_agent.py index 852b06a..48b5b2f 100644 --- a/src/climsight/data_analysis_agent.py +++ b/src/climsight/data_analysis_agent.py @@ -951,7 +951,8 @@ def planner_node(gstate: dict) -> dict: collection_name = destine_settings.get( "collection_name", "climate_parameters_with_usage_notes" ) - dest_api_key = (config.get("openai_api_key", "") + dest_api_key = (api_key + or config.get("openai_api_key", "") or os.environ.get("OPENAI_API_KEY", "")) destine_date_range = plan.get(