Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 125 additions & 17 deletions src/lean_explore/mcp/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,32 @@

from lean_explore.mcp.app import AppContext, BackendServiceType, mcp_app
from lean_explore.models import SearchResponse, SearchResult
from lean_explore.models.search_types import (
SearchResultSummary,
SearchSummaryResponse,
extract_bold_description,
)


class SearchResultSummaryDict(TypedDict, total=False):
    """Serialized SearchResultSummary for slim MCP search responses.

    Describes the dictionary form of one slim search hit. The class is
    declared with ``total=False``, so type checkers treat every key as
    optional.
    """

    # Identifier of the declaration.
    id: int
    # Name of the declaration.
    name: str
    # Short description of the declaration; may be None.
    description: str | None


class SearchSummaryResponseDict(TypedDict, total=False):
    """Serialized SearchSummaryResponse for slim MCP search responses.

    Describes the dictionary form of a whole slim search response. The class
    is declared with ``total=False``, so type checkers treat every key as
    optional.
    """

    # The search query string.
    query: str
    # One slim entry per returned declaration.
    results: list[SearchResultSummaryDict]
    # Number of results, as reported by the response.
    count: int
    # Processing time in milliseconds; may be None.
    processing_time_ms: int | None


class SearchResultDict(TypedDict, total=False):
"""Serialized SearchResult for MCP tool responses."""
"""Serialized SearchResult for verbose MCP tool responses."""

id: int
name: str
Expand All @@ -24,7 +46,7 @@ class SearchResultDict(TypedDict, total=False):


class SearchResponseDict(TypedDict, total=False):
"""Serialized SearchResponse for MCP tool responses."""
"""Serialized SearchResponse for verbose MCP tool responses."""

query: str
results: list[SearchResultDict]
Expand Down Expand Up @@ -55,15 +77,54 @@ async def _get_backend_from_context(ctx: MCPContext) -> BackendServiceType:
return backend


async def _execute_backend_search(
    backend: BackendServiceType,
    query: str,
    limit: int,
    rerank_top: int | None,
    packages: list[str] | None,
) -> SearchResponse:
    """Execute a search on the backend, handling both async and sync backends.

    The backend's ``search`` is invoked exactly once. If the value it returns
    is awaitable (an ``async def`` method, or any wrapper that hands back a
    coroutine/future), it is awaited; otherwise the value is returned as-is.
    Inspecting the result rather than the callable avoids the deprecated
    ``asyncio.iscoroutinefunction`` and also covers wrapped or decorated
    async callables that are not themselves coroutine functions.

    Args:
        backend: The backend service (ApiClient or Service).
        query: The search query string.
        limit: Maximum number of results.
        rerank_top: Number of candidates to rerank with cross-encoder.
        packages: Optional package filter.

    Returns:
        The search response from the backend.

    Raises:
        RuntimeError: If the backend does not support search.
    """
    # Function-scope import so this helper does not depend on the module's
    # import block for the new name.
    import inspect

    if not hasattr(backend, "search"):
        logger.error("Backend service does not have a 'search' method.")
        raise RuntimeError("Search functionality not available on configured backend.")

    outcome = backend.search(
        query=query, limit=limit, rerank_top=rerank_top, packages=packages
    )
    # Async backends hand back an awaitable; sync backends hand back the
    # finished response.
    if inspect.isawaitable(outcome):
        return await outcome
    return outcome


@mcp_app.tool()
async def search(
ctx: MCPContext,
query: str,
limit: int = 10,
rerank_top: int | None = 50,
packages: list[str] | None = None,
) -> SearchResponseDict:
"""Searches Lean declarations by a query string.
) -> SearchSummaryResponseDict:
"""Searches Lean declarations and returns concise results.

Returns slim results (id, name, short description) to minimize token usage.
Use get_by_id to retrieve full details for specific declarations, or
search_verbose to get all fields upfront.

Args:
ctx: The MCP context, providing access to the backend service.
Expand All @@ -75,29 +136,73 @@ async def search(
Defaults to None (all packages).

Returns:
A dictionary containing the search response with results.
A dictionary containing slim search results with id, name, and description.
"""
backend = await _get_backend_from_context(ctx)
logger.info(
f"MCP Tool 'search' called with query: '{query}', limit: {limit}, "
f"rerank_top: {rerank_top}, packages: {packages}"
)

if not hasattr(backend, "search"):
logger.error("Backend service does not have a 'search' method.")
raise RuntimeError("Search functionality not available on configured backend.")
response = await _execute_backend_search(
backend, query, limit, rerank_top, packages
)

# Call backend search (handle both async and sync)
if asyncio.iscoroutinefunction(backend.search):
response: SearchResponse = await backend.search(
query=query, limit=limit, rerank_top=rerank_top, packages=packages
)
else:
response: SearchResponse = backend.search(
query=query, limit=limit, rerank_top=rerank_top, packages=packages
# Convert full results to slim summaries
summary_results = [
SearchResultSummary(
id=result.id,
name=result.name,
description=extract_bold_description(result.informalization),
)
for result in response.results
]
summary_response = SearchSummaryResponse(
query=response.query,
results=summary_results,
count=response.count,
processing_time_ms=response.processing_time_ms,
)

return summary_response.model_dump(exclude_none=True)


@mcp_app.tool()
async def search_verbose(
    ctx: MCPContext,
    query: str,
    limit: int = 10,
    rerank_top: int | None = 50,
    packages: list[str] | None = None,
) -> SearchResponseDict:
    """Searches Lean declarations and returns full results with all fields.

    Returns complete results including source code, dependencies, module info,
    and full informalization. Use this when you need all details upfront. For
    a more concise overview, use search instead.

    Args:
        ctx: The MCP context, providing access to the backend service.
        query: A search query string, e.g., "continuous function".
        limit: The maximum number of search results to return. Defaults to 10.
        rerank_top: Number of candidates to rerank with cross-encoder. Set to 0 or
            None to skip reranking. Defaults to 50. Only used with local backend.
        packages: Filter results to specific packages (e.g., ["Mathlib", "Std"]).
            Defaults to None (all packages).

    Returns:
        A dictionary containing the full search response with all fields.
    """
    # NOTE(review): the docstring above is kept verbatim; the tool decorator
    # presumably exposes it to MCP clients as the tool description - confirm
    # before rewording.
    service = await _get_backend_from_context(ctx)

    call_summary = (
        f"MCP Tool 'search_verbose' called with query: '{query}', limit: {limit}, "
        f"rerank_top: {rerank_top}, packages: {packages}"
    )
    logger.info(call_summary)

    full_response = await _execute_backend_search(
        backend=service,
        query=query,
        limit=limit,
        rerank_top=rerank_top,
        packages=packages,
    )

    # Serialize the response model to a plain dict, leaving out None-valued
    # fields.
    return full_response.model_dump(exclude_none=True)


Expand All @@ -108,6 +213,9 @@ async def get_by_id(
) -> SearchResultDict | None:
"""Retrieves a specific declaration by its unique identifier.

Returns the full declaration including source code, dependencies, module
info, and informalization. Use this to expand results from the search tool.

Args:
ctx: The MCP context, providing access to the backend service.
declaration_id: The unique integer identifier of the declaration.
Expand Down
18 changes: 16 additions & 2 deletions src/lean_explore/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@
"""

from lean_explore.models.search_db import Base, Declaration
from lean_explore.models.search_types import SearchResponse, SearchResult
from lean_explore.models.search_types import (
SearchResponse,
SearchResult,
SearchResultSummary,
SearchSummaryResponse,
extract_bold_description,
)

__all__ = ["Base", "Declaration", "SearchResult", "SearchResponse"]
__all__ = [
"Base",
"Declaration",
"SearchResult",
"SearchResponse",
"SearchResultSummary",
"SearchSummaryResponse",
"extract_bold_description",
]
54 changes: 54 additions & 0 deletions src/lean_explore/models/search_types.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,62 @@
"""Type definitions for search results and related data structures."""

import re

from pydantic import BaseModel, ConfigDict


def extract_bold_description(informalization: str | None) -> str | None:
"""Extract the bold header text from an informalization string.

Informalizations follow the pattern: **Bold Title.** Rest of description...
This function extracts just the bold title portion.

Args:
informalization: The full informalization text, or None.

Returns:
The bold header text (without ** markers), or None if no bold
header is found or input is None.
"""
if not informalization:
return None
match = re.match(r"\*\*(.+?)\*\*", informalization)
return match.group(1) if match else None


class SearchResultSummary(BaseModel):
    """A slim search result containing only identification and description.

    Used by the MCP search tool to return concise results that minimize
    token usage. Consumers can use the id to fetch full details via get_by_id.
    """

    id: int
    """Primary key identifier."""

    name: str
    """Fully qualified Lean name (e.g., 'Nat.add')."""

    # NOTE(review): no default is declared for this field, so under pydantic
    # v2 it is presumably required-but-nullable (callers must pass it, and
    # None is accepted) - confirm that is the intent.
    description: str | None
    """Short description extracted from the informalization bold header."""


class SearchSummaryResponse(BaseModel):
    """Response from a slim search operation containing summary results.

    Carries the query, the slim result entries, a result count, and an
    optional processing time.
    """

    query: str
    """The original search query string."""

    results: list[SearchResultSummary]
    """List of slim search results."""

    # This model does not derive the count from ``results``; the creator
    # supplies it.
    count: int
    """Number of results returned."""

    # Defaults to None, so the field may be omitted when constructing.
    processing_time_ms: int | None = None
    """Processing time in milliseconds, if available."""


class SearchResult(BaseModel):
"""A search result representing a Lean declaration.

Expand Down
Loading