From b4df352076515521ddb7f5892e5ffb94654a800d Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Thu, 11 Dec 2025 00:29:44 +0100 Subject: [PATCH 01/16] feat(proxy): add Anthropic Messages API endpoint for Claude Code compatibility - Add /v1/messages endpoint with Anthropic-format request/response - Support both x-api-key and Bearer token authentication - Implement Anthropic <-> OpenAI format translation for messages, tools, and responses - Add streaming wrapper converting OpenAI SSE to Anthropic SSE events - Handle tool_use blocks with proper stop_reason detection - Fix NoneType iteration bug in tool_calls handling --- src/proxy_app/main.py | 690 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 689 insertions(+), 1 deletion(-) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 6eddf4e..f8eb2bf 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -99,7 +99,8 @@ from contextlib import asynccontextmanager from fastapi import FastAPI, Request, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware - from fastapi.responses import StreamingResponse + from fastapi.responses import StreamingResponse, JSONResponse + import uuid from fastapi.security import APIKeyHeader print(" → Loading core dependencies...") @@ -214,6 +215,112 @@ class EnrichedModelList(BaseModel): data: List[EnrichedModelCard] +# --- Anthropic API Models --- +class AnthropicTextBlock(BaseModel): + """Anthropic text content block.""" + + type: str = "text" + text: str + + +class AnthropicImageSource(BaseModel): + """Anthropic image source for base64 images.""" + + type: str = "base64" + media_type: str + data: str + + +class AnthropicImageBlock(BaseModel): + """Anthropic image content block.""" + + type: str = "image" + source: AnthropicImageSource + + +class AnthropicToolUseBlock(BaseModel): + """Anthropic tool use content block.""" + + type: str = "tool_use" + id: str + name: str + input: dict + + +class AnthropicToolResultBlock(BaseModel): + """Anthropic tool result content block.""" + + type: str = "tool_result" + tool_use_id: str + content: Union[str, List[Any]] + is_error: Optional[bool] = None + + +class AnthropicMessage(BaseModel): + """Anthropic message format.""" + + role: str + content: Union[ + str, + List[ + Union[ + AnthropicTextBlock, + AnthropicImageBlock, + AnthropicToolUseBlock, + AnthropicToolResultBlock, + dict, + ] + ], + ] + + +class AnthropicTool(BaseModel): + """Anthropic tool definition.""" + + name: str + description: Optional[str] = None + input_schema: dict + + +class AnthropicMessagesRequest(BaseModel): + """Anthropic Messages API request format.""" + + model: str + messages: List[AnthropicMessage] + max_tokens: int + system: Optional[Union[str, List[dict]]] = None + temperature: Optional[float] = None + top_p: Optional[float] = None + top_k: Optional[int] = None + stop_sequences: Optional[List[str]] = None + stream: Optional[bool] = False + tools: Optional[List[AnthropicTool]] = None + tool_choice: Optional[dict] = None + metadata: Optional[dict] = None + + +class AnthropicUsage(BaseModel): + """Anthropic usage statistics.""" + + input_tokens: int + output_tokens: int + cache_creation_input_tokens: Optional[int] = None + cache_read_input_tokens: Optional[int] = None + + +class AnthropicMessagesResponse(BaseModel): + """Anthropic Messages API response format.""" + + id: str + type: str = "message" + role: str = "assistant" + content: List[Union[AnthropicTextBlock, AnthropicToolUseBlock, dict]] + model: str + stop_reason: Optional[str] = None + stop_sequence: Optional[str] = None + usage: AnthropicUsage + + # Calculate total loading time _elapsed = time.time() - _start_time print( @@ -665,6 +772,433 @@ async def verify_api_key(auth: str = Depends(api_key_header)): return auth +# --- Anthropic API Key Header --- +anthropic_api_key_header = APIKeyHeader(name="x-api-key", auto_error=False) + + +async def verify_anthropic_api_key( + x_api_key: str = Depends(anthropic_api_key_header), + auth: str = Depends(api_key_header), +): + """ + Dependency to verify API key for Anthropic endpoints. + Accepts either x-api-key header (Anthropic style) or Authorization Bearer (OpenAI style). + """ + # Check x-api-key first (Anthropic style) + if x_api_key and x_api_key == PROXY_API_KEY: + return x_api_key + # Fall back to Bearer token (OpenAI style) + if auth and auth == f"Bearer {PROXY_API_KEY}": + return auth + raise HTTPException(status_code=401, detail="Invalid or missing API Key") + + +# --- Anthropic <-> OpenAI Format Translation --- +def anthropic_to_openai_messages( + anthropic_messages: List[dict], system: Optional[Union[str, List[dict]]] = None +) -> List[dict]: + """ + Convert Anthropic message format to OpenAI format. + + Key differences: + - Anthropic: system is a separate field, content can be string or list of blocks + - OpenAI: system is a message with role="system", content is usually string + """ + openai_messages = [] + + # Handle system message + if system: + if isinstance(system, str): + openai_messages.append({"role": "system", "content": system}) + elif isinstance(system, list): + # System can be list of text blocks in Anthropic format + system_text = " ".join( + block.get("text", "") + for block in system + if isinstance(block, dict) and block.get("type") == "text" + ) + if system_text: + openai_messages.append({"role": "system", "content": system_text}) + + for msg in anthropic_messages: + role = msg.get("role", "user") + content = msg.get("content", "") + + if isinstance(content, str): + openai_messages.append({"role": role, "content": content}) + elif isinstance(content, list): + # Handle content blocks + openai_content = [] + tool_calls = [] + + for block in content: + if isinstance(block, dict): + block_type = block.get("type", "text") + + if block_type == "text": + openai_content.append( + {"type": "text", "text": block.get("text", "")} + ) + elif block_type == "image": + # Convert Anthropic image format to OpenAI + source = block.get("source", {}) + if source.get("type") == "base64": + openai_content.append( + { + "type": "image_url", + "image_url": { + "url": f"data:{source.get('media_type', 'image/png')};base64,{source.get('data', '')}" + }, + } + ) + elif source.get("type") == "url": + openai_content.append( + { + "type": "image_url", + "image_url": {"url": source.get("url", "")}, + } + ) + elif block_type == "tool_use": + # Anthropic tool_use -> OpenAI tool_calls + tool_calls.append( + { + "id": block.get("id", ""), + "type": "function", + "function": { + "name": block.get("name", ""), + "arguments": json.dumps(block.get("input", {})), + }, + } + ) + elif block_type == "tool_result": + # Tool results become separate messages in OpenAI format + tool_content = block.get("content", "") + if isinstance(tool_content, list): + tool_content = " ".join( + b.get("text", "") + for b in tool_content + if isinstance(b, dict) and b.get("type") == "text" + ) + openai_messages.append( + { + "role": "tool", + "tool_call_id": block.get("tool_use_id", ""), + "content": str(tool_content), + } + ) + continue # Don't add to current message + + # Build the message + if tool_calls: + # Assistant message with tool calls + msg_dict = {"role": role} + if openai_content: + # If there's text content alongside tool calls + text_parts = [ + c.get("text", "") + for c in openai_content + if c.get("type") == "text" + ] + msg_dict["content"] = " ".join(text_parts) if text_parts else None + else: + msg_dict["content"] = None + msg_dict["tool_calls"] = tool_calls + openai_messages.append(msg_dict) + elif openai_content: + # Check if it's just text or mixed content + if len(openai_content) == 1 and openai_content[0].get("type") == "text": + openai_messages.append( + {"role": role, "content": openai_content[0].get("text", "")} + ) + else: + openai_messages.append({"role": role, "content": openai_content}) + + return openai_messages + + +def anthropic_to_openai_tools( + anthropic_tools: Optional[List[dict]], +) -> Optional[List[dict]]: + """Convert Anthropic tool definitions to OpenAI format.""" + if not anthropic_tools: + return None + + openai_tools = [] + for tool in anthropic_tools: + openai_tools.append( + { + "type": "function", + "function": { + "name": tool.get("name", ""), + "description": tool.get("description", ""), + "parameters": tool.get("input_schema", {}), + }, + } + ) + return openai_tools + + +def anthropic_to_openai_tool_choice( + anthropic_tool_choice: Optional[dict], +) -> Optional[Union[str, dict]]: + """Convert Anthropic tool_choice to OpenAI format.""" + if not anthropic_tool_choice: + return None + + choice_type = anthropic_tool_choice.get("type", "auto") + + if choice_type == "auto": + return "auto" + elif choice_type == "any": + return "required" + elif choice_type == "tool": + return { + "type": "function", + "function": {"name": anthropic_tool_choice.get("name", "")}, + } + elif choice_type == "none": + return "none" + + return "auto" + + +def openai_to_anthropic_response(openai_response: dict, original_model: str) -> dict: + """ + Convert OpenAI chat completion response to Anthropic Messages format. + """ + choice = openai_response.get("choices", [{}])[0] + message = choice.get("message", {}) + usage = openai_response.get("usage", {}) + + # Build content blocks + content_blocks = [] + + # Add text content if present + text_content = message.get("content") + if text_content: + content_blocks.append({"type": "text", "text": text_content}) + + # Add tool use blocks if present + tool_calls = message.get("tool_calls") or [] + for tc in tool_calls: + func = tc.get("function", {}) + try: + input_data = json.loads(func.get("arguments", "{}")) + except json.JSONDecodeError: + input_data = {} + + content_blocks.append( + { + "type": "tool_use", + "id": tc.get("id", f"toolu_{int(time.time())}"), + "name": func.get("name", ""), + "input": input_data, + } + ) + + # Map finish_reason to stop_reason + finish_reason = choice.get("finish_reason", "end_turn") + stop_reason_map = { + "stop": "end_turn", + "length": "max_tokens", + "tool_calls": "tool_use", + "content_filter": "end_turn", + "function_call": "tool_use", + } + stop_reason = stop_reason_map.get(finish_reason, "end_turn") + + # Build usage + anthropic_usage = { + "input_tokens": usage.get("prompt_tokens", 0), + "output_tokens": usage.get("completion_tokens", 0), + } + + # Add cache tokens if present + if usage.get("prompt_tokens_details"): + details = usage["prompt_tokens_details"] + if details.get("cached_tokens"): + anthropic_usage["cache_read_input_tokens"] = details["cached_tokens"] + + return { + "id": openai_response.get("id", f"msg_{int(time.time())}"), + "type": "message", + "role": "assistant", + "content": content_blocks, + "model": original_model, + "stop_reason": stop_reason, + "stop_sequence": None, + "usage": anthropic_usage, + } + + +async def anthropic_streaming_wrapper( + request: Request, + openai_stream: AsyncGenerator[str, None], + original_model: str, + request_id: str, +) -> AsyncGenerator[str, None]: + """ + Convert OpenAI streaming format to Anthropic streaming format. + + Anthropic SSE events: + - message_start: Initial message metadata + - content_block_start: Start of a content block + - content_block_delta: Content chunk + - content_block_stop: End of a content block + - message_delta: Final message metadata (stop_reason, usage) + - message_stop: End of message + """ + message_started = False + content_block_started = False + current_block_index = 0 + accumulated_text = "" + tool_calls_by_index = {} # Track tool calls by their index + input_tokens = 0 + output_tokens = 0 + + try: + async for chunk_str in openai_stream: + if await request.is_disconnected(): + break + + if not chunk_str.strip() or not chunk_str.startswith("data:"): + continue + + data_content = chunk_str[len("data:") :].strip() + if data_content == "[DONE]": + # Close any open content blocks (text or tool_use) + if content_block_started or tool_calls_by_index: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + + # Determine stop_reason based on whether we had tool calls + stop_reason = "tool_use" if tool_calls_by_index else "end_turn" + + # Send message_delta with final info + yield f'event: message_delta\ndata: {{"type": "message_delta", "delta": {{"stop_reason": "{stop_reason}", "stop_sequence": null}}, "usage": {{"output_tokens": {output_tokens}}}}}\n\n' + + # Send message_stop + yield 'event: message_stop\ndata: {"type": "message_stop"}\n\n' + break + + try: + chunk = json.loads(data_content) + except json.JSONDecodeError: + continue + + # Extract usage if present + if "usage" in chunk and chunk["usage"]: + input_tokens = chunk["usage"].get("prompt_tokens", input_tokens) + output_tokens = chunk["usage"].get("completion_tokens", output_tokens) + + # Send message_start on first chunk + if not message_started: + message_start = { + "type": "message_start", + "message": { + "id": request_id, + "type": "message", + "role": "assistant", + "content": [], + "model": original_model, + "stop_reason": None, + "stop_sequence": None, + "usage": {"input_tokens": input_tokens, "output_tokens": 0}, + }, + } + yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" + message_started = True + + choices = chunk.get("choices", []) + if not choices: + continue + + delta = choices[0].get("delta", {}) + finish_reason = choices[0].get("finish_reason") + + # Handle text content + content = delta.get("content") + if content: + if not content_block_started: + # Start a text content block + block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": {"type": "text", "text": ""}, + } + yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" + content_block_started = True + + # Send content delta + block_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": {"type": "text_delta", "text": content}, + } + yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" + accumulated_text += content + + # Handle tool calls + tool_calls = delta.get("tool_calls", []) + for tc in tool_calls: + tc_index = tc.get("index", 0) + + if tc_index not in tool_calls_by_index: + # Close previous text block if open + if content_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + content_block_started = False + + # Start new tool use block + tool_calls_by_index[tc_index] = { + "id": tc.get("id", f"toolu_{tc_index}"), + "name": tc.get("function", {}).get("name", ""), + "arguments": "", + } + + block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": { + "type": "tool_use", + "id": tool_calls_by_index[tc_index]["id"], + "name": tool_calls_by_index[tc_index]["name"], + "input": {}, + }, + } + yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" + + # Accumulate arguments + func = tc.get("function", {}) + if func.get("name"): + tool_calls_by_index[tc_index]["name"] = func["name"] + if func.get("arguments"): + tool_calls_by_index[tc_index]["arguments"] += func["arguments"] + + # Send partial JSON delta + block_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": { + "type": "input_json_delta", + "partial_json": func["arguments"], + }, + } + yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" + + # Note: We intentionally ignore finish_reason here. + # Block closing is handled when we receive [DONE] to avoid + # premature closes with providers that send finish_reason on each chunk. + + except Exception as e: + logging.error(f"Error in Anthropic streaming wrapper: {e}") + error_event = { + "type": "error", + "error": {"type": "api_error", "message": str(e)}, + } + yield f"event: error\ndata: {json.dumps(error_event)}\n\n" + + async def streaming_response_wrapper( request: Request, request_data: dict, @@ -967,6 +1501,160 @@ async def chat_completions( raise HTTPException(status_code=500, detail=str(e)) +# --- Anthropic Messages API Endpoint --- +@app.post("/v1/messages") +async def anthropic_messages( + request: Request, + body: AnthropicMessagesRequest, + client: RotatingClient = Depends(get_rotating_client), + _=Depends(verify_anthropic_api_key), +): + """ + Anthropic-compatible Messages API endpoint. + + Accepts requests in Anthropic's format and returns responses in Anthropic's format. + Internally translates to OpenAI format for processing via LiteLLM. + + This endpoint is compatible with Claude Code and other Anthropic API clients. + """ + request_id = f"msg_{uuid.uuid4().hex[:24]}" + original_model = body.model + + # Initialize logger if enabled + logger = DetailedLogger() if ENABLE_REQUEST_LOGGING else None + + try: + # Convert Anthropic request to OpenAI format + anthropic_request = body.model_dump(exclude_none=True) + + openai_messages = anthropic_to_openai_messages( + anthropic_request.get("messages", []), anthropic_request.get("system") + ) + + openai_tools = anthropic_to_openai_tools(anthropic_request.get("tools")) + openai_tool_choice = anthropic_to_openai_tool_choice( + anthropic_request.get("tool_choice") + ) + + # Build OpenAI-compatible request + openai_request = { + "model": body.model, + "messages": openai_messages, + "max_tokens": body.max_tokens, + "stream": body.stream or False, + } + + if body.temperature is not None: + openai_request["temperature"] = body.temperature + if body.top_p is not None: + openai_request["top_p"] = body.top_p + if body.stop_sequences: + openai_request["stop"] = body.stop_sequences + if openai_tools: + openai_request["tools"] = openai_tools + if openai_tool_choice: + openai_request["tool_choice"] = openai_tool_choice + + log_request_to_console( + url=str(request.url), + headers=dict(request.headers), + client_info=( + request.client.host if request.client else "unknown", + request.client.port if request.client else 0, + ), + request_data=openai_request, + ) + + if body.stream: + # Streaming response - acompletion returns a generator for streaming + response_generator = client.acompletion(request=request, **openai_request) + + return StreamingResponse( + anthropic_streaming_wrapper( + request, response_generator, original_model, request_id + ), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + else: + # Non-streaming response + response = await client.acompletion(request=request, **openai_request) + + # Convert OpenAI response to Anthropic format + openai_response = ( + response.model_dump() + if hasattr(response, "model_dump") + else dict(response) + ) + anthropic_response = openai_to_anthropic_response( + openai_response, original_model + ) + + # Override the ID with our request ID + anthropic_response["id"] = request_id + + if logger: + logger.log_final_response( + status_code=200, + headers=None, + body=anthropic_response, + ) + + return JSONResponse(content=anthropic_response) + + except ( + litellm.InvalidRequestError, + ValueError, + litellm.ContextWindowExceededError, + ) as e: + error_response = { + "type": "error", + "error": {"type": "invalid_request_error", "message": str(e)}, + } + raise HTTPException(status_code=400, detail=error_response) + except litellm.AuthenticationError as e: + error_response = { + "type": "error", + "error": {"type": "authentication_error", "message": str(e)}, + } + raise HTTPException(status_code=401, detail=error_response) + except litellm.RateLimitError as e: + error_response = { + "type": "error", + "error": {"type": "rate_limit_error", "message": str(e)}, + } + raise HTTPException(status_code=429, detail=error_response) + except (litellm.ServiceUnavailableError, litellm.APIConnectionError) as e: + error_response = { + "type": "error", + "error": {"type": "api_error", "message": str(e)}, + } + raise HTTPException(status_code=503, detail=error_response) + except litellm.Timeout as e: + error_response = { + "type": "error", + "error": {"type": "api_error", "message": f"Request timed out: {str(e)}"}, + } + raise HTTPException(status_code=504, detail=error_response) + except Exception as e: + logging.error(f"Anthropic messages endpoint error: {e}") + if logger: + logger.log_final_response( + status_code=500, + headers=None, + body={"error": str(e)}, + ) + error_response = { + "type": "error", + "error": {"type": "api_error", "message": str(e)}, + } + raise HTTPException(status_code=500, detail=error_response) + + @app.post("/v1/embeddings") async def embeddings( request: Request, From 7e229f4d93c2859c21df96ca2b601173bc5fcbff Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Fri, 12 Dec 2025 01:51:45 +0100 Subject: [PATCH 02/16] feat(anthropic): add extended thinking support to /v1/messages endpoint - Add AnthropicThinkingConfig model and thinking parameter to request - Translate Anthropic thinking config to reasoning_effort for providers - Handle reasoning_content in streaming wrapper (thinking_delta events) - Convert reasoning_content to thinking blocks in non-streaming responses --- src/proxy_app/main.py | 75 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 2 deletions(-) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index f8eb2bf..91017cb 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -282,6 +282,13 @@ class AnthropicTool(BaseModel): input_schema: dict +class AnthropicThinkingConfig(BaseModel): + """Anthropic thinking configuration.""" + + type: str # "enabled" or "disabled" + budget_tokens: Optional[int] = None + + class AnthropicMessagesRequest(BaseModel): """Anthropic Messages API request format.""" @@ -297,6 +304,7 @@ class AnthropicMessagesRequest(BaseModel): tools: Optional[List[AnthropicTool]] = None tool_choice: Optional[dict] = None metadata: Optional[dict] = None + thinking: Optional[AnthropicThinkingConfig] = None class AnthropicUsage(BaseModel): @@ -973,6 +981,15 @@ def openai_to_anthropic_response(openai_response: dict, original_model: str) -> # Build content blocks content_blocks = [] + # Add thinking content block if reasoning_content is present + reasoning_content = message.get("reasoning_content") + if reasoning_content: + content_blocks.append({ + "type": "thinking", + "thinking": reasoning_content, + "signature": "", # Signature is typically empty for proxied responses + }) + # Add text content if present text_content = message.get("content") if text_content: @@ -1050,8 +1067,10 @@ async def anthropic_streaming_wrapper( """ message_started = False content_block_started = False + thinking_block_started = False current_block_index = 0 accumulated_text = "" + accumulated_thinking = "" tool_calls_by_index = {} # Track tool calls by their index input_tokens = 0 output_tokens = 0 @@ -1066,8 +1085,8 @@ async def anthropic_streaming_wrapper( data_content = chunk_str[len("data:") :].strip() if data_content == "[DONE]": - # Close any open content blocks (text or tool_use) - if content_block_started or tool_calls_by_index: + # Close any open content blocks (thinking, text, or tool_use) + if thinking_block_started or content_block_started or tool_calls_by_index: yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' # Determine stop_reason based on whether we had tool calls @@ -1115,9 +1134,37 @@ async def anthropic_streaming_wrapper( delta = choices[0].get("delta", {}) finish_reason = choices[0].get("finish_reason") + # Handle reasoning/thinking content (from OpenAI-style reasoning_content) + reasoning_content = delta.get("reasoning_content") + if reasoning_content: + if not thinking_block_started: + # Start a thinking content block + block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": {"type": "thinking", "thinking": ""}, + } + yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" + thinking_block_started = True + + # Send thinking delta + block_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": {"type": "thinking_delta", "thinking": reasoning_content}, + } + yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" + accumulated_thinking += reasoning_content + # Handle text content content = delta.get("content") if content: + # If we were in a thinking block, close it first + if thinking_block_started and not content_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + thinking_block_started = False + if not content_block_started: # Start a text content block block_start = { @@ -1143,6 +1190,12 @@ async def anthropic_streaming_wrapper( tc_index = tc.get("index", 0) if tc_index not in tool_calls_by_index: + # Close previous thinking block if open + if thinking_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + thinking_block_started = False + # Close previous text block if open if content_block_started: yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' @@ -1555,6 +1608,24 @@ async def anthropic_messages( if openai_tool_choice: openai_request["tool_choice"] = openai_tool_choice + # Handle Anthropic thinking config -> reasoning_effort translation + if body.thinking: + if body.thinking.type == "enabled": + # Map budget_tokens to reasoning_effort level + # Default to "medium" if enabled but budget not specified + budget = body.thinking.budget_tokens or 10000 + if budget >= 32000: + openai_request["reasoning_effort"] = "high" + openai_request["custom_reasoning_budget"] = True + elif budget >= 10000: + openai_request["reasoning_effort"] = "high" + elif budget >= 5000: + openai_request["reasoning_effort"] = "medium" + else: + openai_request["reasoning_effort"] = "low" + elif body.thinking.type == "disabled": + openai_request["reasoning_effort"] = "disable" + log_request_to_console( url=str(request.url), headers=dict(request.headers), From 7aea08eee94bc34060ee31691c32a72e735bd3e9 Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Fri, 12 Dec 2025 02:03:37 +0100 Subject: [PATCH 03/16] feat(anthropic): force high thinking budget for Opus models by default When no thinking config is provided in the request, Opus models now automatically use reasoning_effort=high with custom_reasoning_budget=True. This ensures Opus 4.5 uses the full 32768 token thinking budget instead of the backend's auto mode (thinkingBudget: -1) which may use less. Opus always uses the -thinking variant regardless, but this change guarantees maximum thinking capacity for better reasoning quality. --- src/proxy_app/main.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 91017cb..2cc61f0 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -1625,6 +1625,12 @@ async def anthropic_messages( openai_request["reasoning_effort"] = "low" elif body.thinking.type == "disabled": openai_request["reasoning_effort"] = "disable" + elif "opus" in body.model.lower(): + # Force high thinking for Opus models when no thinking config is provided + # Opus 4.5 always uses the -thinking variant, so we want maximum thinking budget + # Without this, the backend defaults to thinkingBudget: -1 (auto) instead of high + openai_request["reasoning_effort"] = "high" + openai_request["custom_reasoning_budget"] = True log_request_to_console( url=str(request.url), From 05d89a2a53ba610ef562d6e8ca0b2fe02e360c52 Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Sat, 13 Dec 2025 12:11:23 +0100 Subject: [PATCH 04/16] fix: ensure max_tokens exceeds thinking budget and improve error handling - Add validation to ensure maxOutputTokens > thinkingBudget for Claude extended thinking (prevents 400 INVALID_ARGUMENT API errors) - Improve streaming error handling to send proper message_start and content blocks before error event for better client compatibility - Minor code formatting improvements --- src/proxy_app/main.py | 60 +++++++++++++++++-- .../providers/antigravity_provider.py | 22 +++++++ 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 2cc61f0..f763697 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -984,11 +984,13 @@ def openai_to_anthropic_response(openai_response: dict, original_model: str) -> # Add thinking content block if reasoning_content is present reasoning_content = message.get("reasoning_content") if reasoning_content: - content_blocks.append({ - "type": "thinking", - "thinking": reasoning_content, - "signature": "", # Signature is typically empty for proxied responses - }) + content_blocks.append( + { + "type": "thinking", + "thinking": reasoning_content, + "signature": "", # Signature is typically empty for proxied responses + } + ) # Add text content if present text_content = message.get("content") @@ -1086,7 +1088,11 @@ async def anthropic_streaming_wrapper( data_content = chunk_str[len("data:") :].strip() if data_content == "[DONE]": # Close any open content blocks (thinking, text, or tool_use) - if thinking_block_started or content_block_started or tool_calls_by_index: + if ( + thinking_block_started + or content_block_started + or tool_calls_by_index + ): yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' # Determine stop_reason based on whether we had tool calls @@ -1245,6 +1251,48 @@ async def anthropic_streaming_wrapper( except Exception as e: logging.error(f"Error in Anthropic streaming wrapper: {e}") + + # If we haven't sent message_start yet, send it now so the client can display the error + # Claude Code and other clients may ignore events that come before message_start + if not message_started: + message_start = { + "type": "message_start", + "message": { + "id": request_id, + "type": "message", + "role": "assistant", + "content": [], + "model": original_model, + "stop_reason": None, + "stop_sequence": None, + "usage": {"input_tokens": 0, "output_tokens": 0}, + }, + } + yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" + + # Send the error as a text content block so it's visible to the user + error_message = f"Error: {str(e)}" + error_block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": {"type": "text", "text": ""}, + } + yield f"event: content_block_start\ndata: {json.dumps(error_block_start)}\n\n" + + error_block_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": {"type": "text_delta", "text": error_message}, + } + yield f"event: content_block_delta\ndata: {json.dumps(error_block_delta)}\n\n" + + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + + # Send message_delta and message_stop to properly close the stream + yield f'event: message_delta\ndata: {{"type": "message_delta", "delta": {{"stop_reason": "end_turn", "stop_sequence": null}}, "usage": {{"output_tokens": 0}}}}\n\n' + yield 'event: message_stop\ndata: {"type": "message_stop"}\n\n' + + # Also send the formal error event for clients that handle it error_event = { "type": "error", "error": {"type": "api_error", "message": str(e)}, diff --git a/src/rotator_library/providers/antigravity_provider.py b/src/rotator_library/providers/antigravity_provider.py index 4bd0b21..874e910 100644 --- a/src/rotator_library/providers/antigravity_provider.py +++ b/src/rotator_library/providers/antigravity_provider.py @@ -3537,6 +3537,28 @@ def _transform_to_antigravity_format( gen_config["maxOutputTokens"] = DEFAULT_MAX_OUTPUT_TOKENS # For non-Claude models without explicit max_tokens, don't set it + # CRITICAL: For Claude with extended thinking, max_tokens MUST be > thinking.budget_tokens + # Per Claude docs: https://docs.claude.com/en/docs/build-with-claude/extended-thinking + # If this constraint is violated, the API returns 400 INVALID_ARGUMENT + thinking_config = gen_config.get("thinkingConfig", {}) + thinking_budget = thinking_config.get("thinkingBudget", 0) + current_max_tokens = gen_config.get("maxOutputTokens") + + if ( + is_claude + and thinking_budget + and thinking_budget > 0 + and current_max_tokens is not None + ): + # Ensure max_tokens > thinkingBudget (add buffer for actual response content) + min_required_tokens = thinking_budget + 1024 # 1024 buffer for response + if current_max_tokens <= thinking_budget: + lib_logger.warning( + f"max_tokens ({current_max_tokens}) must be > thinkingBudget ({thinking_budget}). " + f"Adjusting to {min_required_tokens}" + ) + gen_config["maxOutputTokens"] = min_required_tokens + antigravity_payload["request"]["generationConfig"] = gen_config # Set toolConfig based on tool_choice parameter From e35f3f019812b03448e3618c5c00b36e7a0e05a3 Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Sun, 14 Dec 2025 01:48:16 +0100 Subject: [PATCH 05/16] fix(anthropic): properly close all content blocks in streaming wrapper Track each tool_use block index separately and emit content_block_stop for all blocks (thinking, text, and each tool_use) when stream ends. Fixes Claude Code stopping mid-action due to malformed streaming events. --- src/proxy_app/main.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index f763697..75e21a0 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -1074,6 +1074,7 @@ async def anthropic_streaming_wrapper( accumulated_text = "" accumulated_thinking = "" tool_calls_by_index = {} # Track tool calls by their index + tool_block_indices = {} # Track which block index each tool call uses input_tokens = 0 output_tokens = 0 @@ -1087,13 +1088,22 @@ async def anthropic_streaming_wrapper( data_content = chunk_str[len("data:") :].strip() if data_content == "[DONE]": - # Close any open content blocks (thinking, text, or tool_use) - if ( - thinking_block_started - or content_block_started - or tool_calls_by_index - ): + # Close any open thinking block + if thinking_block_started: yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + thinking_block_started = False + + # Close any open text block + if content_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + content_block_started = False + + # Close all open tool_use blocks + for tc_index in sorted(tool_block_indices.keys()): + block_idx = tool_block_indices[tc_index] + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {block_idx}}}\n\n' # Determine stop_reason based on whether we had tool calls stop_reason = "tool_use" if tool_calls_by_index else "end_turn" @@ -1214,6 +1224,8 @@ async def anthropic_streaming_wrapper( "name": tc.get("function", {}).get("name", ""), "arguments": "", } + # Track which block index this tool call uses + tool_block_indices[tc_index] = current_block_index block_start = { "type": "content_block_start", @@ -1226,6 +1238,8 @@ async def anthropic_streaming_wrapper( }, } yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" + # Increment for the next block + current_block_index += 1 # Accumulate arguments func = tc.get("function", {}) @@ -1234,10 +1248,10 @@ async def anthropic_streaming_wrapper( if func.get("arguments"): tool_calls_by_index[tc_index]["arguments"] += func["arguments"] - # Send partial JSON delta + # Send partial JSON delta using the correct block index for this tool block_delta = { "type": "content_block_delta", - "index": current_block_index, + "index": tool_block_indices[tc_index], "delta": { "type": "input_json_delta", "partial_json": func["arguments"], From 4ec92ec9673b9bbfc323ab9adbd89e639f0e1a3c Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Sun, 14 Dec 2025 23:50:09 +0100 Subject: [PATCH 06/16] fix(anthropic): add missing uuid import for /v1/messages endpoint --- src/proxy_app/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 75e21a0..d582ad7 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -1,4 +1,5 @@ import time +import uuid # Phase 1: Minimal imports for arg parsing and TUI import asyncio From b70efdf65aa1dcd118be54399fd60aa41e2784da Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Mon, 15 Dec 2025 00:27:02 +0100 Subject: [PATCH 07/16] fix(anthropic): always set custom_reasoning_budget when thinking is enabled MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fixed bug where budget_tokens between 10000-32000 would get ÷4 reduction - Now any explicit thinking request sets custom_reasoning_budget=True - Added logging to show thinking budget, effort level, and custom_budget flag - Simplified budget tier logic (removed redundant >= 32000 check) Before: 31999 tokens requested → 8192 tokens actual (÷4 applied) After: 31999 tokens requested → 32768 tokens actual (full "high" budget) --- src/proxy_app/main.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index d582ad7..ed0a37a 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -1672,15 +1672,16 @@ async def anthropic_messages( openai_request["tool_choice"] = openai_tool_choice # Handle Anthropic thinking config -> reasoning_effort translation + thinking_budget_requested = None if body.thinking: if body.thinking.type == "enabled": # Map budget_tokens to reasoning_effort level - # Default to "medium" if enabled but budget not specified + # Always set custom_reasoning_budget=True when client explicitly requests thinking + # This prevents the ÷4 reduction in Antigravity provider budget = body.thinking.budget_tokens or 10000 - if budget >= 32000: - openai_request["reasoning_effort"] = "high" - openai_request["custom_reasoning_budget"] = True - elif budget >= 10000: + thinking_budget_requested = budget + openai_request["custom_reasoning_budget"] = True + if budget >= 10000: openai_request["reasoning_effort"] = "high" elif budget >= 5000: openai_request["reasoning_effort"] = "medium" @@ -1688,12 +1689,21 @@ async def anthropic_messages( openai_request["reasoning_effort"] = "low" elif body.thinking.type == "disabled": openai_request["reasoning_effort"] = "disable" + thinking_budget_requested = 0 elif "opus" in body.model.lower(): # Force high thinking for Opus models when no thinking config is provided # Opus 4.5 always uses the -thinking variant, so we want maximum thinking budget - # Without this, the backend defaults to thinkingBudget: -1 (auto) instead of high openai_request["reasoning_effort"] = "high" openai_request["custom_reasoning_budget"] = True + thinking_budget_requested = "auto (high)" + + # Log thinking config for debugging + if thinking_budget_requested is not None: + logging.info( + f"🧠 Thinking: requested={thinking_budget_requested}, " + f"effort={openai_request.get('reasoning_effort', 'none')}, " + f"custom_budget={openai_request.get('custom_reasoning_budget', False)}" + ) log_request_to_console( url=str(request.url), From 4bd879b3f60a2e58a96b52d8c27b69de6f2acf70 Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Mon, 15 Dec 2025 00:31:00 +0100 Subject: [PATCH 08/16] feat(openai): auto-enable full thinking budget for Opus MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When using /v1/chat/completions with Opus and reasoning_effort="high" or "medium", automatically set custom_reasoning_budget=true to get full thinking tokens instead of the ÷4 reduced default. This makes the OpenAI endpoint behave consistently with the Anthropic endpoint for Opus models - if you're using Opus with high reasoning, you want the full thinking budget. Adds logging: "🧠 Thinking: auto-enabled custom_reasoning_budget for Opus" --- src/proxy_app/main.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index ed0a37a..6bd3976 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -1547,6 +1547,20 @@ async def chat_completions( "custom_reasoning_budget" ) or generation_cfg.get("custom_reasoning_budget", False) + # Auto-enable full thinking budget for Opus with high reasoning effort + # Opus is THE reasoning model - if you're asking for "high", you want full budget + if ( + model + and "opus" in model.lower() + and reasoning_effort in ("high", "medium") + and not custom_reasoning_budget + ): + request_data["custom_reasoning_budget"] = True + custom_reasoning_budget = True + logging.info( + f"🧠 Thinking: auto-enabled custom_reasoning_budget for Opus (effort={reasoning_effort})" + ) + logging.getLogger("rotator_library").debug( f"Handling reasoning parameters: model={model}, reasoning_effort={reasoning_effort}, custom_reasoning_budget={custom_reasoning_budget}" ) From 758b4b53d84c7d751e01e314ddd98329a760bdce Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Mon, 15 Dec 2025 00:49:43 +0100 Subject: [PATCH 09/16] fix(anthropic): add missing JSONResponse import for non-streaming responses --- src/proxy_app/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 6bd3976..b0dce52 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -101,7 +101,6 @@ from fastapi import FastAPI, Request, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, JSONResponse - import uuid from fastapi.security import APIKeyHeader print(" → Loading core dependencies...") From f2d728849f6aee2277c5c3a15be7b1e56cc724d2 Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Mon, 15 Dec 2025 21:55:10 +0100 Subject: [PATCH 10/16] fix(anthropic): ensure message_start is sent before message_stop in streaming Claude Code and other Anthropic SDK clients require message_start to be sent before any other SSE events. When a stream completed quickly without content chunks, the wrapper would send message_stop without message_start, causing clients to silently discard all output. --- src/proxy_app/main.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index b0dce52..cc7e268 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -1088,6 +1088,25 @@ async def anthropic_streaming_wrapper( data_content = chunk_str[len("data:") :].strip() if data_content == "[DONE]": + # CRITICAL: Send message_start if we haven't yet (e.g., empty response) + # Claude Code and other clients require message_start before message_stop + if not message_started: + message_start = { + "type": "message_start", + "message": { + "id": request_id, + "type": "message", + "role": "assistant", + "content": [], + "model": original_model, + "stop_reason": None, + "stop_sequence": None, + "usage": {"input_tokens": input_tokens, "output_tokens": 0}, + }, + } + yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" + message_started = True + # Close any open thinking block if thinking_block_started: yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' From de88557392e40c148af4003c0e3f90382435ea8b Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Tue, 16 Dec 2025 01:00:17 +0100 Subject: [PATCH 11/16] feat: add /context endpoint for anthropic routes Signed-off-by: Moeeze Hassan --- src/proxy_app/main.py | 95 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index cc7e268..18104a6 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -329,6 +329,24 @@ class AnthropicMessagesResponse(BaseModel): usage: AnthropicUsage +# --- Anthropic Count Tokens Models --- +class AnthropicCountTokensRequest(BaseModel): + """Anthropic count_tokens API request format.""" + + model: str + messages: List[AnthropicMessage] + system: Optional[Union[str, List[dict]]] = None + tools: Optional[List[AnthropicTool]] = None + tool_choice: Optional[dict] = None + thinking: Optional[AnthropicThinkingConfig] = None + + +class AnthropicCountTokensResponse(BaseModel): + """Anthropic count_tokens API response format.""" + + input_tokens: int + + # Calculate total loading time _elapsed = time.time() - _start_time print( @@ -1837,6 +1855,83 @@ async def anthropic_messages( raise HTTPException(status_code=500, detail=error_response) +# --- Anthropic Count Tokens Endpoint --- +@app.post("/v1/messages/count_tokens") +async def anthropic_count_tokens( + request: Request, + body: AnthropicCountTokensRequest, + client: RotatingClient = Depends(get_rotating_client), + _=Depends(verify_anthropic_api_key), +): + """ + Anthropic-compatible count_tokens endpoint. + + Counts the number of tokens that would be used by a Messages API request. + This is useful for estimating costs and managing context windows. + + Accepts requests in Anthropic's format and returns token count in Anthropic's format. + """ + try: + # Convert Anthropic request to OpenAI format for token counting + anthropic_request = body.model_dump(exclude_none=True) + + openai_messages = anthropic_to_openai_messages( + anthropic_request.get("messages", []), anthropic_request.get("system") + ) + + # Count tokens for messages + message_tokens = client.token_count( + model=body.model, + messages=openai_messages, + ) + + # Count tokens for tools if present + tool_tokens = 0 + if body.tools: + # Tools add tokens based on their definitions + # Convert to JSON string and count tokens for tool definitions + openai_tools = anthropic_to_openai_tools( + [tool.model_dump() for tool in body.tools] + ) + if openai_tools: + # Serialize tools to count their token contribution + tools_text = json.dumps(openai_tools) + tool_tokens = client.token_count( + model=body.model, + text=tools_text, + ) + + total_tokens = message_tokens + tool_tokens + + return JSONResponse( + content={"input_tokens": total_tokens} + ) + + except ( + litellm.InvalidRequestError, + ValueError, + litellm.ContextWindowExceededError, + ) as e: + error_response = { + "type": "error", + "error": {"type": "invalid_request_error", "message": str(e)}, + } + raise HTTPException(status_code=400, detail=error_response) + except litellm.AuthenticationError as e: + error_response = { + "type": "error", + "error": {"type": "authentication_error", "message": str(e)}, + } + raise HTTPException(status_code=401, detail=error_response) + except Exception as e: + logging.error(f"Anthropic count_tokens endpoint error: {e}") + error_response = { + "type": "error", + "error": {"type": "api_error", "message": str(e)}, + } + raise HTTPException(status_code=500, detail=error_response) + + @app.post("/v1/embeddings") async def embeddings( request: Request, From beed0bc2c26c23b52fce65be71e083ab45055ee7 Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Fri, 19 Dec 2025 14:46:00 +0100 Subject: [PATCH 12/16] Revert "feat(openai): auto-enable full thinking budget for Opus" This reverts commit e80645e6191c6965f94b70fb0842f5689294a884. --- src/proxy_app/main.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 18104a6..5f33a7e 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -1583,20 +1583,6 @@ async def chat_completions( "custom_reasoning_budget" ) or generation_cfg.get("custom_reasoning_budget", False) - # Auto-enable full thinking budget for Opus with high reasoning effort - # Opus is THE reasoning model - if you're asking for "high", you want full budget - if ( - model - and "opus" in model.lower() - and reasoning_effort in ("high", "medium") - and not custom_reasoning_budget - ): - request_data["custom_reasoning_budget"] = True - custom_reasoning_budget = True - logging.info( - f"🧠 Thinking: auto-enabled custom_reasoning_budget for Opus (effort={reasoning_effort})" - ) - logging.getLogger("rotator_library").debug( f"Handling reasoning parameters: model={model}, reasoning_effort={reasoning_effort}, custom_reasoning_budget={custom_reasoning_budget}" ) From 2c93a68a7828f5a0d73486097b6452dbb67af636 Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Fri, 19 Dec 2025 14:52:36 +0100 Subject: [PATCH 13/16] Revert "fix(anthropic): always set custom_reasoning_budget when thinking is enabled" This reverts commit 2ee549d997bedf1b4d77f1f70639d3767cb59d77. --- src/proxy_app/main.py | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 5f33a7e..277aa17 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -1708,16 +1708,15 @@ async def anthropic_messages( openai_request["tool_choice"] = openai_tool_choice # Handle Anthropic thinking config -> reasoning_effort translation - thinking_budget_requested = None if body.thinking: if body.thinking.type == "enabled": # Map budget_tokens to reasoning_effort level - # Always set custom_reasoning_budget=True when client explicitly requests thinking - # This prevents the ÷4 reduction in Antigravity provider + # Default to "medium" if enabled but budget not specified budget = body.thinking.budget_tokens or 10000 - thinking_budget_requested = budget - openai_request["custom_reasoning_budget"] = True - if budget >= 10000: + if budget >= 32000: + openai_request["reasoning_effort"] = "high" + openai_request["custom_reasoning_budget"] = True + elif budget >= 10000: openai_request["reasoning_effort"] = "high" elif budget >= 5000: openai_request["reasoning_effort"] = "medium" @@ -1725,21 +1724,12 @@ async def anthropic_messages( openai_request["reasoning_effort"] = "low" elif body.thinking.type == "disabled": openai_request["reasoning_effort"] = "disable" - thinking_budget_requested = 0 elif "opus" in body.model.lower(): # Force high thinking for Opus models when no thinking config is provided # Opus 4.5 always uses the -thinking variant, so we want maximum thinking budget + # Without this, the backend defaults to thinkingBudget: -1 (auto) instead of high openai_request["reasoning_effort"] = "high" openai_request["custom_reasoning_budget"] = True - thinking_budget_requested = "auto (high)" - - # Log thinking config for debugging - if thinking_budget_requested is not None: - logging.info( - f"🧠 Thinking: requested={thinking_budget_requested}, " - f"effort={openai_request.get('reasoning_effort', 'none')}, " - f"custom_budget={openai_request.get('custom_reasoning_budget', False)}" - ) log_request_to_console( url=str(request.url), From b19526cd76593ac0db1ae96f12a0018ee7490abc Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Sat, 20 Dec 2025 22:16:11 +0100 Subject: [PATCH 14/16] refactor: Move Anthropic translation layer to rotator_library - Create rotator_library/anthropic_compat module with models, translator, and streaming - Add anthropic_messages() and anthropic_count_tokens() methods to RotatingClient - Simplify main.py endpoints to use library methods - Remove ~762 lines of duplicate code from main.py - Fix: Use UUID instead of time.time() for tool/message IDs (avoids collisions) - Fix: Remove unused accumulated_text/accumulated_thinking variables - Fix: Map top_k parameter from Anthropic to OpenAI format --- src/proxy_app/main.py | 796 +----------------- src/rotator_library/__init__.py | 8 +- .../anthropic_compat/__init__.py | 67 ++ .../anthropic_compat/models.py | 144 ++++ .../anthropic_compat/streaming.py | 308 +++++++ .../anthropic_compat/translator.py | 363 ++++++++ src/rotator_library/client.py | 130 +++ 7 files changed, 1036 insertions(+), 780 deletions(-) create mode 100644 src/rotator_library/anthropic_compat/__init__.py create mode 100644 src/rotator_library/anthropic_compat/models.py create mode 100644 src/rotator_library/anthropic_compat/streaming.py create mode 100644 src/rotator_library/anthropic_compat/translator.py diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 277aa17..16a64bd 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -215,136 +215,11 @@ class EnrichedModelList(BaseModel): data: List[EnrichedModelCard] -# --- Anthropic API Models --- -class AnthropicTextBlock(BaseModel): - """Anthropic text content block.""" - - type: str = "text" - text: str - - -class AnthropicImageSource(BaseModel): - """Anthropic image source for base64 images.""" - - type: str = "base64" - media_type: str - data: str - - -class AnthropicImageBlock(BaseModel): - """Anthropic image content block.""" - - type: str = "image" - source: AnthropicImageSource - - -class AnthropicToolUseBlock(BaseModel): - """Anthropic tool use content block.""" - - type: str = "tool_use" - id: str - name: str - input: dict - - -class AnthropicToolResultBlock(BaseModel): - """Anthropic tool result content block.""" - - type: str = "tool_result" - tool_use_id: str - content: Union[str, List[Any]] - is_error: Optional[bool] = None - - -class AnthropicMessage(BaseModel): - """Anthropic message format.""" - - role: str - content: Union[ - str, - List[ - Union[ - AnthropicTextBlock, - AnthropicImageBlock, - AnthropicToolUseBlock, - AnthropicToolResultBlock, - dict, - ] - ], - ] - - -class AnthropicTool(BaseModel): - """Anthropic tool definition.""" - - name: str - description: Optional[str] = None - input_schema: dict - - -class AnthropicThinkingConfig(BaseModel): - """Anthropic thinking configuration.""" - - type: str # "enabled" or "disabled" - budget_tokens: Optional[int] = None - - -class AnthropicMessagesRequest(BaseModel): - """Anthropic Messages API request format.""" - - model: str - messages: List[AnthropicMessage] - max_tokens: int - system: Optional[Union[str, List[dict]]] = None - temperature: Optional[float] = None - top_p: Optional[float] = None - top_k: Optional[int] = None - stop_sequences: Optional[List[str]] = None - stream: Optional[bool] = False - tools: Optional[List[AnthropicTool]] = None - tool_choice: Optional[dict] = None - metadata: Optional[dict] = None - thinking: Optional[AnthropicThinkingConfig] = None - - -class AnthropicUsage(BaseModel): - """Anthropic usage statistics.""" - - input_tokens: int - output_tokens: int - cache_creation_input_tokens: Optional[int] = None - cache_read_input_tokens: Optional[int] = None - - -class AnthropicMessagesResponse(BaseModel): - """Anthropic Messages API response format.""" - - id: str - type: str = "message" - role: str = "assistant" - content: List[Union[AnthropicTextBlock, AnthropicToolUseBlock, dict]] - model: str - stop_reason: Optional[str] = None - stop_sequence: Optional[str] = None - usage: AnthropicUsage - - -# --- Anthropic Count Tokens Models --- -class AnthropicCountTokensRequest(BaseModel): - """Anthropic count_tokens API request format.""" - - model: str - messages: List[AnthropicMessage] - system: Optional[Union[str, List[dict]]] = None - tools: Optional[List[AnthropicTool]] = None - tool_choice: Optional[dict] = None - thinking: Optional[AnthropicThinkingConfig] = None - - -class AnthropicCountTokensResponse(BaseModel): - """Anthropic count_tokens API response format.""" - - input_tokens: int +# --- Anthropic API Models (imported from library) --- +from rotator_library.anthropic_compat import ( + AnthropicMessagesRequest, + AnthropicCountTokensRequest, +) # Calculate total loading time @@ -819,538 +694,6 @@ async def verify_anthropic_api_key( raise HTTPException(status_code=401, detail="Invalid or missing API Key") -# --- Anthropic <-> OpenAI Format Translation --- -def anthropic_to_openai_messages( - anthropic_messages: List[dict], system: Optional[Union[str, List[dict]]] = None -) -> List[dict]: - """ - Convert Anthropic message format to OpenAI format. - - Key differences: - - Anthropic: system is a separate field, content can be string or list of blocks - - OpenAI: system is a message with role="system", content is usually string - """ - openai_messages = [] - - # Handle system message - if system: - if isinstance(system, str): - openai_messages.append({"role": "system", "content": system}) - elif isinstance(system, list): - # System can be list of text blocks in Anthropic format - system_text = " ".join( - block.get("text", "") - for block in system - if isinstance(block, dict) and block.get("type") == "text" - ) - if system_text: - openai_messages.append({"role": "system", "content": system_text}) - - for msg in anthropic_messages: - role = msg.get("role", "user") - content = msg.get("content", "") - - if isinstance(content, str): - openai_messages.append({"role": role, "content": content}) - elif isinstance(content, list): - # Handle content blocks - openai_content = [] - tool_calls = [] - - for block in content: - if isinstance(block, dict): - block_type = block.get("type", "text") - - if block_type == "text": - openai_content.append( - {"type": "text", "text": block.get("text", "")} - ) - elif block_type == "image": - # Convert Anthropic image format to OpenAI - source = block.get("source", {}) - if source.get("type") == "base64": - openai_content.append( - { - "type": "image_url", - "image_url": { - "url": f"data:{source.get('media_type', 'image/png')};base64,{source.get('data', '')}" - }, - } - ) - elif source.get("type") == "url": - openai_content.append( - { - "type": "image_url", - "image_url": {"url": source.get("url", "")}, - } - ) - elif block_type == "tool_use": - # Anthropic tool_use -> OpenAI tool_calls - tool_calls.append( - { - "id": block.get("id", ""), - "type": "function", - "function": { - "name": block.get("name", ""), - "arguments": json.dumps(block.get("input", {})), - }, - } - ) - elif block_type == "tool_result": - # Tool results become separate messages in OpenAI format - tool_content = block.get("content", "") - if isinstance(tool_content, list): - tool_content = " ".join( - b.get("text", "") - for b in tool_content - if isinstance(b, dict) and b.get("type") == "text" - ) - openai_messages.append( - { - "role": "tool", - "tool_call_id": block.get("tool_use_id", ""), - "content": str(tool_content), - } - ) - continue # Don't add to current message - - # Build the message - if tool_calls: - # Assistant message with tool calls - msg_dict = {"role": role} - if openai_content: - # If there's text content alongside tool calls - text_parts = [ - c.get("text", "") - for c in openai_content - if c.get("type") == "text" - ] - msg_dict["content"] = " ".join(text_parts) if text_parts else None - else: - msg_dict["content"] = None - msg_dict["tool_calls"] = tool_calls - openai_messages.append(msg_dict) - elif openai_content: - # Check if it's just text or mixed content - if len(openai_content) == 1 and openai_content[0].get("type") == "text": - openai_messages.append( - {"role": role, "content": openai_content[0].get("text", "")} - ) - else: - openai_messages.append({"role": role, "content": openai_content}) - - return openai_messages - - -def anthropic_to_openai_tools( - anthropic_tools: Optional[List[dict]], -) -> Optional[List[dict]]: - """Convert Anthropic tool definitions to OpenAI format.""" - if not anthropic_tools: - return None - - openai_tools = [] - for tool in anthropic_tools: - openai_tools.append( - { - "type": "function", - "function": { - "name": tool.get("name", ""), - "description": tool.get("description", ""), - "parameters": tool.get("input_schema", {}), - }, - } - ) - return openai_tools - - -def anthropic_to_openai_tool_choice( - anthropic_tool_choice: Optional[dict], -) -> Optional[Union[str, dict]]: - """Convert Anthropic tool_choice to OpenAI format.""" - if not anthropic_tool_choice: - return None - - choice_type = anthropic_tool_choice.get("type", "auto") - - if choice_type == "auto": - return "auto" - elif choice_type == "any": - return "required" - elif choice_type == "tool": - return { - "type": "function", - "function": {"name": anthropic_tool_choice.get("name", "")}, - } - elif choice_type == "none": - return "none" - - return "auto" - - -def openai_to_anthropic_response(openai_response: dict, original_model: str) -> dict: - """ - Convert OpenAI chat completion response to Anthropic Messages format. - """ - choice = openai_response.get("choices", [{}])[0] - message = choice.get("message", {}) - usage = openai_response.get("usage", {}) - - # Build content blocks - content_blocks = [] - - # Add thinking content block if reasoning_content is present - reasoning_content = message.get("reasoning_content") - if reasoning_content: - content_blocks.append( - { - "type": "thinking", - "thinking": reasoning_content, - "signature": "", # Signature is typically empty for proxied responses - } - ) - - # Add text content if present - text_content = message.get("content") - if text_content: - content_blocks.append({"type": "text", "text": text_content}) - - # Add tool use blocks if present - tool_calls = message.get("tool_calls") or [] - for tc in tool_calls: - func = tc.get("function", {}) - try: - input_data = json.loads(func.get("arguments", "{}")) - except json.JSONDecodeError: - input_data = {} - - content_blocks.append( - { - "type": "tool_use", - "id": tc.get("id", f"toolu_{int(time.time())}"), - "name": func.get("name", ""), - "input": input_data, - } - ) - - # Map finish_reason to stop_reason - finish_reason = choice.get("finish_reason", "end_turn") - stop_reason_map = { - "stop": "end_turn", - "length": "max_tokens", - "tool_calls": "tool_use", - "content_filter": "end_turn", - "function_call": "tool_use", - } - stop_reason = stop_reason_map.get(finish_reason, "end_turn") - - # Build usage - anthropic_usage = { - "input_tokens": usage.get("prompt_tokens", 0), - "output_tokens": usage.get("completion_tokens", 0), - } - - # Add cache tokens if present - if usage.get("prompt_tokens_details"): - details = usage["prompt_tokens_details"] - if details.get("cached_tokens"): - anthropic_usage["cache_read_input_tokens"] = details["cached_tokens"] - - return { - "id": openai_response.get("id", f"msg_{int(time.time())}"), - "type": "message", - "role": "assistant", - "content": content_blocks, - "model": original_model, - "stop_reason": stop_reason, - "stop_sequence": None, - "usage": anthropic_usage, - } - - -async def anthropic_streaming_wrapper( - request: Request, - openai_stream: AsyncGenerator[str, None], - original_model: str, - request_id: str, -) -> AsyncGenerator[str, None]: - """ - Convert OpenAI streaming format to Anthropic streaming format. - - Anthropic SSE events: - - message_start: Initial message metadata - - content_block_start: Start of a content block - - content_block_delta: Content chunk - - content_block_stop: End of a content block - - message_delta: Final message metadata (stop_reason, usage) - - message_stop: End of message - """ - message_started = False - content_block_started = False - thinking_block_started = False - current_block_index = 0 - accumulated_text = "" - accumulated_thinking = "" - tool_calls_by_index = {} # Track tool calls by their index - tool_block_indices = {} # Track which block index each tool call uses - input_tokens = 0 - output_tokens = 0 - - try: - async for chunk_str in openai_stream: - if await request.is_disconnected(): - break - - if not chunk_str.strip() or not chunk_str.startswith("data:"): - continue - - data_content = chunk_str[len("data:") :].strip() - if data_content == "[DONE]": - # CRITICAL: Send message_start if we haven't yet (e.g., empty response) - # Claude Code and other clients require message_start before message_stop - if not message_started: - message_start = { - "type": "message_start", - "message": { - "id": request_id, - "type": "message", - "role": "assistant", - "content": [], - "model": original_model, - "stop_reason": None, - "stop_sequence": None, - "usage": {"input_tokens": input_tokens, "output_tokens": 0}, - }, - } - yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" - message_started = True - - # Close any open thinking block - if thinking_block_started: - yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' - current_block_index += 1 - thinking_block_started = False - - # Close any open text block - if content_block_started: - yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' - current_block_index += 1 - content_block_started = False - - # Close all open tool_use blocks - for tc_index in sorted(tool_block_indices.keys()): - block_idx = tool_block_indices[tc_index] - yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {block_idx}}}\n\n' - - # Determine stop_reason based on whether we had tool calls - stop_reason = "tool_use" if tool_calls_by_index else "end_turn" - - # Send message_delta with final info - yield f'event: message_delta\ndata: {{"type": "message_delta", "delta": {{"stop_reason": "{stop_reason}", "stop_sequence": null}}, "usage": {{"output_tokens": {output_tokens}}}}}\n\n' - - # Send message_stop - yield 'event: message_stop\ndata: {"type": "message_stop"}\n\n' - break - - try: - chunk = json.loads(data_content) - except json.JSONDecodeError: - continue - - # Extract usage if present - if "usage" in chunk and chunk["usage"]: - input_tokens = chunk["usage"].get("prompt_tokens", input_tokens) - output_tokens = chunk["usage"].get("completion_tokens", output_tokens) - - # Send message_start on first chunk - if not message_started: - message_start = { - "type": "message_start", - "message": { - "id": request_id, - "type": "message", - "role": "assistant", - "content": [], - "model": original_model, - "stop_reason": None, - "stop_sequence": None, - "usage": {"input_tokens": input_tokens, "output_tokens": 0}, - }, - } - yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" - message_started = True - - choices = chunk.get("choices", []) - if not choices: - continue - - delta = choices[0].get("delta", {}) - finish_reason = choices[0].get("finish_reason") - - # Handle reasoning/thinking content (from OpenAI-style reasoning_content) - reasoning_content = delta.get("reasoning_content") - if reasoning_content: - if not thinking_block_started: - # Start a thinking content block - block_start = { - "type": "content_block_start", - "index": current_block_index, - "content_block": {"type": "thinking", "thinking": ""}, - } - yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" - thinking_block_started = True - - # Send thinking delta - block_delta = { - "type": "content_block_delta", - "index": current_block_index, - "delta": {"type": "thinking_delta", "thinking": reasoning_content}, - } - yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" - accumulated_thinking += reasoning_content - - # Handle text content - content = delta.get("content") - if content: - # If we were in a thinking block, close it first - if thinking_block_started and not content_block_started: - yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' - current_block_index += 1 - thinking_block_started = False - - if not content_block_started: - # Start a text content block - block_start = { - "type": "content_block_start", - "index": current_block_index, - "content_block": {"type": "text", "text": ""}, - } - yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" - content_block_started = True - - # Send content delta - block_delta = { - "type": "content_block_delta", - "index": current_block_index, - "delta": {"type": "text_delta", "text": content}, - } - yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" - accumulated_text += content - - # Handle tool calls - tool_calls = delta.get("tool_calls", []) - for tc in tool_calls: - tc_index = tc.get("index", 0) - - if tc_index not in tool_calls_by_index: - # Close previous thinking block if open - if thinking_block_started: - yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' - current_block_index += 1 - thinking_block_started = False - - # Close previous text block if open - if content_block_started: - yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' - current_block_index += 1 - content_block_started = False - - # Start new tool use block - tool_calls_by_index[tc_index] = { - "id": tc.get("id", f"toolu_{tc_index}"), - "name": tc.get("function", {}).get("name", ""), - "arguments": "", - } - # Track which block index this tool call uses - tool_block_indices[tc_index] = current_block_index - - block_start = { - "type": "content_block_start", - "index": current_block_index, - "content_block": { - "type": "tool_use", - "id": tool_calls_by_index[tc_index]["id"], - "name": tool_calls_by_index[tc_index]["name"], - "input": {}, - }, - } - yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" - # Increment for the next block - current_block_index += 1 - - # Accumulate arguments - func = tc.get("function", {}) - if func.get("name"): - tool_calls_by_index[tc_index]["name"] = func["name"] - if func.get("arguments"): - tool_calls_by_index[tc_index]["arguments"] += func["arguments"] - - # Send partial JSON delta using the correct block index for this tool - block_delta = { - "type": "content_block_delta", - "index": tool_block_indices[tc_index], - "delta": { - "type": "input_json_delta", - "partial_json": func["arguments"], - }, - } - yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" - - # Note: We intentionally ignore finish_reason here. - # Block closing is handled when we receive [DONE] to avoid - # premature closes with providers that send finish_reason on each chunk. - - except Exception as e: - logging.error(f"Error in Anthropic streaming wrapper: {e}") - - # If we haven't sent message_start yet, send it now so the client can display the error - # Claude Code and other clients may ignore events that come before message_start - if not message_started: - message_start = { - "type": "message_start", - "message": { - "id": request_id, - "type": "message", - "role": "assistant", - "content": [], - "model": original_model, - "stop_reason": None, - "stop_sequence": None, - "usage": {"input_tokens": 0, "output_tokens": 0}, - }, - } - yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" - - # Send the error as a text content block so it's visible to the user - error_message = f"Error: {str(e)}" - error_block_start = { - "type": "content_block_start", - "index": current_block_index, - "content_block": {"type": "text", "text": ""}, - } - yield f"event: content_block_start\ndata: {json.dumps(error_block_start)}\n\n" - - error_block_delta = { - "type": "content_block_delta", - "index": current_block_index, - "delta": {"type": "text_delta", "text": error_message}, - } - yield f"event: content_block_delta\ndata: {json.dumps(error_block_delta)}\n\n" - - yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' - - # Send message_delta and message_stop to properly close the stream - yield f'event: message_delta\ndata: {{"type": "message_delta", "delta": {{"stop_reason": "end_turn", "stop_sequence": null}}, "usage": {{"output_tokens": 0}}}}\n\n' - yield 'event: message_stop\ndata: {"type": "message_stop"}\n\n' - - # Also send the formal error event for clients that handle it - error_event = { - "type": "error", - "error": {"type": "api_error", "message": str(e)}, - } - yield f"event: error\ndata: {json.dumps(error_event)}\n\n" - - async def streaming_response_wrapper( request: Request, request_data: dict, @@ -1669,68 +1012,11 @@ async def anthropic_messages( This endpoint is compatible with Claude Code and other Anthropic API clients. """ - request_id = f"msg_{uuid.uuid4().hex[:24]}" - original_model = body.model - # Initialize logger if enabled logger = DetailedLogger() if ENABLE_REQUEST_LOGGING else None try: - # Convert Anthropic request to OpenAI format - anthropic_request = body.model_dump(exclude_none=True) - - openai_messages = anthropic_to_openai_messages( - anthropic_request.get("messages", []), anthropic_request.get("system") - ) - - openai_tools = anthropic_to_openai_tools(anthropic_request.get("tools")) - openai_tool_choice = anthropic_to_openai_tool_choice( - anthropic_request.get("tool_choice") - ) - - # Build OpenAI-compatible request - openai_request = { - "model": body.model, - "messages": openai_messages, - "max_tokens": body.max_tokens, - "stream": body.stream or False, - } - - if body.temperature is not None: - openai_request["temperature"] = body.temperature - if body.top_p is not None: - openai_request["top_p"] = body.top_p - if body.stop_sequences: - openai_request["stop"] = body.stop_sequences - if openai_tools: - openai_request["tools"] = openai_tools - if openai_tool_choice: - openai_request["tool_choice"] = openai_tool_choice - - # Handle Anthropic thinking config -> reasoning_effort translation - if body.thinking: - if body.thinking.type == "enabled": - # Map budget_tokens to reasoning_effort level - # Default to "medium" if enabled but budget not specified - budget = body.thinking.budget_tokens or 10000 - if budget >= 32000: - openai_request["reasoning_effort"] = "high" - openai_request["custom_reasoning_budget"] = True - elif budget >= 10000: - openai_request["reasoning_effort"] = "high" - elif budget >= 5000: - openai_request["reasoning_effort"] = "medium" - else: - openai_request["reasoning_effort"] = "low" - elif body.thinking.type == "disabled": - openai_request["reasoning_effort"] = "disable" - elif "opus" in body.model.lower(): - # Force high thinking for Opus models when no thinking config is provided - # Opus 4.5 always uses the -thinking variant, so we want maximum thinking budget - # Without this, the backend defaults to thinkingBudget: -1 (auto) instead of high - openai_request["reasoning_effort"] = "high" - openai_request["custom_reasoning_budget"] = True - + # Log the request to console log_request_to_console( url=str(request.url), headers=dict(request.headers), @@ -1738,17 +1024,16 @@ async def anthropic_messages( request.client.host if request.client else "unknown", request.client.port if request.client else 0, ), - request_data=openai_request, + request_data=body.model_dump(exclude_none=True), ) - if body.stream: - # Streaming response - acompletion returns a generator for streaming - response_generator = client.acompletion(request=request, **openai_request) + # Use the library method to handle the request + result = await client.anthropic_messages(body, raw_request=request) + if body.stream: + # Streaming response return StreamingResponse( - anthropic_streaming_wrapper( - request, response_generator, original_model, request_id - ), + result, media_type="text/event-stream", headers={ "Cache-Control": "no-cache", @@ -1758,29 +1043,13 @@ async def anthropic_messages( ) else: # Non-streaming response - response = await client.acompletion(request=request, **openai_request) - - # Convert OpenAI response to Anthropic format - openai_response = ( - response.model_dump() - if hasattr(response, "model_dump") - else dict(response) - ) - anthropic_response = openai_to_anthropic_response( - openai_response, original_model - ) - - # Override the ID with our request ID - anthropic_response["id"] = request_id - if logger: logger.log_final_response( status_code=200, headers=None, - body=anthropic_response, + body=result, ) - - return JSONResponse(content=anthropic_response) + return JSONResponse(content=result) except ( litellm.InvalidRequestError, @@ -1848,40 +1117,9 @@ async def anthropic_count_tokens( Accepts requests in Anthropic's format and returns token count in Anthropic's format. """ try: - # Convert Anthropic request to OpenAI format for token counting - anthropic_request = body.model_dump(exclude_none=True) - - openai_messages = anthropic_to_openai_messages( - anthropic_request.get("messages", []), anthropic_request.get("system") - ) - - # Count tokens for messages - message_tokens = client.token_count( - model=body.model, - messages=openai_messages, - ) - - # Count tokens for tools if present - tool_tokens = 0 - if body.tools: - # Tools add tokens based on their definitions - # Convert to JSON string and count tokens for tool definitions - openai_tools = anthropic_to_openai_tools( - [tool.model_dump() for tool in body.tools] - ) - if openai_tools: - # Serialize tools to count their token contribution - tools_text = json.dumps(openai_tools) - tool_tokens = client.token_count( - model=body.model, - text=tools_text, - ) - - total_tokens = message_tokens + tool_tokens - - return JSONResponse( - content={"input_tokens": total_tokens} - ) + # Use the library method to handle the request + result = await client.anthropic_count_tokens(body) + return JSONResponse(content=result) except ( litellm.InvalidRequestError, diff --git a/src/rotator_library/__init__.py b/src/rotator_library/__init__.py index 7944443..b05e470 100644 --- a/src/rotator_library/__init__.py +++ b/src/rotator_library/__init__.py @@ -8,6 +8,7 @@ from .providers import PROVIDER_PLUGINS from .providers.provider_interface import ProviderInterface from .model_info_service import ModelInfoService, ModelInfo, ModelMetadata + from . import anthropic_compat __all__ = [ "RotatingClient", @@ -15,11 +16,12 @@ "ModelInfoService", "ModelInfo", "ModelMetadata", + "anthropic_compat", ] def __getattr__(name): - """Lazy-load PROVIDER_PLUGINS and ModelInfoService to speed up module import.""" + """Lazy-load PROVIDER_PLUGINS, ModelInfoService, and anthropic_compat to speed up module import.""" if name == "PROVIDER_PLUGINS": from .providers import PROVIDER_PLUGINS @@ -36,4 +38,8 @@ def __getattr__(name): from .model_info_service import ModelMetadata return ModelMetadata + if name == "anthropic_compat": + from . import anthropic_compat + + return anthropic_compat raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/rotator_library/anthropic_compat/__init__.py b/src/rotator_library/anthropic_compat/__init__.py new file mode 100644 index 0000000..8572ac7 --- /dev/null +++ b/src/rotator_library/anthropic_compat/__init__.py @@ -0,0 +1,67 @@ +""" +Anthropic API compatibility module for rotator_library. + +This module provides format translation between Anthropic's Messages API +and OpenAI's Chat Completions API, enabling any OpenAI-compatible provider +to work with Anthropic clients like Claude Code. + +Usage: + from rotator_library.anthropic_compat import ( + AnthropicMessagesRequest, + AnthropicMessagesResponse, + translate_anthropic_request, + openai_to_anthropic_response, + anthropic_streaming_wrapper, + ) +""" + +from .models import ( + AnthropicTextBlock, + AnthropicImageSource, + AnthropicImageBlock, + AnthropicToolUseBlock, + AnthropicToolResultBlock, + AnthropicMessage, + AnthropicTool, + AnthropicThinkingConfig, + AnthropicMessagesRequest, + AnthropicUsage, + AnthropicMessagesResponse, + AnthropicCountTokensRequest, + AnthropicCountTokensResponse, +) + +from .translator import ( + anthropic_to_openai_messages, + anthropic_to_openai_tools, + anthropic_to_openai_tool_choice, + openai_to_anthropic_response, + translate_anthropic_request, +) + +from .streaming import anthropic_streaming_wrapper + +__all__ = [ + # Models + "AnthropicTextBlock", + "AnthropicImageSource", + "AnthropicImageBlock", + "AnthropicToolUseBlock", + "AnthropicToolResultBlock", + "AnthropicMessage", + "AnthropicTool", + "AnthropicThinkingConfig", + "AnthropicMessagesRequest", + "AnthropicUsage", + "AnthropicMessagesResponse", + "AnthropicCountTokensRequest", + "AnthropicCountTokensResponse", + # Translator functions + "anthropic_to_openai_messages", + "anthropic_to_openai_tools", + "anthropic_to_openai_tool_choice", + "openai_to_anthropic_response", + "translate_anthropic_request", + # Streaming + "anthropic_streaming_wrapper", +] diff --git a/src/rotator_library/anthropic_compat/models.py b/src/rotator_library/anthropic_compat/models.py new file mode 100644 index 0000000..c579f2e --- /dev/null +++ b/src/rotator_library/anthropic_compat/models.py @@ -0,0 +1,144 @@ +""" +Pydantic models for the Anthropic Messages API. + +These models define the request and response formats for Anthropic's Messages API, +enabling compatibility with Claude Code and other Anthropic API clients. +""" + +from typing import Any, List, Optional, Union +from pydantic import BaseModel + + +# --- Content Blocks --- +class AnthropicTextBlock(BaseModel): + """Anthropic text content block.""" + + type: str = "text" + text: str + + +class AnthropicImageSource(BaseModel): + """Anthropic image source for base64 images.""" + + type: str = "base64" + media_type: str + data: str + + +class AnthropicImageBlock(BaseModel): + """Anthropic image content block.""" + + type: str = "image" + source: AnthropicImageSource + + +class AnthropicToolUseBlock(BaseModel): + """Anthropic tool use content block.""" + + type: str = "tool_use" + id: str + name: str + input: dict + + +class AnthropicToolResultBlock(BaseModel): + """Anthropic tool result content block.""" + + type: str = "tool_result" + tool_use_id: str + content: Union[str, List[Any]] + is_error: Optional[bool] = None + + +# --- Message and Tool Definitions --- +class AnthropicMessage(BaseModel): + """Anthropic message format.""" + + role: str + content: Union[ + str, + List[ + Union[ + AnthropicTextBlock, + AnthropicImageBlock, + AnthropicToolUseBlock, + AnthropicToolResultBlock, + dict, + ] + ], + ] + + +class AnthropicTool(BaseModel): + """Anthropic tool definition.""" + + name: str + description: Optional[str] = None + input_schema: dict + + +class AnthropicThinkingConfig(BaseModel): + """Anthropic thinking configuration.""" + + type: str # "enabled" or "disabled" + budget_tokens: Optional[int] = None + + +# --- Messages Request --- +class AnthropicMessagesRequest(BaseModel): + """Anthropic Messages API request format.""" + + model: str + messages: List[AnthropicMessage] + max_tokens: int + system: Optional[Union[str, List[dict]]] = None + temperature: Optional[float] = None + top_p: Optional[float] = None + top_k: Optional[int] = None + stop_sequences: Optional[List[str]] = None + stream: Optional[bool] = False + tools: Optional[List[AnthropicTool]] = None + tool_choice: Optional[dict] = None + metadata: Optional[dict] = None + thinking: Optional[AnthropicThinkingConfig] = None + + +# --- Messages Response --- +class AnthropicUsage(BaseModel): + """Anthropic usage statistics.""" + + input_tokens: int + output_tokens: int + cache_creation_input_tokens: Optional[int] = None + cache_read_input_tokens: Optional[int] = None + + +class AnthropicMessagesResponse(BaseModel): + """Anthropic Messages API response format.""" + + id: str + type: str = "message" + role: str = "assistant" + content: List[Union[AnthropicTextBlock, AnthropicToolUseBlock, dict]] + model: str + stop_reason: Optional[str] = None + stop_sequence: Optional[str] = None + usage: AnthropicUsage + + +# --- Count Tokens --- +class AnthropicCountTokensRequest(BaseModel): + """Anthropic count_tokens API request format.""" + + model: str + messages: List[AnthropicMessage] + system: Optional[Union[str, List[dict]]] = None + tools: Optional[List[AnthropicTool]] = None + tool_choice: Optional[dict] = None + thinking: Optional[AnthropicThinkingConfig] = None + + +class AnthropicCountTokensResponse(BaseModel): + """Anthropic count_tokens API response format.""" + + input_tokens: int diff --git a/src/rotator_library/anthropic_compat/streaming.py b/src/rotator_library/anthropic_compat/streaming.py new file mode 100644 index 0000000..5ceb714 --- /dev/null +++ b/src/rotator_library/anthropic_compat/streaming.py @@ -0,0 +1,308 @@ +""" +Streaming wrapper for converting OpenAI streaming format to Anthropic streaming format. + +This module provides a framework-agnostic streaming wrapper that converts +OpenAI SSE (Server-Sent Events) format to Anthropic's streaming format. +""" + +import json +import logging +import uuid +from typing import AsyncGenerator, Callable, Optional, Awaitable + +logger = logging.getLogger("rotator_library.anthropic_compat") + + +async def anthropic_streaming_wrapper( + openai_stream: AsyncGenerator[str, None], + original_model: str, + request_id: Optional[str] = None, + is_disconnected: Optional[Callable[[], Awaitable[bool]]] = None, +) -> AsyncGenerator[str, None]: + """ + Convert OpenAI streaming format to Anthropic streaming format. + + This is a framework-agnostic wrapper that can be used with any async web framework. + Instead of taking a FastAPI Request object, it accepts an optional callback function + to check for client disconnection. + + Anthropic SSE events: + - message_start: Initial message metadata + - content_block_start: Start of a content block + - content_block_delta: Content chunk + - content_block_stop: End of a content block + - message_delta: Final message metadata (stop_reason, usage) + - message_stop: End of message + + Args: + openai_stream: AsyncGenerator yielding OpenAI SSE format strings + original_model: The model name to include in responses + request_id: Optional request ID (auto-generated if not provided) + is_disconnected: Optional async callback that returns True if client disconnected + + Yields: + SSE format strings in Anthropic's streaming format + """ + if request_id is None: + request_id = f"msg_{uuid.uuid4().hex[:24]}" + + message_started = False + content_block_started = False + thinking_block_started = False + current_block_index = 0 + tool_calls_by_index = {} # Track tool calls by their index + tool_block_indices = {} # Track which block index each tool call uses + input_tokens = 0 + output_tokens = 0 + + try: + async for chunk_str in openai_stream: + # Check for client disconnection if callback provided + if is_disconnected is not None and await is_disconnected(): + break + + if not chunk_str.strip() or not chunk_str.startswith("data:"): + continue + + data_content = chunk_str[len("data:") :].strip() + if data_content == "[DONE]": + # CRITICAL: Send message_start if we haven't yet (e.g., empty response) + # Claude Code and other clients require message_start before message_stop + if not message_started: + message_start = { + "type": "message_start", + "message": { + "id": request_id, + "type": "message", + "role": "assistant", + "content": [], + "model": original_model, + "stop_reason": None, + "stop_sequence": None, + "usage": {"input_tokens": input_tokens, "output_tokens": 0}, + }, + } + yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" + message_started = True + + # Close any open thinking block + if thinking_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + thinking_block_started = False + + # Close any open text block + if content_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + content_block_started = False + + # Close all open tool_use blocks + for tc_index in sorted(tool_block_indices.keys()): + block_idx = tool_block_indices[tc_index] + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {block_idx}}}\n\n' + + # Determine stop_reason based on whether we had tool calls + stop_reason = "tool_use" if tool_calls_by_index else "end_turn" + + # Send message_delta with final info + yield f'event: message_delta\ndata: {{"type": "message_delta", "delta": {{"stop_reason": "{stop_reason}", "stop_sequence": null}}, "usage": {{"output_tokens": {output_tokens}}}}}\n\n' + + # Send message_stop + yield 'event: message_stop\ndata: {"type": "message_stop"}\n\n' + break + + try: + chunk = json.loads(data_content) + except json.JSONDecodeError: + continue + + # Extract usage if present + if "usage" in chunk and chunk["usage"]: + input_tokens = chunk["usage"].get("prompt_tokens", input_tokens) + output_tokens = chunk["usage"].get("completion_tokens", output_tokens) + + # Send message_start on first chunk + if not message_started: + message_start = { + "type": "message_start", + "message": { + "id": request_id, + "type": "message", + "role": "assistant", + "content": [], + "model": original_model, + "stop_reason": None, + "stop_sequence": None, + "usage": {"input_tokens": input_tokens, "output_tokens": 0}, + }, + } + yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" + message_started = True + + choices = chunk.get("choices", []) + if not choices: + continue + + delta = choices[0].get("delta", {}) + + # Handle reasoning/thinking content (from OpenAI-style reasoning_content) + reasoning_content = delta.get("reasoning_content") + if reasoning_content: + if not thinking_block_started: + # Start a thinking content block + block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": {"type": "thinking", "thinking": ""}, + } + yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" + thinking_block_started = True + + # Send thinking delta + block_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": {"type": "thinking_delta", "thinking": reasoning_content}, + } + yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" + + # Handle text content + content = delta.get("content") + if content: + # If we were in a thinking block, close it first + if thinking_block_started and not content_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + thinking_block_started = False + + if not content_block_started: + # Start a text content block + block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": {"type": "text", "text": ""}, + } + yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" + content_block_started = True + + # Send content delta + block_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": {"type": "text_delta", "text": content}, + } + yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" + + # Handle tool calls + tool_calls = delta.get("tool_calls", []) + for tc in tool_calls: + tc_index = tc.get("index", 0) + + if tc_index not in tool_calls_by_index: + # Close previous thinking block if open + if thinking_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + thinking_block_started = False + + # Close previous text block if open + if content_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + content_block_started = False + + # Start new tool use block + tool_calls_by_index[tc_index] = { + "id": tc.get("id", f"toolu_{uuid.uuid4().hex[:12]}"), + "name": tc.get("function", {}).get("name", ""), + "arguments": "", + } + # Track which block index this tool call uses + tool_block_indices[tc_index] = current_block_index + + block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": { + "type": "tool_use", + "id": tool_calls_by_index[tc_index]["id"], + "name": tool_calls_by_index[tc_index]["name"], + "input": {}, + }, + } + yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" + # Increment for the next block + current_block_index += 1 + + # Accumulate arguments + func = tc.get("function", {}) + if func.get("name"): + tool_calls_by_index[tc_index]["name"] = func["name"] + if func.get("arguments"): + tool_calls_by_index[tc_index]["arguments"] += func["arguments"] + + # Send partial JSON delta using the correct block index for this tool + block_delta = { + "type": "content_block_delta", + "index": tool_block_indices[tc_index], + "delta": { + "type": "input_json_delta", + "partial_json": func["arguments"], + }, + } + yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" + + # Note: We intentionally ignore finish_reason here. + # Block closing is handled when we receive [DONE] to avoid + # premature closes with providers that send finish_reason on each chunk. + + except Exception as e: + logger.error(f"Error in Anthropic streaming wrapper: {e}") + + # If we haven't sent message_start yet, send it now so the client can display the error + # Claude Code and other clients may ignore events that come before message_start + if not message_started: + message_start = { + "type": "message_start", + "message": { + "id": request_id, + "type": "message", + "role": "assistant", + "content": [], + "model": original_model, + "stop_reason": None, + "stop_sequence": None, + "usage": {"input_tokens": 0, "output_tokens": 0}, + }, + } + yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" + + # Send the error as a text content block so it's visible to the user + error_message = f"Error: {str(e)}" + error_block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": {"type": "text", "text": ""}, + } + yield f"event: content_block_start\ndata: {json.dumps(error_block_start)}\n\n" + + error_block_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": {"type": "text_delta", "text": error_message}, + } + yield f"event: content_block_delta\ndata: {json.dumps(error_block_delta)}\n\n" + + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + + # Send message_delta and message_stop to properly close the stream + yield f'event: message_delta\ndata: {{"type": "message_delta", "delta": {{"stop_reason": "end_turn", "stop_sequence": null}}, "usage": {{"output_tokens": 0}}}}\n\n' + yield 'event: message_stop\ndata: {"type": "message_stop"}\n\n' + + # Also send the formal error event for clients that handle it + error_event = { + "type": "error", + "error": {"type": "api_error", "message": str(e)}, + } + yield f"event: error\ndata: {json.dumps(error_event)}\n\n" diff --git a/src/rotator_library/anthropic_compat/translator.py b/src/rotator_library/anthropic_compat/translator.py new file mode 100644 index 0000000..451abfa --- /dev/null +++ b/src/rotator_library/anthropic_compat/translator.py @@ -0,0 +1,363 @@ +""" +Format translation functions between Anthropic and OpenAI API formats. + +This module provides functions to convert requests and responses between +Anthropic's Messages API format and OpenAI's Chat Completions API format. +This enables any OpenAI-compatible provider to work with Anthropic clients. +""" + +import json +import uuid +from typing import Any, Dict, List, Optional, Union + +from .models import AnthropicMessagesRequest + + +def anthropic_to_openai_messages( + anthropic_messages: List[dict], system: Optional[Union[str, List[dict]]] = None +) -> List[dict]: + """ + Convert Anthropic message format to OpenAI format. + + Key differences: + - Anthropic: system is a separate field, content can be string or list of blocks + - OpenAI: system is a message with role="system", content is usually string + + Args: + anthropic_messages: List of messages in Anthropic format + system: Optional system message (string or list of text blocks) + + Returns: + List of messages in OpenAI format + """ + openai_messages = [] + + # Handle system message + if system: + if isinstance(system, str): + openai_messages.append({"role": "system", "content": system}) + elif isinstance(system, list): + # System can be list of text blocks in Anthropic format + system_text = " ".join( + block.get("text", "") + for block in system + if isinstance(block, dict) and block.get("type") == "text" + ) + if system_text: + openai_messages.append({"role": "system", "content": system_text}) + + for msg in anthropic_messages: + role = msg.get("role", "user") + content = msg.get("content", "") + + if isinstance(content, str): + openai_messages.append({"role": role, "content": content}) + elif isinstance(content, list): + # Handle content blocks + openai_content = [] + tool_calls = [] + + for block in content: + if isinstance(block, dict): + block_type = block.get("type", "text") + + if block_type == "text": + openai_content.append( + {"type": "text", "text": block.get("text", "")} + ) + elif block_type == "image": + # Convert Anthropic image format to OpenAI + source = block.get("source", {}) + if source.get("type") == "base64": + openai_content.append( + { + "type": "image_url", + "image_url": { + "url": f"data:{source.get('media_type', 'image/png')};base64,{source.get('data', '')}" + }, + } + ) + elif source.get("type") == "url": + openai_content.append( + { + "type": "image_url", + "image_url": {"url": source.get("url", "")}, + } + ) + elif block_type == "tool_use": + # Anthropic tool_use -> OpenAI tool_calls + tool_calls.append( + { + "id": block.get("id", ""), + "type": "function", + "function": { + "name": block.get("name", ""), + "arguments": json.dumps(block.get("input", {})), + }, + } + ) + elif block_type == "tool_result": + # Tool results become separate messages in OpenAI format + tool_content = block.get("content", "") + if isinstance(tool_content, list): + tool_content = " ".join( + b.get("text", "") + for b in tool_content + if isinstance(b, dict) and b.get("type") == "text" + ) + openai_messages.append( + { + "role": "tool", + "tool_call_id": block.get("tool_use_id", ""), + "content": str(tool_content), + } + ) + continue # Don't add to current message + + # Build the message + if tool_calls: + # Assistant message with tool calls + msg_dict = {"role": role} + if openai_content: + # If there's text content alongside tool calls + text_parts = [ + c.get("text", "") + for c in openai_content + if c.get("type") == "text" + ] + msg_dict["content"] = " ".join(text_parts) if text_parts else None + else: + msg_dict["content"] = None + msg_dict["tool_calls"] = tool_calls + openai_messages.append(msg_dict) + elif openai_content: + # Check if it's just text or mixed content + if len(openai_content) == 1 and openai_content[0].get("type") == "text": + openai_messages.append( + {"role": role, "content": openai_content[0].get("text", "")} + ) + else: + openai_messages.append({"role": role, "content": openai_content}) + + return openai_messages + + +def anthropic_to_openai_tools( + anthropic_tools: Optional[List[dict]], +) -> Optional[List[dict]]: + """ + Convert Anthropic tool definitions to OpenAI format. + + Args: + anthropic_tools: List of tools in Anthropic format + + Returns: + List of tools in OpenAI format, or None if no tools provided + """ + if not anthropic_tools: + return None + + openai_tools = [] + for tool in anthropic_tools: + openai_tools.append( + { + "type": "function", + "function": { + "name": tool.get("name", ""), + "description": tool.get("description", ""), + "parameters": tool.get("input_schema", {}), + }, + } + ) + return openai_tools + + +def anthropic_to_openai_tool_choice( + anthropic_tool_choice: Optional[dict], +) -> Optional[Union[str, dict]]: + """ + Convert Anthropic tool_choice to OpenAI format. + + Args: + anthropic_tool_choice: Tool choice in Anthropic format + + Returns: + Tool choice in OpenAI format + """ + if not anthropic_tool_choice: + return None + + choice_type = anthropic_tool_choice.get("type", "auto") + + if choice_type == "auto": + return "auto" + elif choice_type == "any": + return "required" + elif choice_type == "tool": + return { + "type": "function", + "function": {"name": anthropic_tool_choice.get("name", "")}, + } + elif choice_type == "none": + return "none" + + return "auto" + + +def openai_to_anthropic_response(openai_response: dict, original_model: str) -> dict: + """ + Convert OpenAI chat completion response to Anthropic Messages format. + + Args: + openai_response: Response from OpenAI-compatible API + original_model: The model name requested by the client + + Returns: + Response in Anthropic Messages format + """ + choice = openai_response.get("choices", [{}])[0] + message = choice.get("message", {}) + usage = openai_response.get("usage", {}) + + # Build content blocks + content_blocks = [] + + # Add thinking content block if reasoning_content is present + reasoning_content = message.get("reasoning_content") + if reasoning_content: + content_blocks.append( + { + "type": "thinking", + "thinking": reasoning_content, + "signature": "", # Signature is typically empty for proxied responses + } + ) + + # Add text content if present + text_content = message.get("content") + if text_content: + content_blocks.append({"type": "text", "text": text_content}) + + # Add tool use blocks if present + tool_calls = message.get("tool_calls") or [] + for tc in tool_calls: + func = tc.get("function", {}) + try: + input_data = json.loads(func.get("arguments", "{}")) + except json.JSONDecodeError: + input_data = {} + + content_blocks.append( + { + "type": "tool_use", + "id": tc.get("id", f"toolu_{uuid.uuid4().hex[:12]}"), + "name": func.get("name", ""), + "input": input_data, + } + ) + + # Map finish_reason to stop_reason + finish_reason = choice.get("finish_reason", "end_turn") + stop_reason_map = { + "stop": "end_turn", + "length": "max_tokens", + "tool_calls": "tool_use", + "content_filter": "end_turn", + "function_call": "tool_use", + } + stop_reason = stop_reason_map.get(finish_reason, "end_turn") + + # Build usage + anthropic_usage = { + "input_tokens": usage.get("prompt_tokens", 0), + "output_tokens": usage.get("completion_tokens", 0), + } + + # Add cache tokens if present + if usage.get("prompt_tokens_details"): + details = usage["prompt_tokens_details"] + if details.get("cached_tokens"): + anthropic_usage["cache_read_input_tokens"] = details["cached_tokens"] + + return { + "id": openai_response.get("id", f"msg_{uuid.uuid4().hex[:24]}"), + "type": "message", + "role": "assistant", + "content": content_blocks, + "model": original_model, + "stop_reason": stop_reason, + "stop_sequence": None, + "usage": anthropic_usage, + } + + +def translate_anthropic_request(request: AnthropicMessagesRequest) -> Dict[str, Any]: + """ + Translate a complete Anthropic Messages API request to OpenAI format. + + This is a high-level function that handles all aspects of request translation, + including messages, tools, tool_choice, and thinking configuration. + + Args: + request: An AnthropicMessagesRequest object + + Returns: + Dictionary containing the OpenAI-compatible request parameters + """ + anthropic_request = request.model_dump(exclude_none=True) + + openai_messages = anthropic_to_openai_messages( + anthropic_request.get("messages", []), anthropic_request.get("system") + ) + + openai_tools = anthropic_to_openai_tools(anthropic_request.get("tools")) + openai_tool_choice = anthropic_to_openai_tool_choice( + anthropic_request.get("tool_choice") + ) + + # Build OpenAI-compatible request + openai_request = { + "model": request.model, + "messages": openai_messages, + "max_tokens": request.max_tokens, + "stream": request.stream or False, + } + + if request.temperature is not None: + openai_request["temperature"] = request.temperature + if request.top_p is not None: + openai_request["top_p"] = request.top_p + if request.top_k is not None: + openai_request["top_k"] = request.top_k + if request.stop_sequences: + openai_request["stop"] = request.stop_sequences + if openai_tools: + openai_request["tools"] = openai_tools + if openai_tool_choice: + openai_request["tool_choice"] = openai_tool_choice + + # Handle Anthropic thinking config -> reasoning_effort translation + if request.thinking: + if request.thinking.type == "enabled": + # Map budget_tokens to reasoning_effort level + # Default to "medium" if enabled but budget not specified + budget = request.thinking.budget_tokens or 10000 + if budget >= 32000: + openai_request["reasoning_effort"] = "high" + openai_request["custom_reasoning_budget"] = True + elif budget >= 10000: + openai_request["reasoning_effort"] = "high" + elif budget >= 5000: + openai_request["reasoning_effort"] = "medium" + else: + openai_request["reasoning_effort"] = "low" + elif request.thinking.type == "disabled": + openai_request["reasoning_effort"] = "disable" + elif "opus" in request.model.lower(): + # Force high thinking for Opus models when no thinking config is provided + # Opus 4.5 always uses the -thinking variant, so we want maximum thinking budget + # Without this, the backend defaults to thinkingBudget: -1 (auto) instead of high + openai_request["reasoning_effort"] = "high" + openai_request["custom_reasoning_budget"] = True + + return openai_request diff --git a/src/rotator_library/client.py b/src/rotator_library/client.py index 49d6179..f06313a 100644 --- a/src/rotator_library/client.py +++ b/src/rotator_library/client.py @@ -3017,3 +3017,133 @@ async def force_refresh_quota( result["duration_ms"] = int((time.time() - start_time) * 1000) return result + + # --- Anthropic API Compatibility Methods --- + + async def anthropic_messages( + self, + request: "AnthropicMessagesRequest", + raw_request: Optional[Any] = None, + pre_request_callback: Optional[callable] = None, + ) -> Any: + """ + Handle Anthropic Messages API requests. + + This method accepts requests in Anthropic's format, translates them to + OpenAI format internally, processes them through the existing acompletion + method, and returns responses in Anthropic's format. + + Args: + request: An AnthropicMessagesRequest object + raw_request: Optional raw request object for disconnect checks + pre_request_callback: Optional async callback before each API request + + Returns: + For non-streaming: dict in Anthropic Messages format + For streaming: AsyncGenerator yielding Anthropic SSE format strings + """ + from .anthropic_compat import ( + translate_anthropic_request, + openai_to_anthropic_response, + anthropic_streaming_wrapper, + ) + import uuid + + request_id = f"msg_{uuid.uuid4().hex[:24]}" + original_model = request.model + + # Translate Anthropic request to OpenAI format + openai_request = translate_anthropic_request(request) + + if request.stream: + # Streaming response + response_generator = self.acompletion( + request=raw_request, + pre_request_callback=pre_request_callback, + **openai_request, + ) + + # Create disconnect checker if raw_request provided + is_disconnected = None + if raw_request is not None and hasattr(raw_request, "is_disconnected"): + is_disconnected = raw_request.is_disconnected + + # Return the streaming wrapper + return anthropic_streaming_wrapper( + openai_stream=response_generator, + original_model=original_model, + request_id=request_id, + is_disconnected=is_disconnected, + ) + else: + # Non-streaming response + response = await self.acompletion( + request=raw_request, + pre_request_callback=pre_request_callback, + **openai_request, + ) + + # Convert OpenAI response to Anthropic format + openai_response = ( + response.model_dump() if hasattr(response, "model_dump") else dict(response) + ) + anthropic_response = openai_to_anthropic_response(openai_response, original_model) + + # Override the ID with our request ID + anthropic_response["id"] = request_id + + return anthropic_response + + async def anthropic_count_tokens( + self, + request: "AnthropicCountTokensRequest", + ) -> dict: + """ + Handle Anthropic count_tokens API requests. + + Counts the number of tokens that would be used by a Messages API request. + This is useful for estimating costs and managing context windows. + + Args: + request: An AnthropicCountTokensRequest object + + Returns: + Dict with input_tokens count in Anthropic format + """ + from .anthropic_compat import ( + anthropic_to_openai_messages, + anthropic_to_openai_tools, + ) + import json + + anthropic_request = request.model_dump(exclude_none=True) + + openai_messages = anthropic_to_openai_messages( + anthropic_request.get("messages", []), anthropic_request.get("system") + ) + + # Count tokens for messages + message_tokens = self.token_count( + model=request.model, + messages=openai_messages, + ) + + # Count tokens for tools if present + tool_tokens = 0 + if request.tools: + # Tools add tokens based on their definitions + # Convert to JSON string and count tokens for tool definitions + openai_tools = anthropic_to_openai_tools( + [tool.model_dump() for tool in request.tools] + ) + if openai_tools: + # Serialize tools to count their token contribution + tools_text = json.dumps(openai_tools) + tool_tokens = self.token_count( + model=request.model, + text=tools_text, + ) + + total_tokens = message_tokens + tool_tokens + + return {"input_tokens": total_tokens} From d91f98bcb57dcd16ab523ae0461a3158d17e4796 Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Sat, 20 Dec 2025 22:18:56 +0100 Subject: [PATCH 15/16] fix(anthropic): improve model detection and document thinking budget - Add comment explaining metadata parameter is intentionally not mapped (OpenAI doesn't have an equivalent field) - Use safer regex pattern matching for Opus model detection (avoids false positives like "magnum-opus-model") - Document reasoning budget thresholds and // 4 reduction behavior - Conserve thinking tokens for Opus auto-detection (use // 4 like other models) Only set custom_reasoning_budget=True when user explicitly requests 32000+ tokens --- .../anthropic_compat/translator.py | 59 ++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/src/rotator_library/anthropic_compat/translator.py b/src/rotator_library/anthropic_compat/translator.py index 451abfa..54f077d 100644 --- a/src/rotator_library/anthropic_compat/translator.py +++ b/src/rotator_library/anthropic_compat/translator.py @@ -336,28 +336,73 @@ def translate_anthropic_request(request: AnthropicMessagesRequest) -> Dict[str, if openai_tool_choice: openai_request["tool_choice"] = openai_tool_choice + # Note: request.metadata is intentionally not mapped. + # OpenAI's API doesn't have an equivalent field for client-side metadata. + # The metadata is typically used by Anthropic clients for tracking purposes + # and doesn't affect the model's behavior. + # Handle Anthropic thinking config -> reasoning_effort translation + # The provider (antigravity_provider.py) applies a // 4 reduction to thinking budget + # unless custom_reasoning_budget is True. This conserves thinking tokens. + # + # Reasoning budget thresholds map to provider budgets: + # - Claude "high" = 32768 tokens (but // 4 = 8192 unless custom_reasoning_budget) + # - Claude "medium" = 16384 tokens (// 4 = 4096) + # - Claude "low" = 8192 tokens (// 4 = 2048) + # + # We only set custom_reasoning_budget=True when user explicitly requests + # a large budget (32000+), indicating they want full thinking capacity. if request.thinking: if request.thinking.type == "enabled": - # Map budget_tokens to reasoning_effort level - # Default to "medium" if enabled but budget not specified budget = request.thinking.budget_tokens or 10000 if budget >= 32000: + # User explicitly wants full thinking capacity openai_request["reasoning_effort"] = "high" openai_request["custom_reasoning_budget"] = True elif budget >= 10000: openai_request["reasoning_effort"] = "high" + # custom_reasoning_budget defaults to False, so // 4 applies elif budget >= 5000: openai_request["reasoning_effort"] = "medium" else: openai_request["reasoning_effort"] = "low" elif request.thinking.type == "disabled": openai_request["reasoning_effort"] = "disable" - elif "opus" in request.model.lower(): - # Force high thinking for Opus models when no thinking config is provided - # Opus 4.5 always uses the -thinking variant, so we want maximum thinking budget - # Without this, the backend defaults to thinkingBudget: -1 (auto) instead of high + elif _is_opus_model(request.model): + # Enable thinking for Opus models when no thinking config is provided + # Use "high" effort but NOT custom_reasoning_budget, so // 4 applies + # This gives 8192 thinking tokens (32768 // 4) which is reasonable for most tasks + # Users who want full capacity can explicitly set thinking.budget_tokens >= 32000 openai_request["reasoning_effort"] = "high" - openai_request["custom_reasoning_budget"] = True + # Note: NOT setting custom_reasoning_budget here to conserve tokens return openai_request + + +def _is_opus_model(model_name: str) -> bool: + """ + Check if a model name refers to a Claude Opus model. + + Uses specific pattern matching to avoid false positives with model names + that might contain "opus" as part of another word. + + Args: + model_name: The model name to check + + Returns: + True if the model is a Claude Opus model, False otherwise + """ + import re + + model_lower = model_name.lower() + # Match Claude Opus models specifically: + # - "claude-opus-4-5", "claude-4-opus", "claude_opus" + # - "opus-4", "opus-4.5", "opus4" (standalone with version) + # - "antigravity/claude-opus-4-5" + # Avoid matching things like "magnum-opus" or other non-Claude models + opus_patterns = [ + r'claude[-_]?opus', # "claude-opus", "claude_opus", "claudeopus" + r'opus[-_]?\d', # "opus-4", "opus_4", "opus4" (with version number) + r'\d[-_]?opus(?:[-_]|$)', # "4-opus", "4_opus" at word boundary + ] + return any(re.search(pattern, model_lower) for pattern in opus_patterns) From 16c889f367669321b74161d578411f99b5da5aaf Mon Sep 17 00:00:00 2001 From: Moeeze Hassan Date: Tue, 23 Dec 2025 00:55:16 +0100 Subject: [PATCH 16/16] fix(anthropic): handle images in tool results for Claude Code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tool results with images (e.g., from Read tool) were being dropped during Anthropic→OpenAI translation, and not properly converted to Gemini format. - translator.py: Extract image blocks from tool_result content and convert to OpenAI image_url format - antigravity_provider.py: Handle multimodal tool responses by converting image_url to Gemini inlineData format --- .../anthropic_compat/translator.py | 92 ++++++++++++++++--- .../providers/antigravity_provider.py | 57 +++++++++++- 2 files changed, 134 insertions(+), 15 deletions(-) diff --git a/src/rotator_library/anthropic_compat/translator.py b/src/rotator_library/anthropic_compat/translator.py index 54f077d..70fa1cf 100644 --- a/src/rotator_library/anthropic_compat/translator.py +++ b/src/rotator_library/anthropic_compat/translator.py @@ -98,20 +98,88 @@ def anthropic_to_openai_messages( ) elif block_type == "tool_result": # Tool results become separate messages in OpenAI format + # Content can be string, or list of text/image blocks tool_content = block.get("content", "") - if isinstance(tool_content, list): - tool_content = " ".join( - b.get("text", "") - for b in tool_content - if isinstance(b, dict) and b.get("type") == "text" + if isinstance(tool_content, str): + # Simple string content + openai_messages.append( + { + "role": "tool", + "tool_call_id": block.get("tool_use_id", ""), + "content": tool_content, + } + ) + elif isinstance(tool_content, list): + # List of content blocks - may include text and images + tool_content_parts = [] + for b in tool_content: + if not isinstance(b, dict): + continue + b_type = b.get("type", "") + if b_type == "text": + tool_content_parts.append( + {"type": "text", "text": b.get("text", "")} + ) + elif b_type == "image": + # Convert Anthropic image format to OpenAI format + source = b.get("source", {}) + if source.get("type") == "base64": + tool_content_parts.append( + { + "type": "image_url", + "image_url": { + "url": f"data:{source.get('media_type', 'image/png')};base64,{source.get('data', '')}" + }, + } + ) + elif source.get("type") == "url": + tool_content_parts.append( + { + "type": "image_url", + "image_url": {"url": source.get("url", "")}, + } + ) + + # If we only have text parts, join them as a string for compatibility + # Otherwise use the array format for multimodal content + if all(p.get("type") == "text" for p in tool_content_parts): + combined_text = " ".join( + p.get("text", "") for p in tool_content_parts + ) + openai_messages.append( + { + "role": "tool", + "tool_call_id": block.get("tool_use_id", ""), + "content": combined_text, + } + ) + elif tool_content_parts: + # Multimodal content (includes images) + openai_messages.append( + { + "role": "tool", + "tool_call_id": block.get("tool_use_id", ""), + "content": tool_content_parts, + } + ) + else: + # Empty content + openai_messages.append( + { + "role": "tool", + "tool_call_id": block.get("tool_use_id", ""), + "content": "", + } + ) + else: + # Fallback for unexpected content type + openai_messages.append( + { + "role": "tool", + "tool_call_id": block.get("tool_use_id", ""), + "content": str(tool_content) if tool_content else "", + } ) - openai_messages.append( - { - "role": "tool", - "tool_call_id": block.get("tool_use_id", ""), - "content": str(tool_content), - } - ) continue # Don't add to current message # Build the message diff --git a/src/rotator_library/providers/antigravity_provider.py b/src/rotator_library/providers/antigravity_provider.py index 874e910..83b585a 100644 --- a/src/rotator_library/providers/antigravity_provider.py +++ b/src/rotator_library/providers/antigravity_provider.py @@ -2438,7 +2438,12 @@ def _get_cached_thinking( def _transform_tool_message( self, msg: Dict[str, Any], model: str, tool_id_to_name: Dict[str, str] ) -> List[Dict[str, Any]]: - """Transform tool response message.""" + """Transform tool response message. + + Handles both text-only and multimodal (text + images) tool responses. + For multimodal responses, images are converted to inlineData format + and returned as separate parts alongside the functionResponse. + """ tool_id = msg.get("tool_call_id", "") func_name = tool_id_to_name.get(tool_id, "unknown_function") content = msg.get("content", "{}") @@ -2449,14 +2454,60 @@ def _transform_tool_message( f"[ID Mismatch] Tool response has ID '{tool_id}' which was not found in tool_id_to_name map. " f"Available IDs: {list(tool_id_to_name.keys())}" ) - # else: - # lib_logger.debug(f"[ID Mapping] Tool response matched: id={tool_id}, name={func_name}") # Add prefix for Gemini 3 (and rename problematic tools) if self._is_gemini_3(model) and self._enable_gemini3_tool_fix: func_name = GEMINI3_TOOL_RENAMES.get(func_name, func_name) func_name = f"{self._gemini3_tool_prefix}{func_name}" + # Handle multimodal content (array with text and images) + if isinstance(content, list): + text_parts = [] + image_parts = [] + + for item in content: + if not isinstance(item, dict): + continue + item_type = item.get("type", "") + + if item_type == "text": + text_parts.append(item.get("text", "")) + elif item_type == "image_url": + # Convert OpenAI image_url format to Gemini inlineData + image_url = item.get("image_url", {}).get("url", "") + if image_url.startswith("data:"): + try: + # Parse: data:image/png;base64,iVBORw0KG... + header, data = image_url.split(",", 1) + mime_type = header.split(":")[1].split(";")[0] + image_parts.append({ + "inlineData": { + "mimeType": mime_type, + "data": data, + } + }) + except Exception as e: + lib_logger.warning(f"Failed to parse image data URL in tool response: {e}") + + # Build the result parts + parts = [] + + # Add function response with text content + text_result = " ".join(text_parts) if text_parts else "" + parts.append({ + "functionResponse": { + "name": func_name, + "response": {"result": text_result if text_result else "Image content provided"}, + "id": tool_id, + } + }) + + # Add image parts separately (Gemini handles these as additional parts) + parts.extend(image_parts) + + return parts + + # Handle string content (text-only) try: parsed_content = json.loads(content) except (json.JSONDecodeError, TypeError):