From 7768614a6bbd4adb4e7153e31fcf8c31235d99ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chindri=C8=99=20Mihai=20Alexandru?= <12643176+chindris-mihai-alexandru@users.noreply.github.com> Date: Wed, 26 Nov 2025 23:19:51 +0200 Subject: [PATCH 1/6] Replace Serper with Exa.ai for semantic web search Exa.ai provides AI-native neural search which offers significant advantages for research agents: - Semantic understanding: Finds relevant results based on meaning, not just keyword matching - Query optimization: Built-in autoprompt improves query quality - Direct content retrieval: Can fetch full page text in a single call - Better for complex queries: Neural embeddings excel at nuanced research questions This change simplifies the codebase by removing the dual search provider system and standardizing on Exa.ai. --- .env.example | 15 ++- inference/tool_search.py | 231 ++++++++++++++++++++++----------------- 2 files changed, 143 insertions(+), 103 deletions(-) diff --git a/.env.example b/.env.example index 8558e9c4..f7bb08c3 100644 --- a/.env.example +++ b/.env.example @@ -46,9 +46,14 @@ MAX_WORKERS=30 # API Keys and External Services # ============================================================================= -# Serper API for web search and Google Scholar -# Get your key from: https://serper.dev/ -SERPER_KEY_ID=your_key +# Exa.ai API for semantic web search +# Get your key from: https://exa.ai/ +# Exa provides AI-native neural search with: +# - Semantic understanding (not just keyword matching) +# - Built-in query optimization +# - Direct content retrieval +# - Better results for complex research queries +EXA_API_KEY=your_key # Jina API for web page reading # Get your key from: https://jina.ai/ @@ -57,8 +62,8 @@ JINA_API_KEYS=your_key # Summary model API (OpenAI-compatible) for page summarization # Get your key from: https://platform.openai.com/ API_KEY=your_key -API_BASE=your_api_base -SUMMARY_MODEL_NAME=your_summary_model_name +API_BASE=https://api.openai.com/v1 +SUMMARY_MODEL_NAME=gpt-4o-mini # Dashscope API for file parsing (PDF, Office, etc.) # Get your key from: https://dashscope.aliyun.com/ diff --git a/inference/tool_search.py b/inference/tool_search.py index 1a3f7b53..48869a45 100644 --- a/inference/tool_search.py +++ b/inference/tool_search.py @@ -1,131 +1,166 @@ +""" +Exa.ai Search Tool for DeepResearch +AI-native semantic search with neural embeddings for superior research results. + +Exa.ai advantages: +- Neural/semantic search (understands meaning, not just keywords) +- Can retrieve full page contents directly +- Better for research and complex queries +- Built-in query optimization (autoprompt) +- Supports date filtering and domain restrictions +""" + import json -from concurrent.futures import ThreadPoolExecutor -from typing import List, Union +import os +from typing import Any, Dict, List, Optional, Union import requests from qwen_agent.tools.base import BaseTool, register_tool -import asyncio -from typing import Dict, List, Optional, Union -import uuid -import http.client -import json - -import os - -SERPER_KEY=os.environ.get('SERPER_KEY_ID') +EXA_API_KEY = os.environ.get('EXA_API_KEY') +EXA_BASE_URL = "https://api.exa.ai" @register_tool("search", allow_overwrite=True) class Search(BaseTool): name = "search" - description = "Performs batched web searches: supply an array 'query'; the tool retrieves the top 10 results for each query in one call." 
+ description = "Performs semantic web searches using Exa.ai: supply an array 'query'; retrieves top results with AI-powered understanding." parameters = { "type": "object", "properties": { "query": { "type": "array", - "items": { - "type": "string" - }, - "description": "Array of query strings. Include multiple complementary search queries in a single call." + "items": {"type": "string"}, + "description": "Array of query strings. Exa understands natural language queries well." }, + "num_results": { + "type": "integer", + "description": "Number of results per query (default: 10, max: 100)", + "default": 10 + }, + "include_contents": { + "type": "boolean", + "description": "Whether to include page text content", + "default": False + } }, "required": ["query"], } def __init__(self, cfg: Optional[dict] = None): super().__init__(cfg) - def google_search_with_serp(self, query: str): - def contains_chinese_basic(text: str) -> bool: - return any('\u4E00' <= char <= '\u9FFF' for char in text) - conn = http.client.HTTPSConnection("google.serper.dev") - if contains_chinese_basic(query): - payload = json.dumps({ - "q": query, - "location": "China", - "gl": "cn", - "hl": "zh-cn" - }) - - else: - payload = json.dumps({ - "q": query, - "location": "United States", - "gl": "us", - "hl": "en" - }) + self.api_key = EXA_API_KEY + if not self.api_key: + raise ValueError("EXA_API_KEY environment variable not set. Get your key from https://exa.ai/") + + def exa_search(self, query: str, num_results: int = 10, include_contents: bool = False) -> str: + """ + Perform a search using Exa.ai API. + + Exa supports multiple search types: + - "auto": Intelligently combines neural and other methods (default) + - "neural": AI-powered semantic search + - "deep": Comprehensive search with query expansion + """ headers = { - 'X-API-KEY': SERPER_KEY, - 'Content-Type': 'application/json' - } + "Content-Type": "application/json", + "x-api-key": self.api_key + } + + payload: Dict[str, Any] = { + "query": query, + "numResults": num_results, + "type": "auto", + "useAutoprompt": True, + } + if include_contents: + payload["contents"] = { + "text": {"maxCharacters": 2000} + } - for i in range(5): + response = None + for attempt in range(3): try: - conn.request("POST", "/search", payload, headers) - res = conn.getresponse() + response = requests.post( + f"{EXA_BASE_URL}/search", + headers=headers, + json=payload, + timeout=30 + ) + response.raise_for_status() break - except Exception as e: - print(e) - if i == 4: - return f"Google search Timeout, return None, Please try again later." + except requests.exceptions.RequestException as e: + if attempt == 2: + return f"Exa search failed after 3 attempts: {str(e)}" continue - - data = res.read() - results = json.loads(data.decode("utf-8")) - - try: - if "organic" not in results: - raise Exception(f"No results found for query: '{query}'. Use a less specific query.") - - web_snippets = list() - idx = 0 - if "organic" in results: - for page in results["organic"]: - idx += 1 - date_published = "" - if "date" in page: - date_published = "\nDate published: " + page["date"] - - source = "" - if "source" in page: - source = "\nSource: " + page["source"] - - snippet = "" - if "snippet" in page: - snippet = "\n" + page["snippet"] - - redacted_version = f"{idx}. 
[{page['title']}]({page['link']}){date_published}{source}\n{snippet}" - redacted_version = redacted_version.replace("Your browser can't play this video.", "") - web_snippets.append(redacted_version) - - content = f"A Google search for '{query}' found {len(web_snippets)} results:\n\n## Web Results\n" + "\n\n".join(web_snippets) - return content - except: - return f"No results found for '{query}'. Try with a more general query." - - - - def search_with_serp(self, query: str): - result = self.google_search_with_serp(query) - return result + + if response is None: + return "Exa search failed: no response received" + + results = response.json() + + if "results" not in results or not results["results"]: + return f"No results found for '{query}'. Try a different query." + + web_snippets = [] + for idx, result in enumerate(results["results"], 1): + title = result.get("title", "No title") + url = result.get("url", "") + published_date = result.get("publishedDate", "") + + snippet_parts = [f"{idx}. [{title}]({url})"] + + if published_date: + snippet_parts.append(f"Date: {published_date[:10]}") + + if include_contents and "text" in result: + text = result["text"][:500] + snippet_parts.append(f"\n{text}...") + elif "snippet" in result: + snippet_parts.append(f"\n{result['snippet']}") + + web_snippets.append("\n".join(snippet_parts)) + + search_type = results.get("resolvedSearchType", "neural") + content = f"Exa {search_type} search for '{query}' found {len(web_snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(web_snippets) + return content - def call(self, params: Union[str, dict], **kwargs) -> str: - try: - query = params["query"] - except: - return "[Search] Invalid request format: Input must be a JSON object containing 'query' field" + def call(self, params: Union[str, dict], **kwargs: Any) -> str: + params_dict: Dict[str, Any] + if isinstance(params, str): + try: + params_dict = json.loads(params) + except json.JSONDecodeError: + return "[Search] Invalid JSON input" + else: + params_dict = dict(params) + + query = params_dict.get("query") + if not query: + return "[Search] Invalid request: 'query' field is required" + + raw_num = params_dict.get("num_results", 10) + num_results = int(raw_num) if raw_num is not None else 10 + include_contents = bool(params_dict.get("include_contents", False)) if isinstance(query, str): - # 单个查询 - response = self.search_with_serp(query) - else: - # 多个查询 - assert isinstance(query, List) + return self.exa_search(query, num_results, include_contents) + + if isinstance(query, list): responses = [] for q in query: - responses.append(self.search_with_serp(q)) - response = "\n=======\n".join(responses) - - return response + responses.append(self.exa_search(q, num_results, include_contents)) + return "\n=======\n".join(responses) + + return "[Search] Invalid query format: must be string or array of strings" + +if __name__ == "__main__": + from dotenv import load_dotenv + + env_path = os.path.join(os.path.dirname(__file__), "..", ".env") + load_dotenv(env_path) + + searcher = Search() + result = searcher.call({"query": ["What is retrieval augmented generation?"]}) + print(result) From 63f5738df30c4c170a541dd6a7963cfac538bb49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chindri=C8=99=20Mihai=20Alexandru?= <12643176+chindris-mihai-alexandru@users.noreply.github.com> Date: Wed, 26 Nov 2025 23:28:33 +0200 Subject: [PATCH 2/6] Add category filtering and AI highlights support to Exa search - Add category parameter to filter results (research paper, news, github, etc.) 
- Add AI-generated highlights for better content extraction - Include author information in search results - Document all available Exa categories in docstrings --- inference/tool_search.py | 70 ++++++++++++++++++++++++++++++++++------ 1 file changed, 60 insertions(+), 10 deletions(-) diff --git a/inference/tool_search.py b/inference/tool_search.py index 48869a45..52737ea4 100644 --- a/inference/tool_search.py +++ b/inference/tool_search.py @@ -8,6 +8,8 @@ - Better for research and complex queries - Built-in query optimization (autoprompt) - Supports date filtering and domain restrictions +- Category filtering (research papers, news, company info, etc.) +- AI-generated highlights for quick comprehension """ import json @@ -19,11 +21,17 @@ EXA_API_KEY = os.environ.get('EXA_API_KEY') EXA_BASE_URL = "https://api.exa.ai" +# Valid Exa categories for filtering results +VALID_CATEGORIES = [ + "company", "research paper", "news", "pdf", + "github", "tweet", "personal site", "linkedin profile" +] + @register_tool("search", allow_overwrite=True) class Search(BaseTool): name = "search" - description = "Performs semantic web searches using Exa.ai: supply an array 'query'; retrieves top results with AI-powered understanding." + description = "Performs semantic web searches using Exa.ai: supply an array 'query'; retrieves top results with AI-powered understanding. Supports category filtering for research papers, news, etc." parameters = { "type": "object", "properties": { @@ -39,8 +47,13 @@ class Search(BaseTool): }, "include_contents": { "type": "boolean", - "description": "Whether to include page text content", + "description": "Whether to include page text content and highlights", "default": False + }, + "category": { + "type": "string", + "description": "Filter by category: 'research paper', 'news', 'company', 'pdf', 'github', 'tweet', 'personal site', 'linkedin profile'", + "enum": ["company", "research paper", "news", "pdf", "github", "tweet", "personal site", "linkedin profile"] } }, "required": ["query"], @@ -52,7 +65,13 @@ def __init__(self, cfg: Optional[dict] = None): if not self.api_key: raise ValueError("EXA_API_KEY environment variable not set. Get your key from https://exa.ai/") - def exa_search(self, query: str, num_results: int = 10, include_contents: bool = False) -> str: + def exa_search( + self, + query: str, + num_results: int = 10, + include_contents: bool = False, + category: Optional[str] = None + ) -> str: """ Perform a search using Exa.ai API. 
@@ -60,6 +79,16 @@ def exa_search(self, query: str, num_results: int = 10, include_contents: bool = - "auto": Intelligently combines neural and other methods (default) - "neural": AI-powered semantic search - "deep": Comprehensive search with query expansion + + Categories available: + - "research paper": Academic papers and publications + - "news": News articles + - "company": Company websites and info + - "pdf": PDF documents + - "github": GitHub repositories + - "tweet": Twitter/X posts + - "personal site": Personal websites/blogs + - "linkedin profile": LinkedIn profiles """ headers = { "Content-Type": "application/json", @@ -73,9 +102,14 @@ def exa_search(self, query: str, num_results: int = 10, include_contents: bool = "useAutoprompt": True, } + # Add category filter if specified + if category and category in VALID_CATEGORIES: + payload["category"] = category + if include_contents: payload["contents"] = { - "text": {"maxCharacters": 2000} + "text": {"maxCharacters": 2000}, + "highlights": True } response = None @@ -107,22 +141,33 @@ def exa_search(self, query: str, num_results: int = 10, include_contents: bool = title = result.get("title", "No title") url = result.get("url", "") published_date = result.get("publishedDate", "") + author = result.get("author", "") snippet_parts = [f"{idx}. [{title}]({url})"] + if author: + snippet_parts.append(f"Author: {author}") if published_date: snippet_parts.append(f"Date: {published_date[:10]}") - if include_contents and "text" in result: - text = result["text"][:500] - snippet_parts.append(f"\n{text}...") + # Prefer highlights (AI-generated key points), then text, then snippet + if include_contents: + highlights = result.get("highlights", []) + if highlights: + snippet_parts.append("\nKey points:") + for h in highlights[:3]: + snippet_parts.append(f" • {h}") + elif "text" in result: + text = result["text"][:500] + snippet_parts.append(f"\n{text}...") elif "snippet" in result: snippet_parts.append(f"\n{result['snippet']}") web_snippets.append("\n".join(snippet_parts)) search_type = results.get("resolvedSearchType", "neural") - content = f"Exa {search_type} search for '{query}' found {len(web_snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(web_snippets) + category_info = f" (category: {category})" if category else "" + content = f"Exa {search_type} search{category_info} for '{query}' found {len(web_snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(web_snippets) return content def call(self, params: Union[str, dict], **kwargs: Any) -> str: @@ -142,14 +187,19 @@ def call(self, params: Union[str, dict], **kwargs: Any) -> str: raw_num = params_dict.get("num_results", 10) num_results = int(raw_num) if raw_num is not None else 10 include_contents = bool(params_dict.get("include_contents", False)) + category = params_dict.get("category") + + # Validate category if provided + if category and category not in VALID_CATEGORIES: + category = None if isinstance(query, str): - return self.exa_search(query, num_results, include_contents) + return self.exa_search(query, num_results, include_contents, category) if isinstance(query, list): responses = [] for q in query: - responses.append(self.exa_search(q, num_results, include_contents)) + responses.append(self.exa_search(q, num_results, include_contents, category)) return "\n=======\n".join(responses) return "[Search] Invalid query format: must be string or array of strings" From 8fda6ecdb55407dfc958bca7140b00bc0b4141f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chindri=C8=99=20Mihai=20Alexandru?= 
<12643176+chindris-mihai-alexandru@users.noreply.github.com> Date: Wed, 26 Nov 2025 23:34:50 +0200 Subject: [PATCH 3/6] Polish Exa search: remove unused import, add rate limit and auth error handling --- inference/tool_search.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/inference/tool_search.py b/inference/tool_search.py index 52737ea4..cb3b3645 100644 --- a/inference/tool_search.py +++ b/inference/tool_search.py @@ -14,7 +14,7 @@ import json import os -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, Optional, Union import requests from qwen_agent.tools.base import BaseTool, register_tool @@ -123,6 +123,13 @@ def exa_search( ) response.raise_for_status() break + except requests.exceptions.HTTPError as e: + if response is not None and response.status_code == 429: + return f"Exa search rate limited. Please wait and try again." + if response is not None and response.status_code == 401: + return f"Exa API key invalid. Check your EXA_API_KEY environment variable." + if attempt == 2: + return f"Exa search failed after 3 attempts: {str(e)}" except requests.exceptions.RequestException as e: if attempt == 2: return f"Exa search failed after 3 attempts: {str(e)}" From 1eb21d45665bb9ec071b51fd74739f235a43e5ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chindri=C8=99=20Mihai=20Alexandru?= <12643176+chindris-mihai-alexandru@users.noreply.github.com> Date: Fri, 28 Nov 2025 16:33:58 +0200 Subject: [PATCH 4/6] =?UTF-8?q?feat:=20add=20Exa=20=E2=86=92=20Serper=20?= =?UTF-8?q?=E2=86=92=20DuckDuckGo=20fallback=20chain?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements multi-provider search with automatic fallback: - Exa.ai as primary (best semantic search quality) - Serper.dev as first fallback (Google results) - DuckDuckGo as second fallback (free, always available) Fallback triggers: missing API key, rate limit, or API errors. Each provider has 3 retry attempts before failing over. --- inference/tool_search.py | 387 +++++++++++++++++++++++++++------------ 1 file changed, 270 insertions(+), 117 deletions(-) diff --git a/inference/tool_search.py b/inference/tool_search.py index cb3b3645..c47b77c4 100644 --- a/inference/tool_search.py +++ b/inference/tool_search.py @@ -1,95 +1,58 @@ """ -Exa.ai Search Tool for DeepResearch -AI-native semantic search with neural embeddings for superior research results. - -Exa.ai advantages: -- Neural/semantic search (understands meaning, not just keywords) -- Can retrieve full page contents directly -- Better for research and complex queries -- Built-in query optimization (autoprompt) -- Supports date filtering and domain restrictions -- Category filtering (research papers, news, company info, etc.) -- AI-generated highlights for quick comprehension +Multi-Provider Search Tool for DeepResearch +Implements a fallback chain: Exa.ai → Serper → DuckDuckGo + +Provider priority: +1. Exa.ai (best quality, semantic search with neural embeddings) +2. Serper.dev (Google results, reliable fallback) +3. 
DuckDuckGo (free, always available) + +The tool automatically falls back to the next provider when: +- API key is not configured +- Rate limit is hit +- API errors occur """ import json import os -from typing import Any, Dict, Optional, Union +import time +from typing import Any, Dict, List, Optional, Union import requests from qwen_agent.tools.base import BaseTool, register_tool +# API Keys from environment EXA_API_KEY = os.environ.get('EXA_API_KEY') +SERPER_API_KEY = os.environ.get('SERPER_API_KEY') + +# API endpoints EXA_BASE_URL = "https://api.exa.ai" +SERPER_BASE_URL = "https://google.serper.dev" -# Valid Exa categories for filtering results +# Valid Exa categories VALID_CATEGORIES = [ "company", "research paper", "news", "pdf", "github", "tweet", "personal site", "linkedin profile" ] -@register_tool("search", allow_overwrite=True) -class Search(BaseTool): - name = "search" - description = "Performs semantic web searches using Exa.ai: supply an array 'query'; retrieves top results with AI-powered understanding. Supports category filtering for research papers, news, etc." - parameters = { - "type": "object", - "properties": { - "query": { - "type": "array", - "items": {"type": "string"}, - "description": "Array of query strings. Exa understands natural language queries well." - }, - "num_results": { - "type": "integer", - "description": "Number of results per query (default: 10, max: 100)", - "default": 10 - }, - "include_contents": { - "type": "boolean", - "description": "Whether to include page text content and highlights", - "default": False - }, - "category": { - "type": "string", - "description": "Filter by category: 'research paper', 'news', 'company', 'pdf', 'github', 'tweet', 'personal site', 'linkedin profile'", - "enum": ["company", "research paper", "news", "pdf", "github", "tweet", "personal site", "linkedin profile"] - } - }, - "required": ["query"], - } +class SearchProviderError(Exception): + """Raised when a search provider fails and should fallback.""" + pass - def __init__(self, cfg: Optional[dict] = None): - super().__init__(cfg) - self.api_key = EXA_API_KEY - if not self.api_key: - raise ValueError("EXA_API_KEY environment variable not set. Get your key from https://exa.ai/") - - def exa_search( - self, - query: str, - num_results: int = 10, + +class ExaSearch: + """Exa.ai semantic search provider.""" + + def __init__(self, api_key: str): + self.api_key = api_key + + def search( + self, + query: str, + num_results: int = 10, include_contents: bool = False, category: Optional[str] = None ) -> str: - """ - Perform a search using Exa.ai API. 
- - Exa supports multiple search types: - - "auto": Intelligently combines neural and other methods (default) - - "neural": AI-powered semantic search - - "deep": Comprehensive search with query expansion - - Categories available: - - "research paper": Academic papers and publications - - "news": News articles - - "company": Company websites and info - - "pdf": PDF documents - - "github": GitHub repositories - - "tweet": Twitter/X posts - - "personal site": Personal websites/blogs - - "linkedin profile": LinkedIn profiles - """ headers = { "Content-Type": "application/json", "x-api-key": self.api_key @@ -102,7 +65,6 @@ def exa_search( "useAutoprompt": True, } - # Add category filter if specified if category and category in VALID_CATEGORIES: payload["category"] = category @@ -124,58 +86,239 @@ def exa_search( response.raise_for_status() break except requests.exceptions.HTTPError as e: - if response is not None and response.status_code == 429: - return f"Exa search rate limited. Please wait and try again." - if response is not None and response.status_code == 401: - return f"Exa API key invalid. Check your EXA_API_KEY environment variable." + if response is not None: + if response.status_code == 429: + raise SearchProviderError("Exa rate limited") + if response.status_code == 401: + raise SearchProviderError("Exa API key invalid") + if response.status_code == 402: + raise SearchProviderError("Exa credits exhausted") if attempt == 2: - return f"Exa search failed after 3 attempts: {str(e)}" + raise SearchProviderError(f"Exa failed: {e}") except requests.exceptions.RequestException as e: if attempt == 2: - return f"Exa search failed after 3 attempts: {str(e)}" - continue + raise SearchProviderError(f"Exa failed: {e}") + time.sleep(1) if response is None: - return "Exa search failed: no response received" + raise SearchProviderError("Exa: no response") results = response.json() if "results" not in results or not results["results"]: - return f"No results found for '{query}'. Try a different query." - - web_snippets = [] - for idx, result in enumerate(results["results"], 1): - title = result.get("title", "No title") - url = result.get("url", "") - published_date = result.get("publishedDate", "") - author = result.get("author", "") - - snippet_parts = [f"{idx}. [{title}]({url})"] + return f"No results found for '{query}'." + + snippets = [] + for idx, r in enumerate(results["results"], 1): + title = r.get("title", "No title") + url = r.get("url", "") + date = r.get("publishedDate", "")[:10] if r.get("publishedDate") else "" - if author: - snippet_parts.append(f"Author: {author}") - if published_date: - snippet_parts.append(f"Date: {published_date[:10]}") + parts = [f"{idx}. 
[{title}]({url})"] + if date: + parts.append(f"Date: {date}") - # Prefer highlights (AI-generated key points), then text, then snippet if include_contents: - highlights = result.get("highlights", []) + highlights = r.get("highlights", []) if highlights: - snippet_parts.append("\nKey points:") + parts.append("Key points:") for h in highlights[:3]: - snippet_parts.append(f" • {h}") - elif "text" in result: - text = result["text"][:500] - snippet_parts.append(f"\n{text}...") - elif "snippet" in result: - snippet_parts.append(f"\n{result['snippet']}") + parts.append(f" • {h}") + elif r.get("text"): + parts.append(r["text"][:500] + "...") + elif r.get("snippet"): + parts.append(r["snippet"]) - web_snippets.append("\n".join(snippet_parts)) + snippets.append("\n".join(parts)) search_type = results.get("resolvedSearchType", "neural") - category_info = f" (category: {category})" if category else "" - content = f"Exa {search_type} search{category_info} for '{query}' found {len(web_snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(web_snippets) - return content + cat_info = f" (category: {category})" if category else "" + return f"[Exa {search_type}]{cat_info} '{query}' - {len(snippets)} results:\n\n" + "\n\n".join(snippets) + + +class SerperSearch: + """Serper.dev Google search provider.""" + + def __init__(self, api_key: str): + self.api_key = api_key + + def search(self, query: str, num_results: int = 10) -> str: + headers = { + "X-API-KEY": self.api_key, + "Content-Type": "application/json" + } + + payload = { + "q": query, + "num": min(num_results, 100) + } + + response = None + for attempt in range(3): + try: + response = requests.post( + f"{SERPER_BASE_URL}/search", + headers=headers, + json=payload, + timeout=30 + ) + response.raise_for_status() + break + except requests.exceptions.HTTPError as e: + if response is not None: + if response.status_code == 429: + raise SearchProviderError("Serper rate limited") + if response.status_code in (401, 403): + raise SearchProviderError("Serper API key invalid") + if attempt == 2: + raise SearchProviderError(f"Serper failed: {e}") + except requests.exceptions.RequestException as e: + if attempt == 2: + raise SearchProviderError(f"Serper failed: {e}") + time.sleep(1) + + if response is None: + raise SearchProviderError("Serper: no response") + + data = response.json() + organic = data.get("organic", []) + + if not organic: + return f"No results found for '{query}'." + + snippets = [] + for idx, r in enumerate(organic[:num_results], 1): + title = r.get("title", "No title") + url = r.get("link", "") + snippet = r.get("snippet", "") + + parts = [f"{idx}. 
[{title}]({url})"] + if snippet: + parts.append(snippet) + snippets.append("\n".join(parts)) + + return f"[Serper/Google] '{query}' - {len(snippets)} results:\n\n" + "\n\n".join(snippets) + + +class DuckDuckGoSearch: + """DuckDuckGo search provider (free, no API key needed).""" + + def search(self, query: str, num_results: int = 10) -> str: + try: + from duckduckgo_search import DDGS + except ImportError: + raise SearchProviderError("DuckDuckGo: duckduckgo_search not installed") + + results: List[Dict[str, Any]] = [] + for attempt in range(3): + try: + with DDGS() as ddg: + results = list(ddg.text(query, max_results=num_results)) + break + except Exception as e: + err = str(e).lower() + if "ratelimit" in err or "429" in err: + if attempt < 2: + wait = 2 ** attempt + time.sleep(wait) + continue + raise SearchProviderError(f"DuckDuckGo rate limited after {attempt + 1} attempts") + if attempt == 2: + raise SearchProviderError(f"DuckDuckGo failed: {e}") + time.sleep(1) + + if not results: + return f"No results found for '{query}'." + + snippets = [] + for idx, r in enumerate(results, 1): + title = r.get("title", "No title") + url = r.get("href", "") + body = r.get("body", "") + + parts = [f"{idx}. [{title}]({url})"] + if body: + parts.append(body) + snippets.append("\n".join(parts)) + + return f"[DuckDuckGo] '{query}' - {len(snippets)} results:\n\n" + "\n\n".join(snippets) + + +@register_tool("search", allow_overwrite=True) +class Search(BaseTool): + """ + Multi-provider search tool with automatic fallback. + + Fallback chain: Exa.ai → Serper → DuckDuckGo + """ + + name = "search" + description = "Search the web. Tries Exa.ai (semantic), then Serper (Google), then DuckDuckGo. Supply 'query' as string or array." + parameters = { + "type": "object", + "properties": { + "query": { + "type": "array", + "items": {"type": "string"}, + "description": "Query string or array of queries" + }, + "num_results": { + "type": "integer", + "description": "Results per query (default: 10)", + "default": 10 + }, + "include_contents": { + "type": "boolean", + "description": "Include page contents (Exa only)", + "default": False + }, + "category": { + "type": "string", + "description": "Category filter (Exa only): 'research paper', 'news', 'company', 'pdf', 'github'", + "enum": VALID_CATEGORIES + } + }, + "required": ["query"], + } + + def __init__(self, cfg: Optional[dict] = None): + super().__init__(cfg) + self.providers: List[tuple] = [] + + # Build provider list based on available API keys + if EXA_API_KEY: + self.providers.append(("exa", ExaSearch(EXA_API_KEY))) + if SERPER_API_KEY: + self.providers.append(("serper", SerperSearch(SERPER_API_KEY))) + # DuckDuckGo always available (no API key needed) + self.providers.append(("duckduckgo", DuckDuckGoSearch())) + + if not self.providers: + raise ValueError("No search providers available") + + def _search_with_fallback( + self, + query: str, + num_results: int = 10, + include_contents: bool = False, + category: Optional[str] = None + ) -> str: + """Try each provider in order until one succeeds.""" + errors = [] + + for name, provider in self.providers: + try: + if name == "exa": + return provider.search(query, num_results, include_contents, category) + elif name == "serper": + return provider.search(query, num_results) + else: # duckduckgo + return provider.search(query, num_results) + except SearchProviderError as e: + errors.append(f"{name}: {e}") + continue + + return f"[Search] All providers failed for '{query}':\n" + "\n".join(f" - {e}" for e in errors) def 
call(self, params: Union[str, dict], **kwargs: Any) -> str: params_dict: Dict[str, Any] @@ -183,33 +326,31 @@ def call(self, params: Union[str, dict], **kwargs: Any) -> str: try: params_dict = json.loads(params) except json.JSONDecodeError: - return "[Search] Invalid JSON input" + return "[Search] Invalid JSON" else: params_dict = dict(params) query = params_dict.get("query") if not query: - return "[Search] Invalid request: 'query' field is required" + return "[Search] 'query' is required" - raw_num = params_dict.get("num_results", 10) - num_results = int(raw_num) if raw_num is not None else 10 + num_results = int(params_dict.get("num_results", 10) or 10) include_contents = bool(params_dict.get("include_contents", False)) category = params_dict.get("category") - # Validate category if provided if category and category not in VALID_CATEGORIES: category = None if isinstance(query, str): - return self.exa_search(query, num_results, include_contents, category) + return self._search_with_fallback(query, num_results, include_contents, category) if isinstance(query, list): - responses = [] + results = [] for q in query: - responses.append(self.exa_search(q, num_results, include_contents, category)) - return "\n=======\n".join(responses) + results.append(self._search_with_fallback(q, num_results, include_contents, category)) + return "\n=======\n".join(results) - return "[Search] Invalid query format: must be string or array of strings" + return "[Search] query must be string or array" if __name__ == "__main__": @@ -218,6 +359,18 @@ def call(self, params: Union[str, dict], **kwargs: Any) -> str: env_path = os.path.join(os.path.dirname(__file__), "..", ".env") load_dotenv(env_path) + # Check available providers + exa_key = os.environ.get('EXA_API_KEY') + serper_key = os.environ.get('SERPER_API_KEY') + + print("Available providers:") + if exa_key: + print(" ✓ Exa.ai") + if serper_key: + print(" ✓ Serper.dev") + print(" ✓ DuckDuckGo (always available)") + print() + searcher = Search() result = searcher.call({"query": ["What is retrieval augmented generation?"]}) print(result) From f554d191a8f63786718450a46ebbab08adddf7e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chindri=C8=99=20Mihai=20Alexandru?= <12643176+chindris-mihai-alexandru@users.noreply.github.com> Date: Fri, 28 Nov 2025 16:48:15 +0200 Subject: [PATCH 5/6] Add Tavily to multi-provider search chain - Reorder providers by quality: Exa -> Tavily -> Serper -> DuckDuckGo - Tavily is purpose-built for RAG/LLMs with 1,000 free requests/month - Simplify implementation with cleaner function structure - Remove unnecessary class-based approach for providers - Update .env.example with TAVILY_API_KEY documentation --- .env.example | 40 ++- inference/tool_search.py | 597 ++++++++++++++++++++------------------- 2 files changed, 338 insertions(+), 299 deletions(-) diff --git a/.env.example b/.env.example index f7bb08c3..c53f27cb 100644 --- a/.env.example +++ b/.env.example @@ -46,15 +46,24 @@ MAX_WORKERS=30 # API Keys and External Services # ============================================================================= -# Exa.ai API for semantic web search +# Web Search Providers (in order of quality/preference) +# The system will try each provider in order until one succeeds. +# You only need ONE provider configured, but having multiple provides fallback. 
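+#
+# Example (illustrative): a minimal setup can configure just one key, e.g. only
+# TAVILY_API_KEY; DuckDuckGo needs no key and is always available as a fallback.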
+ +# Exa.ai - Best semantic/neural search ($10 free credits) # Get your key from: https://exa.ai/ -# Exa provides AI-native neural search with: -# - Semantic understanding (not just keyword matching) -# - Built-in query optimization -# - Direct content retrieval -# - Better results for complex research queries EXA_API_KEY=your_key +# Tavily - Purpose-built for RAG/LLMs (1,000 free requests/month) +# Get your key from: https://tavily.com/ +TAVILY_API_KEY=your_key + +# Serper API for Google search results (2,500 free queries) +# Get your key from: https://serper.dev/ +SERPER_KEY_ID=your_key + +# DuckDuckGo is always available as final fallback (FREE, no API key needed) + # Jina API for web page reading # Get your key from: https://jina.ai/ JINA_API_KEYS=your_key @@ -62,8 +71,8 @@ JINA_API_KEYS=your_key # Summary model API (OpenAI-compatible) for page summarization # Get your key from: https://platform.openai.com/ API_KEY=your_key -API_BASE=https://api.openai.com/v1 -SUMMARY_MODEL_NAME=gpt-4o-mini +API_BASE=your_api_base +SUMMARY_MODEL_NAME=your_summary_model_name # Dashscope API for file parsing (PDF, Office, etc.) # Get your key from: https://dashscope.aliyun.com/ @@ -100,4 +109,17 @@ IDP_KEY_SECRET=your_idp_key_secret # These are typically set by distributed training frameworks # WORLD_SIZE=1 -# RANK=0 \ No newline at end of file +# RANK=0 + +# ============================================================================= +# llama.cpp Local Inference (Alternative for Mac/Local Users) +# ============================================================================= +# If using the llama.cpp local inference option instead of vLLM: + +# The llama.cpp server URL (default works if using start_llama_server.sh) +LLAMA_SERVER_URL=http://127.0.0.1:8080 + +# For llama.cpp mode: +# - Web search uses DuckDuckGo by default (FREE, no API key needed) +# - JINA_API_KEYS is optional but recommended for better page reading +# - See: python inference/interactive_llamacpp.py --help \ No newline at end of file diff --git a/inference/tool_search.py b/inference/tool_search.py index c47b77c4..03229595 100644 --- a/inference/tool_search.py +++ b/inference/tool_search.py @@ -1,376 +1,393 @@ """ -Multi-Provider Search Tool for DeepResearch -Implements a fallback chain: Exa.ai → Serper → DuckDuckGo +Multi-Provider Web Search Tool +============================== -Provider priority: -1. Exa.ai (best quality, semantic search with neural embeddings) -2. Serper.dev (Google results, reliable fallback) -3. DuckDuckGo (free, always available) +Implements a robust search fallback chain optimized for AI research: + 1. Exa.ai - Best semantic/neural search, $10 free credits + 2. Tavily - Purpose-built for RAG/LLMs, 1,000 free requests/month + 3. Serper - Google SERP results, 2,500 free queries + 4. DuckDuckGo - Free forever, final fallback (no API key needed) -The tool automatically falls back to the next provider when: -- API key is not configured -- Rate limit is hit -- API errors occur +Each provider is tried in order. If one fails (rate limit, error, no key), +the next provider is attempted automatically. + +Environment Variables: + EXA_API_KEY - Exa.ai API key (https://exa.ai/) + TAVILY_API_KEY - Tavily API key (https://tavily.com/) + SERPER_KEY_ID - Serper API key (https://serper.dev/) + +If no API keys are set, DuckDuckGo is used as the default (free, no key needed). 
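+
+Example (illustrative):
+
+    searcher = Search()
+    print(searcher.call({"query": ["What is retrieval augmented generation?"]}))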
""" +import http.client import json import os import time -from typing import Any, Dict, List, Optional, Union +from typing import Dict, List, Optional, Union + import requests from qwen_agent.tools.base import BaseTool, register_tool -# API Keys from environment -EXA_API_KEY = os.environ.get('EXA_API_KEY') -SERPER_API_KEY = os.environ.get('SERPER_API_KEY') -# API endpoints -EXA_BASE_URL = "https://api.exa.ai" -SERPER_BASE_URL = "https://google.serper.dev" +# API Keys from environment +EXA_API_KEY = os.environ.get("EXA_API_KEY", "") +TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY", "") +SERPER_KEY = os.environ.get("SERPER_KEY_ID", "") -# Valid Exa categories -VALID_CATEGORIES = [ - "company", "research paper", "news", "pdf", - "github", "tweet", "personal site", "linkedin profile" -] +def contains_chinese(text: str) -> bool: + """Check if text contains Chinese characters.""" + return any("\u4E00" <= char <= "\u9FFF" for char in text) -class SearchProviderError(Exception): - """Raised when a search provider fails and should fallback.""" - pass +# ============================================================================= +# Search Providers +# ============================================================================= -class ExaSearch: - """Exa.ai semantic search provider.""" - - def __init__(self, api_key: str): - self.api_key = api_key +def search_exa(query: str, num_results: int = 10) -> Optional[str]: + """ + Exa.ai - Neural/semantic search engine. + Best for finding conceptually relevant results, not just keyword matches. + """ + if not EXA_API_KEY: + return None - def search( - self, - query: str, - num_results: int = 10, - include_contents: bool = False, - category: Optional[str] = None - ) -> str: - headers = { - "Content-Type": "application/json", - "x-api-key": self.api_key - } + try: + response = requests.post( + "https://api.exa.ai/search", + headers={ + "x-api-key": EXA_API_KEY, + "Content-Type": "application/json", + }, + json={ + "query": query, + "numResults": num_results, + "useAutoprompt": True, + "type": "neural", + }, + timeout=30, + ) - payload: Dict[str, Any] = { - "query": query, - "numResults": num_results, - "type": "auto", - "useAutoprompt": True, - } + if response.status_code == 401: + print("[Exa] Invalid API key") + return None + if response.status_code == 429: + print("[Exa] Rate limited") + return None + if response.status_code != 200: + print(f"[Exa] Error {response.status_code}: {response.text[:200]}") + return None - if category and category in VALID_CATEGORIES: - payload["category"] = category + data = response.json() + results = data.get("results", []) - if include_contents: - payload["contents"] = { - "text": {"maxCharacters": 2000}, - "highlights": True - } + if not results: + return None - response = None - for attempt in range(3): - try: - response = requests.post( - f"{EXA_BASE_URL}/search", - headers=headers, - json=payload, - timeout=30 - ) - response.raise_for_status() - break - except requests.exceptions.HTTPError as e: - if response is not None: - if response.status_code == 429: - raise SearchProviderError("Exa rate limited") - if response.status_code == 401: - raise SearchProviderError("Exa API key invalid") - if response.status_code == 402: - raise SearchProviderError("Exa credits exhausted") - if attempt == 2: - raise SearchProviderError(f"Exa failed: {e}") - except requests.exceptions.RequestException as e: - if attempt == 2: - raise SearchProviderError(f"Exa failed: {e}") - time.sleep(1) + snippets = [] + for idx, r in 
enumerate(results, 1): + title = r.get("title", "No title") + url = r.get("url", "") + text = r.get("text", "")[:300] if r.get("text") else "" + published = r.get("publishedDate", "") + + snippet = f"{idx}. [{title}]({url})" + if published: + snippet += f"\nDate published: {published[:10]}" + if text: + snippet += f"\n{text}" + snippets.append(snippet) - if response is None: - raise SearchProviderError("Exa: no response") + return f"A search for '{query}' found {len(snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(snippets) + + except requests.Timeout: + print("[Exa] Request timeout") + return None + except Exception as e: + print(f"[Exa] Error: {e}") + return None + + +def search_tavily(query: str, num_results: int = 10) -> Optional[str]: + """ + Tavily - Search API designed specifically for RAG and LLM applications. + Returns AI-optimized snippets and supports advanced filtering. + """ + if not TAVILY_API_KEY: + return None + + try: + response = requests.post( + "https://api.tavily.com/search", + headers={"Content-Type": "application/json"}, + json={ + "api_key": TAVILY_API_KEY, + "query": query, + "max_results": num_results, + "search_depth": "advanced", + "include_answer": False, + "include_raw_content": False, + }, + timeout=30, + ) + + if response.status_code == 401: + print("[Tavily] Invalid API key") + return None + if response.status_code == 429: + print("[Tavily] Rate limited") + return None + if response.status_code != 200: + print(f"[Tavily] Error {response.status_code}: {response.text[:200]}") + return None - results = response.json() + data = response.json() + results = data.get("results", []) - if "results" not in results or not results["results"]: - return f"No results found for '{query}'." + if not results: + return None snippets = [] - for idx, r in enumerate(results["results"], 1): + for idx, r in enumerate(results, 1): title = r.get("title", "No title") url = r.get("url", "") - date = r.get("publishedDate", "")[:10] if r.get("publishedDate") else "" + content = r.get("content", "")[:300] + score = r.get("score", 0) - parts = [f"{idx}. [{title}]({url})"] - if date: - parts.append(f"Date: {date}") - - if include_contents: - highlights = r.get("highlights", []) - if highlights: - parts.append("Key points:") - for h in highlights[:3]: - parts.append(f" • {h}") - elif r.get("text"): - parts.append(r["text"][:500] + "...") - elif r.get("snippet"): - parts.append(r["snippet"]) - - snippets.append("\n".join(parts)) + snippet = f"{idx}. [{title}]({url})" + if content: + snippet += f"\n{content}" + snippets.append(snippet) - search_type = results.get("resolvedSearchType", "neural") - cat_info = f" (category: {category})" if category else "" - return f"[Exa {search_type}]{cat_info} '{query}' - {len(snippets)} results:\n\n" + "\n\n".join(snippets) + return f"A search for '{query}' found {len(snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(snippets) + + except requests.Timeout: + print("[Tavily] Request timeout") + return None + except Exception as e: + print(f"[Tavily] Error: {e}") + return None -class SerperSearch: - """Serper.dev Google search provider.""" - - def __init__(self, api_key: str): - self.api_key = api_key +def search_serper(query: str, num_results: int = 10) -> Optional[str]: + """ + Serper - Google Search API (SERP results). + Fast and reliable Google search results. 
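+
+    Example (illustrative):
+        search_serper("large language models", num_results=5)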
+ """ + if not SERPER_KEY: + return None - def search(self, query: str, num_results: int = 10) -> str: - headers = { - "X-API-KEY": self.api_key, - "Content-Type": "application/json" - } + try: + conn = http.client.HTTPSConnection("google.serper.dev") + + if contains_chinese(query): + payload = json.dumps({ + "q": query, + "location": "China", + "gl": "cn", + "hl": "zh-cn", + "num": num_results, + }) + else: + payload = json.dumps({ + "q": query, + "location": "United States", + "gl": "us", + "hl": "en", + "num": num_results, + }) - payload = { - "q": query, - "num": min(num_results, 100) + headers = { + "X-API-KEY": SERPER_KEY, + "Content-Type": "application/json", } - response = None + res = None for attempt in range(3): try: - response = requests.post( - f"{SERPER_BASE_URL}/search", - headers=headers, - json=payload, - timeout=30 - ) - response.raise_for_status() + conn.request("POST", "/search", payload, headers) + res = conn.getresponse() break - except requests.exceptions.HTTPError as e: - if response is not None: - if response.status_code == 429: - raise SearchProviderError("Serper rate limited") - if response.status_code in (401, 403): - raise SearchProviderError("Serper API key invalid") - if attempt == 2: - raise SearchProviderError(f"Serper failed: {e}") - except requests.exceptions.RequestException as e: + except Exception as e: if attempt == 2: - raise SearchProviderError(f"Serper failed: {e}") + print(f"[Serper] Connection error: {e}") + return None time.sleep(1) + continue - if response is None: - raise SearchProviderError("Serper: no response") + if res is None: + return None - data = response.json() - organic = data.get("organic", []) + data = json.loads(res.read().decode("utf-8")) - if not organic: - return f"No results found for '{query}'." + if "error" in data: + print(f"[Serper] API error: {data['error']}") + return None + + if "organic" not in data: + return None snippets = [] - for idx, r in enumerate(organic[:num_results], 1): - title = r.get("title", "No title") - url = r.get("link", "") - snippet = r.get("snippet", "") + for idx, page in enumerate(data["organic"], 1): + title = page.get("title", "No title") + url = page.get("link", "") + snippet_text = page.get("snippet", "") + date = page.get("date", "") + source = page.get("source", "") + + result = f"{idx}. [{title}]({url})" + if date: + result += f"\nDate published: {date}" + if source: + result += f"\nSource: {source}" + if snippet_text: + result += f"\n{snippet_text}" - parts = [f"{idx}. [{title}]({url})"] - if snippet: - parts.append(snippet) - snippets.append("\n".join(parts)) + result = result.replace("Your browser can't play this video.", "") + snippets.append(result) - return f"[Serper/Google] '{query}' - {len(snippets)} results:\n\n" + "\n\n".join(snippets) + return f"A search for '{query}' found {len(snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(snippets) + + except Exception as e: + print(f"[Serper] Error: {e}") + return None -class DuckDuckGoSearch: - """DuckDuckGo search provider (free, no API key needed).""" +def search_duckduckgo(query: str, num_results: int = 10) -> Optional[str]: + """ + DuckDuckGo - Free search with no API key required. + Rate limited but reliable as a final fallback. 
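+
+    Requires the optional duckduckgo-search package:
+        pip install duckduckgo-search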
+ """ + try: + from duckduckgo_search import DDGS + from duckduckgo_search.exceptions import RatelimitException + except ImportError: + print("[DuckDuckGo] duckduckgo-search package not installed") + return None - def search(self, query: str, num_results: int = 10) -> str: + retries = 3 + for attempt in range(retries): try: - from duckduckgo_search import DDGS - except ImportError: - raise SearchProviderError("DuckDuckGo: duckduckgo_search not installed") - - results: List[Dict[str, Any]] = [] - for attempt in range(3): - try: - with DDGS() as ddg: - results = list(ddg.text(query, max_results=num_results)) - break - except Exception as e: - err = str(e).lower() - if "ratelimit" in err or "429" in err: - if attempt < 2: - wait = 2 ** attempt - time.sleep(wait) - continue - raise SearchProviderError(f"DuckDuckGo rate limited after {attempt + 1} attempts") - if attempt == 2: - raise SearchProviderError(f"DuckDuckGo failed: {e}") - time.sleep(1) - - if not results: - return f"No results found for '{query}'." - - snippets = [] - for idx, r in enumerate(results, 1): - title = r.get("title", "No title") - url = r.get("href", "") - body = r.get("body", "") + with DDGS() as ddgs: + results = list(ddgs.text(query, max_results=num_results)) + + if not results: + return None - parts = [f"{idx}. [{title}]({url})"] - if body: - parts.append(body) - snippets.append("\n".join(parts)) + snippets = [] + for idx, r in enumerate(results, 1): + title = r.get("title", "No title") + url = r.get("href", r.get("link", "")) + body = r.get("body", "")[:300] + + snippet = f"{idx}. [{title}]({url})" + if body: + snippet += f"\n{body}" + snippets.append(snippet) + + return f"A search for '{query}' found {len(snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(snippets) - return f"[DuckDuckGo] '{query}' - {len(snippets)} results:\n\n" + "\n\n".join(snippets) + except RatelimitException: + if attempt < retries - 1: + time.sleep(2 ** attempt) + continue + print("[DuckDuckGo] Rate limited after retries") + return None + except Exception as e: + print(f"[DuckDuckGo] Error: {e}") + return None + + return None -@register_tool("search", allow_overwrite=True) -class Search(BaseTool): +# ============================================================================= +# Multi-Provider Search with Fallback +# ============================================================================= + +def multi_provider_search(query: str, num_results: int = 10) -> str: """ - Multi-provider search tool with automatic fallback. + Search using multiple providers with automatic fallback. - Fallback chain: Exa.ai → Serper → DuckDuckGo + Provider priority (by quality): + 1. Exa.ai - Best semantic search + 2. Tavily - Purpose-built for LLMs + 3. Serper - Google SERP results + 4. DuckDuckGo - Free fallback + + Returns the first successful result or an error message. """ + providers = [ + ("Exa", search_exa), + ("Tavily", search_tavily), + ("Serper", search_serper), + ("DuckDuckGo", search_duckduckgo), + ] + + errors = [] + + for name, search_fn in providers: + result = search_fn(query, num_results) + if result: + return result + errors.append(name) + + return f"No results found for '{query}'. All providers failed: {', '.join(errors)}. Try a different query." 
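+
+# Example (illustrative): the chain can also be called directly, outside the
+# agent tool wrapper:
+#
+#     print(multi_provider_search("retrieval augmented generation", num_results=5))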
+ + +# ============================================================================= +# Qwen Agent Tool Registration +# ============================================================================= + +@register_tool("search", allow_overwrite=True) +class Search(BaseTool): + """Web search tool with multi-provider fallback.""" name = "search" - description = "Search the web. Tries Exa.ai (semantic), then Serper (Google), then DuckDuckGo. Supply 'query' as string or array." + description = "Performs batched web searches: supply an array 'query'; the tool retrieves the top 10 results for each query in one call." parameters = { "type": "object", "properties": { "query": { "type": "array", "items": {"type": "string"}, - "description": "Query string or array of queries" - }, - "num_results": { - "type": "integer", - "description": "Results per query (default: 10)", - "default": 10 - }, - "include_contents": { - "type": "boolean", - "description": "Include page contents (Exa only)", - "default": False + "description": "Array of query strings. Include multiple complementary search queries in a single call.", }, - "category": { - "type": "string", - "description": "Category filter (Exa only): 'research paper', 'news', 'company', 'pdf', 'github'", - "enum": VALID_CATEGORIES - } }, "required": ["query"], } def __init__(self, cfg: Optional[dict] = None): super().__init__(cfg) - self.providers: List[tuple] = [] - # Build provider list based on available API keys + # Log which providers are available + available = [] if EXA_API_KEY: - self.providers.append(("exa", ExaSearch(EXA_API_KEY))) - if SERPER_API_KEY: - self.providers.append(("serper", SerperSearch(SERPER_API_KEY))) - # DuckDuckGo always available (no API key needed) - self.providers.append(("duckduckgo", DuckDuckGoSearch())) + available.append("Exa") + if TAVILY_API_KEY: + available.append("Tavily") + if SERPER_KEY: + available.append("Serper") + available.append("DuckDuckGo") - if not self.providers: - raise ValueError("No search providers available") + print(f"[Search] Available providers: {', '.join(available)}") - def _search_with_fallback( - self, - query: str, - num_results: int = 10, - include_contents: bool = False, - category: Optional[str] = None - ) -> str: - """Try each provider in order until one succeeds.""" - errors = [] - - for name, provider in self.providers: - try: - if name == "exa": - return provider.search(query, num_results, include_contents, category) - elif name == "serper": - return provider.search(query, num_results) - else: # duckduckgo - return provider.search(query, num_results) - except SearchProviderError as e: - errors.append(f"{name}: {e}") - continue - - return f"[Search] All providers failed for '{query}':\n" + "\n".join(f" - {e}" for e in errors) - - def call(self, params: Union[str, dict], **kwargs: Any) -> str: - params_dict: Dict[str, Any] + def call(self, params: Union[str, dict], **kwargs) -> str: if isinstance(params, str): - try: - params_dict = json.loads(params) - except json.JSONDecodeError: - return "[Search] Invalid JSON" - else: - params_dict = dict(params) + return "[Search] Invalid request format: Input must be a JSON object containing 'query' field" + params_dict: dict = params query = params_dict.get("query") - if not query: - return "[Search] 'query' is required" - - num_results = int(params_dict.get("num_results", 10) or 10) - include_contents = bool(params_dict.get("include_contents", False)) - category = params_dict.get("category") - - if category and category not in VALID_CATEGORIES: - 
category = None + if query is None: + return "[Search] Invalid request format: Input must be a JSON object containing 'query' field" if isinstance(query, str): - return self._search_with_fallback(query, num_results, include_contents, category) + return multi_provider_search(query) - if isinstance(query, list): - results = [] - for q in query: - results.append(self._search_with_fallback(q, num_results, include_contents, category)) - return "\n=======\n".join(results) + if not isinstance(query, list): + return "[Search] Invalid query format: 'query' must be a string or array of strings" - return "[Search] query must be string or array" - - -if __name__ == "__main__": - from dotenv import load_dotenv - - env_path = os.path.join(os.path.dirname(__file__), "..", ".env") - load_dotenv(env_path) - - # Check available providers - exa_key = os.environ.get('EXA_API_KEY') - serper_key = os.environ.get('SERPER_API_KEY') - - print("Available providers:") - if exa_key: - print(" ✓ Exa.ai") - if serper_key: - print(" ✓ Serper.dev") - print(" ✓ DuckDuckGo (always available)") - print() - - searcher = Search() - result = searcher.call({"query": ["What is retrieval augmented generation?"]}) - print(result) + responses = [] + for q in query: + responses.append(multi_provider_search(q)) + + return "\n=======\n".join(responses) From a2c991f7d80995302d94b15999b259bf18f32371 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chindri=C8=99=20Mihai=20Alexandru?= <12643176+chindris-mihai-alexandru@users.noreply.github.com> Date: Fri, 28 Nov 2025 17:04:42 +0200 Subject: [PATCH 6/6] Improve search providers with robust error handling - Use Exa type: auto instead of neural for better results - Switch Tavily to Bearer token auth (per API docs) - Add Exa 402 (payment required) error handling - Add Tavily 432/433 quota error handling - Switch Serper from http.client to requests for consistency - Add sanitize_query() for input validation - Add format_results() for consistent output - Handle DuckDuckGoSearchException - Add ConnectionError, JSONDecodeError handling - Better error messages with provider names --- inference/tool_search.py | 417 ++++++++++++++++++++++++++------------- 1 file changed, 280 insertions(+), 137 deletions(-) diff --git a/inference/tool_search.py b/inference/tool_search.py index 03229595..7431e5e2 100644 --- a/inference/tool_search.py +++ b/inference/tool_search.py @@ -3,10 +3,10 @@ ============================== Implements a robust search fallback chain optimized for AI research: - 1. Exa.ai - Best semantic/neural search, $10 free credits - 2. Tavily - Purpose-built for RAG/LLMs, 1,000 free requests/month - 3. Serper - Google SERP results, 2,500 free queries - 4. DuckDuckGo - Free forever, final fallback (no API key needed) + 1. Exa.ai - Best semantic/neural search ($10 free credits) + 2. Tavily - Purpose-built for RAG/LLMs (1,000 free requests/month) + 3. Serper - Google SERP results (2,500 free queries) + 4. DuckDuckGo - Free forever, final fallback (no API key needed) Each provider is tried in order. If one fails (rate limit, error, no key), the next provider is attempted automatically. @@ -19,27 +19,63 @@ If no API keys are set, DuckDuckGo is used as the default (free, no key needed). 
""" -import http.client import json import os import time -from typing import Dict, List, Optional, Union +from typing import List, Optional, Union import requests from qwen_agent.tools.base import BaseTool, register_tool # API Keys from environment -EXA_API_KEY = os.environ.get("EXA_API_KEY", "") -TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY", "") -SERPER_KEY = os.environ.get("SERPER_KEY_ID", "") +EXA_API_KEY = os.environ.get("EXA_API_KEY", "").strip() +TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY", "").strip() +SERPER_KEY = os.environ.get("SERPER_KEY_ID", "").strip() + +# Request timeouts (seconds) +REQUEST_TIMEOUT = 30 def contains_chinese(text: str) -> bool: """Check if text contains Chinese characters.""" + if not text: + return False return any("\u4E00" <= char <= "\u9FFF" for char in text) +def sanitize_query(query: str) -> str: + """Sanitize and validate a search query.""" + if not query: + return "" + # Strip whitespace and limit length + return query.strip()[:500] + + +def format_results(query: str, results: List[dict], provider: str) -> str: + """Format search results into a consistent markdown format.""" + if not results: + return "" + + snippets = [] + for idx, r in enumerate(results, 1): + title = r.get("title", "No title") + url = r.get("url", "") + snippet = r.get("snippet", "") + date = r.get("date", "") + + # Build result entry + entry = f"{idx}. [{title}]({url})" + if date: + entry += f"\nDate: {date}" + if snippet: + entry += f"\n{snippet}" + snippets.append(entry) + + header = f"A search for '{query}' found {len(snippets)} results:\n\n## Web Results\n\n" + return header + "\n\n".join(snippets) + + # ============================================================================= # Search Providers # ============================================================================= @@ -48,10 +84,16 @@ def search_exa(query: str, num_results: int = 10) -> Optional[str]: """ Exa.ai - Neural/semantic search engine. Best for finding conceptually relevant results, not just keyword matches. 
+
+    API Docs: https://docs.exa.ai/reference/search
     """
     if not EXA_API_KEY:
         return None

+    query = sanitize_query(query)
+    if not query:
+        return None
+
     try:
         response = requests.post(
             "https://api.exa.ai/search",
             headers={
                 "Content-Type": "application/json",
                 "x-api-key": EXA_API_KEY,
             },
             json={
                 "query": query,
-                "numResults": num_results,
-                "useAutoprompt": True,
-                "type": "neural",
+                "numResults": min(num_results, 100),  # API max is 100
+                "type": "auto",  # Let Exa choose best search type
             },
-            timeout=30,
+            timeout=REQUEST_TIMEOUT,
         )

+        # Handle error responses
         if response.status_code == 401:
-            print("[Exa] Invalid API key")
+            print("[Exa] Invalid or expired API key")
             return None
         if response.status_code == 429:
-            print("[Exa] Rate limited")
+            print("[Exa] Rate limited - too many requests")
+            return None
+        if response.status_code == 402:
+            print("[Exa] Payment required - credits exhausted")
             return None
         if response.status_code != 200:
-            print(f"[Exa] Error {response.status_code}: {response.text[:200]}")
+            error_msg = response.text[:200] if response.text else "Unknown error"
+            print(f"[Exa] Error {response.status_code}: {error_msg}")
             return None

         data = response.json()
-        results = data.get("results", [])
+        api_results = data.get("results", [])

-        if not results:
+        if not api_results:
             return None

-        snippets = []
-        for idx, r in enumerate(results, 1):
-            title = r.get("title", "No title")
+        # Normalize results
+        results = []
+        for r in api_results:
+            title = r.get("title") or "No title"
             url = r.get("url", "")
-            text = r.get("text", "")[:300] if r.get("text") else ""
+            text = r.get("text", "")
             published = r.get("publishedDate", "")

-            snippet = f"{idx}. [{title}]({url})"
-            if published:
-                snippet += f"\nDate published: {published[:10]}"
-            if text:
-                snippet += f"\n{text}"
-            snippets.append(snippet)
+            # Truncate text for snippet
+            snippet = text[:300] + "..." if len(text) > 300 else text
+            date = published[:10] if published else ""
+
+            results.append({
+                "title": title,
+                "url": url,
+                "snippet": snippet,
+                "date": date,
+            })

-        return f"A search for '{query}' found {len(snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(snippets)
+        return format_results(query, results, "Exa")

     except requests.Timeout:
         print("[Exa] Request timeout")
         return None
+    except requests.ConnectionError:
+        print("[Exa] Connection error - network issue")
+        return None
+    except json.JSONDecodeError:
+        print("[Exa] Invalid JSON response")
+        return None
     except Exception as e:
-        print(f"[Exa] Error: {e}")
+        print(f"[Exa] Unexpected error: {type(e).__name__}: {e}")
         return None

@@ -112,60 +169,89 @@ def search_tavily(query: str, num_results: int = 10) -> Optional[str]:
     """
     Tavily - Search API designed specifically for RAG and LLM applications.
     Returns AI-optimized snippets and supports advanced filtering.
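+
+    Request sketch (abridged; mirrors the payload built below, where
+    "basic" depth costs 1 API credit vs. 2 for "advanced"):
+
+        {"query": "...", "max_results": 10, "search_depth": "basic"}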
+
+    API Docs: https://docs.tavily.com/documentation/api-reference/endpoint/search
     """
     if not TAVILY_API_KEY:
         return None

+    query = sanitize_query(query)
+    if not query:
+        return None
+
     try:
+        # Tavily supports both Bearer token and api_key in body
+        # Using Bearer token as it's more standard
         response = requests.post(
             "https://api.tavily.com/search",
-            headers={"Content-Type": "application/json"},
+            headers={
+                "Authorization": f"Bearer {TAVILY_API_KEY}",
+                "Content-Type": "application/json",
+            },
             json={
-                "api_key": TAVILY_API_KEY,
                 "query": query,
-                "max_results": num_results,
-                "search_depth": "advanced",
+                "max_results": min(num_results, 20),  # API max is 20
+                "search_depth": "basic",  # Use basic (1 credit) vs advanced (2 credits)
                 "include_answer": False,
                 "include_raw_content": False,
             },
-            timeout=30,
+            timeout=REQUEST_TIMEOUT,
         )

+        # Handle error responses
         if response.status_code == 401:
-            print("[Tavily] Invalid API key")
+            print("[Tavily] Invalid or expired API key")
             return None
         if response.status_code == 429:
-            print("[Tavily] Rate limited")
+            print("[Tavily] Rate limited - too many requests")
+            return None
+        if response.status_code == 432:
+            print("[Tavily] Plan limit exceeded - upgrade required")
+            return None
+        if response.status_code == 433:
+            print("[Tavily] Pay-as-you-go limit exceeded")
             return None
         if response.status_code != 200:
-            print(f"[Tavily] Error {response.status_code}: {response.text[:200]}")
+            error_msg = response.text[:200] if response.text else "Unknown error"
+            print(f"[Tavily] Error {response.status_code}: {error_msg}")
             return None

         data = response.json()
-        results = data.get("results", [])
+        api_results = data.get("results", [])

-        if not results:
+        if not api_results:
             return None

-        snippets = []
-        for idx, r in enumerate(results, 1):
-            title = r.get("title", "No title")
+        # Normalize results
+        results = []
+        for r in api_results:
+            title = r.get("title") or "No title"
             url = r.get("url", "")
-            content = r.get("content", "")[:300]
-            score = r.get("score", 0)
+            content = r.get("content", "")

-            snippet = f"{idx}. [{title}]({url})"
-            if content:
-                snippet += f"\n{content}"
-            snippets.append(snippet)
+            # Truncate content for snippet
+            snippet = content[:300] + "..." if len(content) > 300 else content
+
+            results.append({
+                "title": title,
+                "url": url,
+                "snippet": snippet,
+                "date": "",
+            })

-        return f"A search for '{query}' found {len(snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(snippets)
+        return format_results(query, results, "Tavily")

     except requests.Timeout:
         print("[Tavily] Request timeout")
         return None
+    except requests.ConnectionError:
+        print("[Tavily] Connection error - network issue")
+        return None
+    except json.JSONDecodeError:
+        print("[Tavily] Invalid JSON response")
+        return None
     except Exception as e:
-        print(f"[Tavily] Error: {e}")
+        print(f"[Tavily] Unexpected error: {type(e).__name__}: {e}")
         return None

@@ -173,83 +259,98 @@ def search_serper(query: str, num_results: int = 10) -> Optional[str]:
     """
     Serper - Google Search API (SERP results).
     Fast and reliable Google search results.
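+
+    Request sketch (abridged; mirrors the payload built below, where gl/hl
+    switch to cn/zh-cn when the query contains Chinese characters):
+
+        {"q": "...", "gl": "us", "hl": "en", "num": 10}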
+
+    API Docs: https://serper.dev/
     """
     if not SERPER_KEY:
         return None

+    query = sanitize_query(query)
+    if not query:
+        return None
+
     try:
-        conn = http.client.HTTPSConnection("google.serper.dev")
-
+        # Determine locale based on query content
        if contains_chinese(query):
-            payload = json.dumps({
+            payload = {
                 "q": query,
-                "location": "China",
                 "gl": "cn",
                 "hl": "zh-cn",
-                "num": num_results,
-            })
+                "num": min(num_results, 100),
+            }
         else:
-            payload = json.dumps({
+            payload = {
                 "q": query,
-                "location": "United States",
                 "gl": "us",
                 "hl": "en",
-                "num": num_results,
-            })
+                "num": min(num_results, 100),
+            }

-        headers = {
-            "X-API-KEY": SERPER_KEY,
-            "Content-Type": "application/json",
-        }
-
-        res = None
-        for attempt in range(3):
-            try:
-                conn.request("POST", "/search", payload, headers)
-                res = conn.getresponse()
-                break
-            except Exception as e:
-                if attempt == 2:
-                    print(f"[Serper] Connection error: {e}")
-                    return None
-                time.sleep(1)
-                continue
+        # Use requests instead of http.client for consistency and better error handling
+        response = requests.post(
+            "https://google.serper.dev/search",
+            headers={
+                "X-API-KEY": SERPER_KEY,
+                "Content-Type": "application/json",
+            },
+            json=payload,
+            timeout=REQUEST_TIMEOUT,
+        )

-        if res is None:
+        # Handle error responses
+        if response.status_code == 401:
+            print("[Serper] Invalid API key")
+            return None
+        if response.status_code == 429:
+            print("[Serper] Rate limited")
+            return None
+        if response.status_code != 200:
+            error_msg = response.text[:200] if response.text else "Unknown error"
+            print(f"[Serper] Error {response.status_code}: {error_msg}")
             return None

-        data = json.loads(res.read().decode("utf-8"))
+        data = response.json()

+        # Check for API-level errors
         if "error" in data:
             print(f"[Serper] API error: {data['error']}")
             return None

-        if "organic" not in data:
+        organic = data.get("organic", [])
+        if not organic:
             return None

-        snippets = []
-        for idx, page in enumerate(data["organic"], 1):
-            title = page.get("title", "No title")
+        # Normalize results
+        results = []
+        for page in organic:
+            title = page.get("title") or "No title"
             url = page.get("link", "")
             snippet_text = page.get("snippet", "")
             date = page.get("date", "")
-            source = page.get("source", "")

-            result = f"{idx}. 
[{title}]({url})" - if date: - result += f"\nDate published: {date}" - if source: - result += f"\nSource: {source}" - if snippet_text: - result += f"\n{snippet_text}" + # Clean up snippet + snippet = snippet_text.replace("Your browser can't play this video.", "").strip() - result = result.replace("Your browser can't play this video.", "") - snippets.append(result) + results.append({ + "title": title, + "url": url, + "snippet": snippet, + "date": date, + }) - return f"A search for '{query}' found {len(snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(snippets) + return format_results(query, results, "Serper") + except requests.Timeout: + print("[Serper] Request timeout") + return None + except requests.ConnectionError: + print("[Serper] Connection error - network issue") + return None + except json.JSONDecodeError: + print("[Serper] Invalid JSON response") + return None except Exception as e: - print(f"[Serper] Error: {e}") + print(f"[Serper] Unexpected error: {type(e).__name__}: {e}") return None @@ -260,41 +361,56 @@ def search_duckduckgo(query: str, num_results: int = 10) -> Optional[str]: """ try: from duckduckgo_search import DDGS - from duckduckgo_search.exceptions import RatelimitException + from duckduckgo_search.exceptions import RatelimitException, DuckDuckGoSearchException except ImportError: - print("[DuckDuckGo] duckduckgo-search package not installed") + print("[DuckDuckGo] duckduckgo-search package not installed. Run: pip install duckduckgo-search") return None - retries = 3 - for attempt in range(retries): + query = sanitize_query(query) + if not query: + return None + + max_retries = 3 + for attempt in range(max_retries): try: with DDGS() as ddgs: - results = list(ddgs.text(query, max_results=num_results)) + api_results = list(ddgs.text(query, max_results=min(num_results, 25))) - if not results: + if not api_results: return None - snippets = [] - for idx, r in enumerate(results, 1): - title = r.get("title", "No title") - url = r.get("href", r.get("link", "")) - body = r.get("body", "")[:300] + # Normalize results + results = [] + for r in api_results: + title = r.get("title") or "No title" + url = r.get("href") or r.get("link", "") + body = r.get("body", "") - snippet = f"{idx}. [{title}]({url})" - if body: - snippet += f"\n{body}" - snippets.append(snippet) + # Truncate body for snippet + snippet = body[:300] + "..." if len(body) > 300 else body + + results.append({ + "title": title, + "url": url, + "snippet": snippet, + "date": "", + }) - return f"A search for '{query}' found {len(snippets)} results:\n\n## Web Results\n\n" + "\n\n".join(snippets) + return format_results(query, results, "DuckDuckGo") except RatelimitException: - if attempt < retries - 1: - time.sleep(2 ** attempt) + if attempt < max_retries - 1: + wait_time = 2 ** attempt # Exponential backoff: 1s, 2s, 4s + print(f"[DuckDuckGo] Rate limited, waiting {wait_time}s...") + time.sleep(wait_time) continue - print("[DuckDuckGo] Rate limited after retries") + print("[DuckDuckGo] Rate limited after all retries") + return None + except DuckDuckGoSearchException as e: + print(f"[DuckDuckGo] Search error: {e}") return None except Exception as e: - print(f"[DuckDuckGo] Error: {e}") + print(f"[DuckDuckGo] Unexpected error: {type(e).__name__}: {e}") return None return None @@ -309,13 +425,18 @@ def multi_provider_search(query: str, num_results: int = 10) -> str: Search using multiple providers with automatic fallback. Provider priority (by quality): - 1. Exa.ai - Best semantic search - 2. 
Tavily - Purpose-built for LLMs - 3. Serper - Google SERP results - 4. DuckDuckGo - Free fallback + 1. Exa.ai - Best semantic search + 2. Tavily - Purpose-built for LLMs + 3. Serper - Google SERP results + 4. DuckDuckGo - Free fallback Returns the first successful result or an error message. """ + # Validate query + query = sanitize_query(query) + if not query: + return "[Search] Empty query provided. Please provide a search term." + providers = [ ("Exa", search_exa), ("Tavily", search_tavily), @@ -323,15 +444,16 @@ def multi_provider_search(query: str, num_results: int = 10) -> str: ("DuckDuckGo", search_duckduckgo), ] - errors = [] + failed_providers = [] for name, search_fn in providers: result = search_fn(query, num_results) if result: return result - errors.append(name) + failed_providers.append(name) - return f"No results found for '{query}'. All providers failed: {', '.join(errors)}. Try a different query." + # All providers failed + return f"No results found for '{query}'. Providers attempted: {', '.join(failed_providers)}. Try a different or simpler query." # ============================================================================= @@ -359,7 +481,7 @@ class Search(BaseTool): def __init__(self, cfg: Optional[dict] = None): super().__init__(cfg) - # Log which providers are available + # Log which providers are available at initialization available = [] if EXA_API_KEY: available.append("Exa") @@ -367,27 +489,48 @@ def __init__(self, cfg: Optional[dict] = None): available.append("Tavily") if SERPER_KEY: available.append("Serper") - available.append("DuckDuckGo") + available.append("DuckDuckGo") # Always available - print(f"[Search] Available providers: {', '.join(available)}") + print(f"[Search] Initialized with providers: {', '.join(available)}") def call(self, params: Union[str, dict], **kwargs) -> str: + # Handle string input (invalid) if isinstance(params, str): - return "[Search] Invalid request format: Input must be a JSON object containing 'query' field" + return "[Search] Invalid request: Input must be a JSON object with 'query' field, not a string." + + # Handle None or non-dict + if not isinstance(params, dict): + return "[Search] Invalid request: Input must be a JSON object with 'query' field." - params_dict: dict = params - query = params_dict.get("query") + query = params.get("query") + + # Handle missing query if query is None: - return "[Search] Invalid request format: Input must be a JSON object containing 'query' field" + return "[Search] Missing 'query' field in request." + # Handle single string query if isinstance(query, str): + query = query.strip() + if not query: + return "[Search] Empty query string provided." return multi_provider_search(query) - if not isinstance(query, list): - return "[Search] Invalid query format: 'query' must be a string or array of strings" - - responses = [] - for q in query: - responses.append(multi_provider_search(q)) + # Handle list of queries + if isinstance(query, list): + if not query: + return "[Search] Empty query list provided." + + # Filter out empty strings + valid_queries = [q.strip() for q in query if isinstance(q, str) and q.strip()] + + if not valid_queries: + return "[Search] No valid queries in list (all empty or non-string)." + + responses = [] + for q in valid_queries: + responses.append(multi_provider_search(q)) + + return "\n=======\n".join(responses) - return "\n=======\n".join(responses) + # Invalid query type + return f"[Search] Invalid 'query' type: expected string or array, got {type(query).__name__}."