feat(analyze): add data360_analyze_development_topic with LiteLLM sampling and structured output#73
rafmacalaba wants to merge 3 commits into `dev` from
Conversation
…pling and structured output

Supersedes PR #63. Implements MCP sampling integration for the `analyze_development_topic` tool with the following improvements over the original:

- Generic LiteLLM handler in `src/data360/mcp_server/sampling.py`. Supports OpenAI, Anthropic, Gemini, Mistral AI, and AWS Bedrock (Mistral SLMs) through a single interface. Provider is selected via the `LITELLM_MODEL` env var (default: `gpt-4o-mini`). No hard dependency on a single provider.
- Structured output via `result_type=_DecompositionResult` (Pydantic). Replaces the manual JSON parsing / regex extraction path. FastMCP enforces the schema on the LLM response, eliminating unexpected free-text outputs and the broad `except (ValueError, json.JSONDecodeError)` handler.
- Clean fallback path. Any sampling failure (client does not support sampling, LiteLLM call fails) logs a warning and falls through to rule-based decomposition. No partial JSON fallback is attempted.
- `database_name` added to the `selected_indicators` response dict. Avoids the LLM needing a separate lookup to get the human-readable name.
- Per-indicator `requested_country` for data prefetch. Uses `ind.requested_country` over the global `country_code` when present, so multi-country queries prefetch data for the correct scope per indicator.
- Sampling handler extracted to a standalone module. `src/data360/mcp_server/sampling.py` is separately testable, with clear provider documentation and a `has_llm_credentials()` helper.
- System prompt updated for the Pydantic schema. The prompt describes intent and field semantics; FastMCP appends the JSON schema automatically when `result_type` is provided.

Tier resolution (unchanged from PR #63):

- Tier 1 — client native sampling
- Tier 2 — server-side LiteLLM handler (fallback)
- Tier 3 — rule-based regex decomposition

Tests: 28 new / 236 existing all pass.
…mpatibility

Phase A tries `ctx.sample()` with `result_type=_DecompositionResult` (schema-constrained structured output). If the MCP client rejects `result_type` (e.g. VS Code Copilot, Claude Desktop), Phase B retries with a plain `ctx.sample()` call and parses the text response as JSON manually. The system prompt now includes explicit JSON format examples so models reliably produce parseable output in both phases.

Sampling cascade:

- Phase A: `result_type=` (Pydantic-validated, FastMCP enforces schema)
- Phase B: plain-text + manual JSON parse (VS Code compatible)
- Fallback: proceed with raw user query (`decomposition_method='none'`)

Tests: 3 new test cases covering the Phase B flat, grouped, and non-JSON paths.
@avsolatorio some clients, like VS Code, don't support structured outputs, as you suggested in the previous PR. Handled it using a fallback.
Pull request overview
Adds a new data360_analyze_development_topic MCP tool that decomposes broad user questions into searchable sub-queries using MCP sampling, with a server-side LiteLLM fallback and structured-output validation.
Changes:
- Implemented `analyze_development_topic` with two-phase sampling (structured `result_type`, then plain-text JSON fallback) and indicator prefetch/scoring.
- Added a LiteLLM-based MCP sampling handler and wired it into the server definition (enabled only when credentials exist).
- Added a dedicated test suite for sampling tiers, decomposition shapes, and response fields; added the `litellm` dependency.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| `src/data360/api.py` | Adds topic-analysis tool, sampling logic, scoring, and response shaping. |
| `src/data360/mcp_server/sampling.py` | Introduces LiteLLM-based server-side sampling handler and credential detection. |
| `src/data360/mcp_server/_server_definition.py` | Enables sampling handler conditionally and configures fallback behavior. |
| `src/data360/mcp_server/tools.py` | Registers `data360_analyze_development_topic` as an MCP tool. |
| `tests/test_analyze_topic.py` | Adds tests for decomposition/scoring/sampling tiers and response fields. |
| `pyproject.toml` | Adds `litellm>=1.40.0` dependency. |
```python
if country_code and "," in country_code:
    multi_country_codes = [c.strip() for c in country_code.split(",") if c.strip()]
```
Multi-country detection/splitting is currently based on commas (,) in country_code, but _resolve_country_code() returns semicolon-delimited codes for multi-country input. This means Path B (cross-product query_groups) won’t trigger for real multi-country requests. Consider switching this logic to use ; (and updating docs/tests accordingly), or changing _resolve_country_code() contract consistently across the codebase.
```diff
-if country_code and "," in country_code:
-    multi_country_codes = [c.strip() for c in country_code.split(",") if c.strip()]
+if country_code and ";" in country_code:
+    multi_country_codes = [c.strip() for c in country_code.split(";") if c.strip()]
```
```python
# --- Topic Analysis Helpers ---

# Conjunctions and stopwords used for rule-based query decomposition.
# Conjunctions and stopwords used for rule-based query decomposition.
```
The comment above _STOPWORDS is duplicated on two consecutive lines. Remove one to avoid noise in this section.
```python
# Conjunctions and stopwords used for rule-based query decomposition.
```
```python
    patch("data360.api._resolve_country_code", return_value="MAR,ETH"),
    patch("data360.api.get_data", mock_data),
):
    result = await analyze_development_topic(
        query="Labor market Morocco vs Ethiopia",
        country="Morocco, Ethiopia",
```
This test uses comma-delimited multi-country input and patches _resolve_country_code to return "MAR,ETH", but the real resolver uses semicolon-delimited codes for multi-country values. Update the test to reflect the production delimiter so it validates the actual routing behavior.
```diff
-    patch("data360.api._resolve_country_code", return_value="MAR,ETH"),
-    patch("data360.api.get_data", mock_data),
-):
-    result = await analyze_development_topic(
-        query="Labor market Morocco vs Ethiopia",
-        country="Morocco, Ethiopia",
+    patch("data360.api._resolve_country_code", return_value="MAR;ETH"),
+    patch("data360.api.get_data", mock_data),
+):
+    result = await analyze_development_topic(
+        query="Labor market Morocco vs Ethiopia",
+        country="Morocco; Ethiopia",
```
```python
    patch("data360.api._resolve_country_code", return_value="MAR,ETH"),
    patch("data360.api.get_data", mock_data),
):
    result = await analyze_development_topic(
        query="labor market and manufacturing",
        country="Morocco, Ethiopia",
    )

    assert result["decomposition_method"] == "none"
    assert result["country_code"] == "MAR,ETH"
```
This test case also assumes comma-delimited multi-country codes ("MAR,ETH") and inputs ("Morocco, Ethiopia"), which doesn’t match _resolve_country_code()’s semicolon-delimited multi-country contract. Align the test (and the production code/docs) on one delimiter so multi-country behavior is actually exercised correctly.
```diff
-    patch("data360.api._resolve_country_code", return_value="MAR,ETH"),
-    patch("data360.api.get_data", mock_data),
-):
-    result = await analyze_development_topic(
-        query="labor market and manufacturing",
-        country="Morocco, Ethiopia",
-    )
-    assert result["decomposition_method"] == "none"
-    assert result["country_code"] == "MAR,ETH"
+    patch("data360.api._resolve_country_code", return_value="MAR;ETH"),
+    patch("data360.api.get_data", mock_data),
+):
+    result = await analyze_development_topic(
+        query="labor market and manufacturing",
+        country="Morocco; Ethiopia",
+    )
+    assert result["decomposition_method"] == "none"
+    assert result["country_code"] == "MAR;ETH"
```
```python
    country: Optional country name or 3-letter code (e.g. "Ghana", "GHA").
        Can also be comma-separated for multi-country comparisons
        (e.g. "Morocco, Ethiopia").
    max_indicators: Maximum number of indicators to return (default 4, max 6).
```
The tool docs and implementation treat multi-country input as comma-separated (e.g. "Morocco, Ethiopia"), but _resolve_country_code() only supports semicolon-delimited multi-country values (because some country names contain commas). As a result, multi-country resolution and routing will fail unless this is aligned (either accept semicolons here, or implement robust parsing that won’t break names like "Korea, Republic of").
```python
import asyncio
import json
import logging
import re
import zlib
from typing import Any
from urllib.parse import urlencode

from fastmcp import Context
from pydantic import BaseModel
```
`re` is imported but not used in this file. If it's leftover from the old markdown-stripping logic, it should be removed to avoid dead imports and keep the module clean.
```python
    query: The original question.
    country / country_code: Resolved country info (if provided). May be comma-separated.
    sub_queries: The decomposed search terms (from LLM or rule-based).
    decomposition_method: "sampling_client", "sampling_server", or "none".
    selected_indicators: List of ranked indicator dicts, each with:
```
The docstring and surrounding comments describe a “rule-based decomposition” fallback, but the current implementation falls back to sub_queries=[query] with decomposition_method="none" (no rule-based splitting). Please update the docstring/comments to match the actual behavior (or implement the intended rule-based decomposition).
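If the intended rule-based decomposition were implemented rather than documented away, a minimal version consistent with the "conjunctions and stopwords" comments in the diff could look like this. The stopword set and split pattern here are illustrative, not the repository's actual `_STOPWORDS` values.

```python
import re

# Illustrative stopword set; the repository's _STOPWORDS may differ.
_STOPWORDS = {"and", "or", "vs", "versus", "the", "of", "in", "for"}

def rule_based_decompose(query: str) -> list[str]:
    """Split a broad query on coordinating conjunctions into sub-queries."""
    parts = re.split(r"\b(?:and|or|vs|versus)\b", query, flags=re.IGNORECASE)
    sub_queries = []
    for part in parts:
        # Drop stopwords so each sub-query is a searchable topic phrase.
        words = [w for w in part.strip().split() if w.lower() not in _STOPWORDS]
        if words:
            sub_queries.append(" ".join(words))
    # Fall back to the raw query if nothing survives the split,
    # matching the current decomposition_method="none" behavior.
    return sub_queries or [query]
```

Either path works; the review point is only that the docstring and the code should describe the same fallback.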
```python
    temperature=params.temperature or 0.2,
    max_tokens=params.maxTokens or 512,
```
Using params.temperature or 0.2 (and similarly params.maxTokens or 512) will override valid falsy values like 0.0 temperature. Prefer an explicit is None check so 0 / 0.0 are preserved when the caller intentionally sets them.
```diff
-    temperature=params.temperature or 0.2,
-    max_tokens=params.maxTokens or 512,
+    temperature=(
+        params.temperature if params.temperature is not None else 0.2
+    ),
+    max_tokens=(
+        params.maxTokens if params.maxTokens is not None else 512
+    ),
```
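A quick illustration of the pitfall this suggestion fixes:

```python
# `or` substitutes the default for ANY falsy value, including a
# deliberately requested temperature of 0.0 (deterministic sampling).
requested = 0.0

with_or = requested or 0.2                                 # 0.0 is falsy -> 0.2
with_none_check = requested if requested is not None else 0.2  # -> 0.0

print(with_or)          # the caller's 0.0 is silently discarded
print(with_none_check)  # the caller's value is preserved
```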
```python
    Raises on failure so that FastMCP can propagate the error back to the
    caller (who should catch it and fall back to rule-based decomposition).
    """
```
This docstring says the caller will “fall back to rule-based decomposition”, but analyze_development_topic currently falls back to using the raw query (decomposition_method="none"). Please align this wording with the actual Tier 3 behavior to avoid confusing operators.
```python
    patch("data360.api._resolve_country_code", return_value="MAR,ETH"),
    patch("data360.api.get_data", mock_data),
):
    result = await analyze_development_topic(
        query="Labor market Morocco vs manufacturing Ethiopia",
        country="Morocco, Ethiopia",
        ctx=ctx,
    )

    assert result["decomposition_method"] == "sampling_client"
    assert result["country_code"] == "MAR,ETH"
```
These tests treat multi-country input as comma-delimited (and patch _resolve_country_code to return "MAR,ETH"), but _resolve_country_code() actually uses semicolons for multi-country (to avoid ambiguity with country names containing commas). Adjust the test inputs/expected values to match the real delimiter (or update the production code contract consistently).
```diff
-    patch("data360.api._resolve_country_code", return_value="MAR,ETH"),
-    patch("data360.api.get_data", mock_data),
-):
-    result = await analyze_development_topic(
-        query="Labor market Morocco vs manufacturing Ethiopia",
-        country="Morocco, Ethiopia",
-        ctx=ctx,
-    )
-    assert result["decomposition_method"] == "sampling_client"
-    assert result["country_code"] == "MAR,ETH"
+    patch("data360.api._resolve_country_code", return_value="MAR;ETH"),
+    patch("data360.api.get_data", mock_data),
+):
+    result = await analyze_development_topic(
+        query="Labor market Morocco vs manufacturing Ethiopia",
+        country="Morocco; Ethiopia",
+        ctx=ctx,
+    )
+    assert result["decomposition_method"] == "sampling_client"
+    assert result["country_code"] == "MAR;ETH"
```
Summary
Supersedes #63. Rebases `data360_analyze_development_topic` onto `upstream/dev` and incorporates all review feedback from that PR. The dependency chain for #63 (`feat/multi-query-search` #57, background sync #67, search-card #62, `database_name` #65) is now fully merged into `dev`, so this PR applies only the sampling-specific additions as a clean single commit.

ACTUAL TESTS
TEST SAMPLING in VS Code:

`decomposition_method: "sampling_client"`

SAMPLING TIER 2 on clients that don't support sampling:

`decomposition_method: "sampling_server"`

Changes from PR #63
New module: `src/data360/mcp_server/sampling.py`

Extracts the sampling handler into a standalone, testable module. Replaces the hard-coded `OpenAISamplingHandler` with a generic LiteLLM-based handler supporting multiple providers through a unified interface:

| `LITELLM_MODEL` example | Required credentials |
|---|---|
| `gpt-4o-mini` | `OPENAI_API_KEY` |
| `anthropic/claude-haiku-3-5` | `ANTHROPIC_API_KEY` |
| `gemini/gemini-2.0-flash` | `GEMINI_API_KEY` |
| `mistral/mistral-small-latest` | `MISTRAL_API_KEY` |
| `bedrock/mistral.mistral-7b-instruct-v0:2` | `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION_NAME` |
The handler is enabled only when at least one credential env var is present (`has_llm_credentials()`). If none are set, the server logs a clear message and the handler is omitted, so FastMCP skips to the client tier or rule-based tier without attempting a failing LLM call.

`src/data360/mcp_server/_server_definition.py`

Wires up the new handler. Replaces the `OpenAISamplingHandler` import and the silent `except Exception: pass` block with explicit startup logging.

`src/data360/api.py`

Structured output via `result_type=_DecompositionResult`

Defines two Pydantic models; `ctx.sample()` is called with `result_type=_DecompositionResult`. FastMCP enforces the schema on the LLM response and returns a validated object via `result.result`. This eliminates:

- the `json.loads()` / `json.JSONDecodeError` fallback
- the broad `except (ValueError, json.JSONDecodeError)` handler (Copilot review item)

Simplified fallback path

Any failure from `ctx.sample()` (client does not support sampling, LiteLLM call failed, validation error) logs a `WARNING` and proceeds with the raw user query. No partial JSON fallback. No `sampling_error` field surfaced to the caller.

Two-phase sampling for broad MCP client compatibility
Not all MCP clients support schema-constrained sampling. VS Code Copilot (1.99+), for example, supports plain-text sampling but rejects calls that include a `result_type` JSON schema constraint.

To handle this, sampling now uses a two-phase approach:

- Phase A — structured output (`result_type=_DecompositionResult`): returns a validated `_DecompositionResult` object directly; no manual parsing needed.
- Phase B — plain-text fallback (triggered only if Phase A raises): a plain `ctx.sample()` call (no `result_type`), parsed with `json.loads()`. Accepts `{"sub_queries": [...]}`, `{"query_groups": [...]}`, and the legacy flat list `["topic1", ...]`.

If both phases fail (or neither produces valid output), the tool proceeds with the raw user query (`decomposition_method="none"`).

Verified working with VS Code Copilot (Phase B path) and FastMCP LiteLLM (Phase A path).
Updated system prompt
The prompt describes intent and field semantics. FastMCP automatically appends the Pydantic JSON schema when `result_type` is set, so the explicit JSON examples in the old prompt are no longer needed.

`database_name` in response (avsolatorio review item)

Each `selected_indicators` entry now includes `database_name` alongside `database_id`.

Per-indicator country scope for prefetch (Copilot review item)

In multi-country `query_groups` mode, each indicator carries its own `requested_country` from the group it was found in. Prefetch now uses that scope instead of the global `country_code`.

`tests/test_analyze_topic.py`

23 tests:

- `TestScoreIndicator` — indicator scoring (3 tests)
- `TestHasLlmCredentials` — provider credential detection (4 tests)
- `TestAnalyzeDevelopmentTopic` — integration tests with mocked search/get_data (16 tests), including:
  - `database_name` assertion
  - `_DecompositionResult` failure -> proceeds with raw query
  - `sampling_server` tier when client does not advertise native sampling

`pyproject.toml`

Added `litellm>=1.40.0` to project dependencies.

Sampling Tier Summary
Tier 2 is enabled only when at least one of `_CREDENTIAL_ENV_VARS` is set on the server.

Testing
Quick test with LiteLLM (any provider)
Set any supported provider's API key and run the test suite:
Manual e2e test (server-side sampling)
Testing with mAI Factory APIM Bedrock (Ministral-3B)
The mAI Factory platform provides Mistral SLMs via an Azure APIM Bedrock proxy. The recommended model for structured-output query decomposition is Ministral-3B (`mistral.ministral-3-3b-instruct`), which is designed for function calling and JSON-style structured outputs (256k context).

Environments available: DEV, QA (PROD in progress)
What to verify:

- output validates against the `_DecompositionResult` schema
- `sub_queries` contains 3-5 specific, searchable topics (not vague rephrases)
- `query_groups` is populated with per-country scoping

Follow-up: APIM Bedrock Integration (separate PR)
The current implementation uses LiteLLM with standard provider API keys. A follow-up PR will add native support for the mAI Factory APIM Bedrock proxy as an alternative Tier 2 fallback path. This is needed because:

- LiteLLM's `bedrock/` prefix calls AWS Bedrock directly using `AWS_ACCESS_KEY_ID`; it does not route through the APIM gateway.
- The APIM proxy exposes a different endpoint (`azapim{env}.worldbank.org/conversationalai/bedrock/model/{model_id}/converse`) using Azure AD tokens.

Planned architecture for follow-up PR
New env vars for APIM path

| Variable | Value / notes |
|---|---|
| `MAI_APIM_ENV` | `DEV`, `QA`, `PROD` |
| `AZURE_CLIENT_ID` | |
| `AZURE_CLIENT_SECRET` | |
| `AZURE_TENANT_ID` | |
| `MAI_APIM_SUBSCRIPTION_KEY` | |
| `MAI_BEDROCK_MODEL` | `mistral.ministral-3-3b-instruct` (default) |

Default SLM: Ministral-3B
`mistral.ministral-3-3b-instruct` is recommended for the server-side fallback because:

Alternative: `mistral.mistral-small-latest` (Mistral-Small, 7B, more capable but slower).

Implementation sketch
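As a placeholder for the elided sketch, here is a hypothetical endpoint builder matching the proxy path quoted above (`azapim{env}.worldbank.org/conversationalai/bedrock/model/{model_id}/converse`). The scheme, host casing, and error handling are assumptions; Azure AD token acquisition and the `MAI_APIM_SUBSCRIPTION_KEY` header are deliberately omitted:

```python
def apim_converse_url(env: str, model_id: str) -> str:
    """Build the APIM Bedrock Converse endpoint for a given environment.

    Hypothetical helper: reconstructs the URL pattern described above;
    the real proxy path may differ. Callers would POST to this URL with
    an Azure AD bearer token and the APIM subscription-key header.
    """
    if env.upper() not in {"DEV", "QA", "PROD"}:
        raise ValueError(f"unknown MAI_APIM_ENV: {env!r}")
    return (
        f"https://azapim{env.lower()}.worldbank.org"
        f"/conversationalai/bedrock/model/{model_id}/converse"
    )
```

Centralizing the URL construction keeps the `MAI_APIM_ENV` switch in one place when DEV/QA/PROD are wired up in the follow-up PR.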
Test Results