forked from su-kaka/gcli2api
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtemp_stable.py
More file actions
795 lines (654 loc) · 28.3 KB
/
temp_stable.py
File metadata and controls
795 lines (654 loc) · 28.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
from __future__ import annotations
import json
import os
import time
import uuid
from typing import Any, Dict, Optional
import json
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from log import log
from .antigravity_api import (
build_antigravity_request_body,
send_antigravity_request_no_stream,
send_antigravity_request_stream,
)
from .anthropic_converter import convert_anthropic_request_to_antigravity_components
from .anthropic_streaming import antigravity_sse_to_anthropic_sse
from .token_estimator import estimate_input_tokens
# ====================== 导入智能降级管理器 ======================
from .fallback_manager import (
is_quota_exhausted_error,
is_retryable_error,
is_403_error,
is_credential_unavailable_error,
get_cross_pool_fallback,
is_haiku_model,
is_model_supported,
HAIKU_FALLBACK_TARGET,
)
def _get_fallback_models(model_name: str) -> list:
    """
    Return the downgrade targets for ``model_name``.

    This is used to pre-compute fallback targets, so the cross-pool lookup is
    asked to log at debug level only.

    Args:
        model_name: Name of the model currently being requested.

    Returns:
        ``[HAIKU_FALLBACK_TARGET]`` for Haiku models; otherwise a one-element
        list holding the cross-pool fallback, or an empty list when the lookup
        yields nothing truthy.
    """
    # Haiku models always map to the dedicated Haiku target.
    if is_haiku_model(model_name):
        return [HAIKU_FALLBACK_TARGET]

    target = get_cross_pool_fallback(model_name, log_level="debug")
    return [target] if target else []
def _should_fallback(
    error_msg: Any,
    fallback_idx: Optional[int] = None,
    fallback_models: Optional[list] = None,
) -> bool:
    """
    Decide whether a failed request should be retried on the next fallback model.

    Only errors classified as quota exhaustion by ``is_quota_exhausted_error``
    trigger a downgrade. Per the original design note, 400 / ordinary 429 / 5xx
    errors are meant to be retried and 403 errors are meant to trigger
    credential validation; that classification lives in the fallback manager,
    not here.

    The two optional positional arguments exist because the request handlers
    call this function as ``_should_fallback(e, fallback_idx, fallback_models)``;
    with the previous one-parameter signature that call raised ``TypeError``.

    Args:
        error_msg: Error message, or the exception itself (it is stringified).
        fallback_idx: Index of the attempt that just failed, if known.
        fallback_models: The full attempt chain being iterated, if known.

    Returns:
        True when the caller should move on to the next model, False when it
        should stop (non-quota error, or no further model is available).
    """
    # When the caller tells us where it is in the chain, never ask for a
    # downgrade that cannot happen because the chain is already exhausted.
    if fallback_idx is not None and fallback_models is not None:
        if fallback_idx + 1 >= len(fallback_models):
            return False

    # Handlers pass the exception object; normalise it to text first.
    return is_quota_exhausted_error(str(error_msg))
def _is_thinking_format_error(error_msg: str) -> bool:
    """
    Tell whether an error message looks like a thinking-format 400 error.

    The check is a case-insensitive substring search for any of the marker
    phrases below (for example "When thinking is disabled", "thinking block",
    "redacted_thinking").
    """
    markers = (
        "when thinking is disabled",
        "thinking block",
        "redacted_thinking",
        "must start with a thinking block",
        "expected `thinking`",
    )
    haystack = str(error_msg).lower()
    for marker in markers:
        if marker in haystack:
            return True
    return False
def _strip_thinking_from_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a copy of ``payload`` whose assistant messages carry no thinking blocks.

    Intended as the last-resort retry path after a thinking-related 400 error.
    Only assistant messages with list content are rewritten; every other entry
    is passed through untouched. An assistant message left with no blocks gets
    a single placeholder text block. The input payload and its messages are
    not mutated (shallow copies are made).
    """
    thinking_types = ("thinking", "redacted_thinking")

    def _without_thinking(message: Any) -> Any:
        # Non-dict entries and non-assistant / non-list content are returned as-is.
        if not isinstance(message, dict):
            return message
        blocks = message.get("content")
        if message.get("role") != "assistant" or not isinstance(blocks, list):
            return message

        kept = [
            block
            for block in blocks
            if not isinstance(block, dict) or block.get("type") not in thinking_types
        ]
        rewritten = message.copy()
        # Keep the message non-empty when every block was a thinking block.
        rewritten["content"] = kept or [{"type": "text", "text": "..."}]
        return rewritten

    if not payload:
        return payload

    history = payload.get("messages")
    if not isinstance(history, list):
        return payload

    result = payload.copy()
    result["messages"] = [_without_thinking(entry) for entry in history]
    return result
# Router that carries the Anthropic-compatible endpoints defined in this module.
router = APIRouter()
# Bearer-token dependency for the endpoints. auto_error=False is presumably
# chosen so the handlers can also accept other headers (see _extract_api_token).
security = HTTPBearer(auto_error=False)
# Environment-variable values (compared after strip/lower) that switch a debug flag on.
_DEBUG_TRUE = {"1", "true", "yes", "on"}
# Placeholder written to logs in place of a sensitive value.
_REDACTED = "<REDACTED>"
# Lower-cased dict keys whose values are replaced by _REDACTED before logging.
_SENSITIVE_KEYS = {
    "authorization",
    "x-api-key",
    "api_key",
    "apikey",
    "access_token",
    "refresh_token",
    "token",
    "password",
    "secret",
}
def _remove_nulls_for_tool_input(value: Any) -> Any:
    """
    Recursively drop ``None`` values from dicts and ``None`` items from lists.

    Background (from the original note): on the Anthropic native tool path,
    Roo/Kilo may treat a null inside ``tool_use.input`` as a real argument
    (e.g. "search in null"), so the input is scrubbed before it is returned.
    Values that are neither dict nor list are returned unchanged.
    """
    if isinstance(value, dict):
        return {
            key: _remove_nulls_for_tool_input(item)
            for key, item in value.items()
            if item is not None
        }
    if isinstance(value, list):
        return [_remove_nulls_for_tool_input(item) for item in value if item is not None]
    return value
def _anthropic_debug_max_chars() -> int:
    """
    Return the maximum length of a single string field in debug logs.

    Read from ``ANTHROPIC_DEBUG_MAX_CHARS``; defaults to 2000 when the variable
    is unset, blank or not an integer, and is never lower than 200. The cap
    keeps base64 images and very long schemas from flooding the log.
    """
    default_limit = 2000
    configured = os.environ.get("ANTHROPIC_DEBUG_MAX_CHARS", "").strip()
    if not configured:
        return default_limit
    try:
        parsed = int(configured)
    except Exception:
        return default_limit
    return max(200, parsed)
def _anthropic_debug_enabled() -> bool:
    """Return True when ``ANTHROPIC_DEBUG`` is set to one of the ``_DEBUG_TRUE`` values."""
    flag = os.getenv("ANTHROPIC_DEBUG", "")
    normalized = str(flag).strip().lower()
    return normalized in _DEBUG_TRUE
def _anthropic_debug_body_enabled() -> bool:
    """
    Return True when high-volume body logging is explicitly switched on.

    ``ANTHROPIC_DEBUG=1`` alone only enables the compact logs (token
    comparison and the like). To avoid flooding the log, the incoming payload
    and the downstream body are printed only when ``ANTHROPIC_DEBUG_BODY`` is
    also set to one of the ``_DEBUG_TRUE`` values.
    """
    flag = os.getenv("ANTHROPIC_DEBUG_BODY", "")
    normalized = str(flag).strip().lower()
    return normalized in _DEBUG_TRUE
def _redact_for_log(value: Any, *, key_hint: str | None = None, max_chars: int) -> Any:
"""
递归脱敏/截断用于日志打印的 JSON。
目标:
- 让用户能看到“实际入参结构”(system/messages/tools 等)
- 默认避免泄露凭证/令牌
- 避免把图片 base64 或超长字段直接写入日志文件
"""
if isinstance(value, dict):
redacted: Dict[str, Any] = {}
for k, v in value.items():
k_str = str(k)
k_lower = k_str.strip().lower()
if k_lower in _SENSITIVE_KEYS:
redacted[k_str] = _REDACTED
continue
redacted[k_str] = _redact_for_log(v, key_hint=k_lower, max_chars=max_chars)
return redacted
if isinstance(value, list):
return [_redact_for_log(v, key_hint=key_hint, max_chars=max_chars) for v in value]
if isinstance(value, str):
if (key_hint or "").lower() == "data" and len(value) > 64:
return f"<base64 len={len(value)}>"
if len(value) > max_chars:
head = value[: max_chars // 2]
tail = value[-max_chars // 2 :]
return f"{head}<...省略 {len(value) - len(head) - len(tail)} 字符...>{tail}"
return value
return value
def _json_dumps_for_log(data: Any) -> str:
    """
    Serialise ``data`` as compact, key-sorted JSON for a log line.

    Non-ASCII characters are kept as-is. If JSON serialisation fails for any
    reason, the plain ``str()`` of the value is returned instead.
    """
    options = {"ensure_ascii": False, "separators": (",", ":"), "sort_keys": True}
    try:
        rendered = json.dumps(data, **options)
    except Exception:
        rendered = str(data)
    return rendered
def _debug_log_request_payload(request: Request, payload: Dict[str, Any]) -> None:
    """
    Log the incoming request (selected headers and the redacted/truncated payload).

    Nothing is logged unless both ``ANTHROPIC_DEBUG`` and
    ``ANTHROPIC_DEBUG_BODY`` are enabled.
    """
    if not (_anthropic_debug_enabled() and _anthropic_debug_body_enabled()):
        return

    limit = _anthropic_debug_max_chars()
    safe_payload = _redact_for_log(payload, max_chars=limit)

    # Only a fixed set of headers is logged.
    header_names = ("content-type", "content-length", "anthropic-version", "user-agent")
    selected_headers = {name: request.headers.get(name) for name in header_names}
    safe_headers = _redact_for_log(selected_headers, max_chars=limit)

    log.info(f"[ANTHROPIC][DEBUG] headers={_json_dumps_for_log(safe_headers)}")
    log.info(f"[ANTHROPIC][DEBUG] payload={_json_dumps_for_log(safe_payload)}")
def _debug_log_downstream_request_body(request_body: Dict[str, Any]) -> None:
    """
    Log the final request body forwarded downstream (redacted and truncated).

    Nothing is logged unless both ``ANTHROPIC_DEBUG`` and
    ``ANTHROPIC_DEBUG_BODY`` are enabled.
    """
    if not (_anthropic_debug_enabled() and _anthropic_debug_body_enabled()):
        return

    limit = _anthropic_debug_max_chars()
    rendered = _json_dumps_for_log(_redact_for_log(request_body, max_chars=limit))
    log.info(f"[ANTHROPIC][DEBUG] downstream_request_body={rendered}")
def _anthropic_error(
    *,
    status_code: int,
    message: str,
    error_type: str = "api_error",
) -> JSONResponse:
    """
    Build an error response in the Anthropic error envelope.

    Args:
        status_code: HTTP status code of the response.
        message: Human-readable error message.
        error_type: Value of the nested ``error.type`` field.

    Returns:
        A ``JSONResponse`` whose body is
        ``{"type": "error", "error": {"type": ..., "message": ...}}``.
    """
    error_detail = {"type": error_type, "message": message}
    envelope = {"type": "error", "error": error_detail}
    return JSONResponse(status_code=status_code, content=envelope)
def _extract_api_token(
    request: Request, credentials: Optional[HTTPAuthorizationCredentials]
) -> Optional[str]:
    """
    Extract the caller's API token from the request.

    Anthropic-ecosystem clients usually send ``x-api-key`` while the project's
    other routes use ``Authorization: Bearer``; both are accepted so clients
    can connect without changes. Lookup order: parsed bearer credentials, the
    raw ``Authorization: Bearer`` header, then ``x-api-key``.

    Returns:
        The token string, or None when none of the sources provides one.
    """
    parsed_token = credentials.credentials if credentials else None
    if parsed_token:
        return parsed_token

    headers = request.headers
    auth_header = headers.get("authorization")
    if auth_header and auth_header.lower().startswith("bearer "):
        # Everything after the first space is the token.
        _, _, remainder = auth_header.partition(" ")
        return remainder.strip()

    api_key = headers.get("x-api-key")
    return api_key.strip() if api_key else None
def _infer_project_and_session(credential_data: Dict[str, Any]) -> tuple[str, str]:
    """
    Return ``(project_id, session_id)`` for a downstream request.

    The project id is read from ``credential_data["project_id"]`` and
    stringified (a missing key therefore yields the string ``"None"``); the
    session id is a fresh ``session-<uuid4 hex>`` value on every call.
    """
    project = str(credential_data.get("project_id"))
    session = "session-" + uuid.uuid4().hex
    return project, session
def _pick_usage_metadata_from_antigravity_response(response_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Pick the usage metadata from a downstream response.

    The metadata may live in either of two places:
    - ``response.usageMetadata``
    - ``response.candidates[0].usageMetadata``

    When both exist, the one with more of the tracked token-count fields set
    to a non-None value wins; on a tie the response-level one is returned.
    Anything that is missing or not a dict is treated as an empty dict.
    """

    def _dict_or_empty(obj: Any) -> Dict[str, Any]:
        # Falsy values and non-dict values both collapse to an empty dict.
        return obj if obj and isinstance(obj, dict) else {}

    response = response_data.get("response", {}) or {}
    if not isinstance(response, dict):
        return {}

    top_level_usage = _dict_or_empty(response.get("usageMetadata"))
    candidates = response.get("candidates") or [{}]
    first_candidate = _dict_or_empty(candidates[0])
    candidate_usage = _dict_or_empty(first_candidate.get("usageMetadata"))

    tracked = ("promptTokenCount", "candidatesTokenCount", "totalTokenCount")

    def _completeness(usage: Dict[str, Any]) -> int:
        return sum(1 for name in tracked if usage.get(name) is not None)

    if _completeness(candidate_usage) > _completeness(top_level_usage):
        return candidate_usage
    return top_level_usage
def _convert_antigravity_response_to_anthropic_message(
    response_data: Dict[str, Any],
    *,
    model: str,
    message_id: str,
    fallback_input_tokens: int = 0,
) -> Dict[str, Any]:
    """
    Convert a non-streaming downstream response into an Anthropic ``message`` dict.

    The parts of the first candidate are mapped as follows:
    - ``thought: true`` parts   -> ``thinking`` blocks (with ``signature`` when present)
    - parts with ``text``       -> ``text`` blocks
    - parts with ``functionCall`` -> ``tool_use`` blocks (nulls stripped from the input)
    - parts with ``inlineData`` -> base64 ``image`` blocks

    A response with no usable candidate (missing / empty ``candidates``, or a
    ``response`` / ``content`` that is not a dict) yields a message with empty
    content instead of raising.

    Args:
        response_data: Decoded downstream response.
        model: Model name to report back to the client.
        message_id: Identifier to report as the message ``id``.
        fallback_input_tokens: Input-token estimate used when the response
            carries no ``promptTokenCount``.

    Returns:
        A dict with ``id``, ``type``, ``role``, ``model``, ``content``,
        ``stop_reason``, ``stop_sequence`` and ``usage``.
    """
    # Walk response -> candidates[0] -> content -> parts defensively: an empty
    # candidate list or a null level must not raise IndexError/AttributeError.
    response = response_data.get("response")
    if not isinstance(response, dict):
        response = {}
    candidates = response.get("candidates")
    candidate = candidates[0] if isinstance(candidates, list) and candidates else None
    if not isinstance(candidate, dict):
        candidate = {}
    candidate_content = candidate.get("content")
    parts = candidate_content.get("parts") if isinstance(candidate_content, dict) else None
    if not isinstance(parts, list):
        parts = []

    usage_metadata = _pick_usage_metadata_from_antigravity_response(response_data)
    if not isinstance(usage_metadata, dict):
        usage_metadata = {}

    content: list = []
    has_tool_use = False
    for part in parts:
        if not isinstance(part, dict):
            continue

        # Thought parts are checked first: they also carry a "text" field.
        if part.get("thought") is True:
            block: Dict[str, Any] = {"type": "thinking", "thinking": part.get("text", "")}
            signature = part.get("thoughtSignature")
            if signature:
                block["signature"] = signature
            content.append(block)
            continue

        if "text" in part:
            content.append({"type": "text", "text": part.get("text", "")})
            continue

        if "functionCall" in part:
            has_tool_use = True
            fc = part.get("functionCall")
            if not isinstance(fc, dict):
                fc = {}
            content.append(
                {
                    "type": "tool_use",
                    # Generate an id when the downstream response did not supply one.
                    "id": fc.get("id") or f"toolu_{uuid.uuid4().hex}",
                    "name": fc.get("name") or "",
                    "input": _remove_nulls_for_tool_input(fc.get("args", {}) or {}),
                }
            )
            continue

        if "inlineData" in part:
            inline = part.get("inlineData")
            if not isinstance(inline, dict):
                inline = {}
            content.append(
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": inline.get("mimeType", "image/png"),
                        "data": inline.get("data", ""),
                    },
                }
            )
            continue

    # A tool call takes precedence over the downstream finish reason.
    finish_reason = candidate.get("finishReason")
    stop_reason = "tool_use" if has_tool_use else "end_turn"
    if finish_reason == "MAX_TOKENS" and not has_tool_use:
        stop_reason = "max_tokens"

    # Use the caller's estimate only when the response has no prompt token count.
    if "promptTokenCount" in usage_metadata:
        input_tokens = usage_metadata.get("promptTokenCount", 0)
    else:
        input_tokens = max(0, int(fallback_input_tokens or 0))
    output_tokens = usage_metadata.get("candidatesTokenCount", 0)

    return {
        "id": message_id,
        "type": "message",
        "role": "assistant",
        "model": model,
        "content": content,
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": {
            "input_tokens": int(input_tokens or 0),
            "output_tokens": int(output_tokens or 0),
        },
    }
def _build_downstream_request_body(
    components: Dict[str, Any], *, project_id: str, session_id: str
) -> Any:
    """Assemble the downstream request body from converted request components."""
    return build_antigravity_request_body(
        contents=components["contents"],
        model=components["model"],
        project_id=project_id,
        session_id=session_id,
        system_instruction=components["system_instruction"],
        tools=components["tools"],
        generation_config=components["generation_config"],
    )


async def _relay_anthropic_sse(
    response: Any,
    stream_ctx: Any,
    client: Any,
    *,
    model: str,
    message_id: str,
    initial_input_tokens: int,
    credential_manager: Any,
    credential_name: Any,
):
    """Yield Anthropic SSE chunks converted from the downstream stream, then release it."""
    try:
        async for chunk in antigravity_sse_to_anthropic_sse(
            response,
            model=model,
            message_id=message_id,
            initial_input_tokens=initial_input_tokens,
            credential_manager=credential_manager,
            credential_name=credential_name,
        ):
            yield chunk
    finally:
        # Best-effort cleanup: a failure to close must not mask the stream outcome.
        try:
            await stream_ctx.__aexit__(None, None, None)
        except Exception as e:
            log.debug(f"[ANTHROPIC] 关闭 stream_ctx 失败: {e}")
        try:
            await client.aclose()
        except Exception as e:
            log.debug(f"[ANTHROPIC] 关闭 client 失败: {e}")


@router.post("/antigravity/v1/messages")
async def anthropic_messages(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
):
    """
    Anthropic Messages API compatible endpoint backed by the downstream service.

    Flow: authenticate -> parse and validate the JSON body -> answer the
    single-message "Hi" connectivity probe locally -> obtain a credential ->
    convert the request -> send it, moving to the fallback model(s) when the
    failure is classified as a fallback-worthy error.

    The requested model is always attempted first; the models returned by
    ``_get_fallback_models`` are tried afterwards. The response always reports
    the model name the client originally asked for.

    Returns:
        A ``StreamingResponse`` (SSE) when ``stream`` is true, otherwise a
        ``JSONResponse`` with the Anthropic message; an Anthropic-style error
        response on failure (503 once every attempt has failed).
    """
    from config import get_api_password

    password = await get_api_password()
    token = _extract_api_token(request, credentials)
    if token != password:
        return _anthropic_error(status_code=403, message="密码错误", error_type="authentication_error")

    try:
        payload = await request.json()
    except Exception as e:
        return _anthropic_error(
            status_code=400, message=f"JSON 解析失败: {str(e)}", error_type="invalid_request_error"
        )
    if not isinstance(payload, dict):
        return _anthropic_error(
            status_code=400, message="请求体必须为 JSON object", error_type="invalid_request_error"
        )

    _debug_log_request_payload(request, payload)

    model = payload.get("model")
    max_tokens = payload.get("max_tokens")
    messages = payload.get("messages")
    stream = bool(payload.get("stream", False))

    # Compact summary of the "thinking" setting, used only for the log line below.
    thinking_present = "thinking" in payload
    thinking_value = payload.get("thinking")
    thinking_summary = None
    if thinking_present:
        if isinstance(thinking_value, dict):
            thinking_summary = {
                "type": thinking_value.get("type"),
                "budget_tokens": thinking_value.get("budget_tokens"),
            }
        else:
            thinking_summary = thinking_value

    if not model or max_tokens is None or not isinstance(messages, list):
        return _anthropic_error(
            status_code=400,
            message="缺少必填字段:model / max_tokens / messages",
            error_type="invalid_request_error",
        )

    try:
        client_host = request.client.host if request.client else "unknown"
        client_port = request.client.port if request.client else "unknown"
    except Exception:
        client_host = "unknown"
        client_port = "unknown"
    user_agent = request.headers.get("user-agent", "")
    log.info(
        f"[ANTHROPIC] /messages 收到请求: client={client_host}:{client_port}, model={model}, "
        f"stream={stream}, messages={len(messages)}, thinking_present={thinking_present}, "
        f"thinking={thinking_summary}, ua={user_agent}"
    )

    # Connectivity probe: a lone user message "Hi" is answered locally.
    # The isinstance guard keeps a malformed (non-dict) first message from
    # raising AttributeError here.
    if (
        len(messages) == 1
        and isinstance(messages[0], dict)
        and messages[0].get("role") == "user"
        and messages[0].get("content") == "Hi"
    ):
        return JSONResponse(
            content={
                "id": f"msg_{uuid.uuid4().hex}",
                "type": "message",
                "role": "assistant",
                "model": str(model),
                "content": [{"type": "text", "text": "antigravity Anthropic Messages 正常工作中"}],
                "stop_reason": "end_turn",
                "stop_sequence": None,
                "usage": {"input_tokens": 0, "output_tokens": 0},
            }
        )

    from src.credential_manager import get_credential_manager

    cred_mgr = await get_credential_manager()
    cred_result = await cred_mgr.get_valid_credential(is_antigravity=True)
    if not cred_result:
        return _anthropic_error(status_code=500, message="当前无可用 antigravity 凭证")
    _, credential_data = cred_result
    project_id, session_id = _infer_project_and_session(credential_data)

    try:
        components = convert_anthropic_request_to_antigravity_components(payload)
    except Exception as e:
        log.error(f"[ANTHROPIC] 请求转换失败: {e}")
        return _anthropic_error(
            status_code=400, message="请求转换失败", error_type="invalid_request_error"
        )
    log.info(f"[ANTHROPIC] /messages 模型映射: upstream={model} -> downstream={components['model']}")

    # The downstream service requires every text block to contain non-blank
    # text. Clients occasionally append blank text blocks (e.g. an empty string
    # after an image); after conversion that can leave no contents at all, so
    # reject locally with 400 rather than forwarding an invalid request.
    if not (components.get("contents") or []):
        return _anthropic_error(
            status_code=400,
            message="messages 不能为空;text 内容块必须包含非空白文本",
            error_type="invalid_request_error",
        )

    # Rough input-token estimate, used when the response carries no usage data.
    estimated_tokens = 0
    try:
        estimated_tokens = estimate_input_tokens(payload)
    except Exception as e:
        log.debug(f"[ANTHROPIC] token 估算失败: {e}")

    request_body = _build_downstream_request_body(
        components, project_id=project_id, session_id=session_id
    )
    _debug_log_downstream_request_body(request_body)

    # Attempt chain: the requested model first, then the fallback targets.
    # Iterating over the fallback list alone meant the request was never sent
    # when that list was empty, and the fallback target was never reached
    # when it held a single entry.
    original_model = str(model)
    attempt_models = [original_model]
    for candidate_model in _get_fallback_models(original_model):
        if candidate_model not in attempt_models:
            attempt_models.append(candidate_model)

    mode = "流式" if stream else "非流式"
    message_id = f"msg_{uuid.uuid4().hex}" if stream else f"msg_{int(time.time() * 1000)}"
    current_model = components["model"]
    current_payload = payload.copy()
    last_error = None

    for attempt_idx, attempt_model in enumerate(attempt_models):
        try:
            # Attempt 0 reuses the request body built above; later attempts
            # re-convert the payload for the fallback model.
            if attempt_idx > 0:
                log.info(f"[ANTHROPIC] {mode}请求降级: {current_model} -> {attempt_model}")
                current_payload["model"] = attempt_model
                current_model = attempt_model
                try:
                    components = convert_anthropic_request_to_antigravity_components(current_payload)
                except Exception as e:
                    log.error(f"[ANTHROPIC] 降级请求转换失败: {e}")
                    last_error = e
                    continue
                request_body = _build_downstream_request_body(
                    components, project_id=project_id, session_id=session_id
                )

            # NOTE(review): _handle_request_with_thinking_retry is not defined
            # in the visible part of this file — confirm it exists; otherwise
            # every attempt fails here with NameError.
            if stream:
                resources, cred_name, _ = await _handle_request_with_thinking_retry(
                    request_body, current_payload, cred_mgr, stream=True
                )
                response, stream_ctx, client = resources
                return StreamingResponse(
                    _relay_anthropic_sse(
                        response,
                        stream_ctx,
                        client,
                        model=original_model,  # report the originally requested model
                        message_id=message_id,
                        initial_input_tokens=estimated_tokens,
                        credential_manager=cred_mgr,
                        credential_name=cred_name,
                    ),
                    media_type="text/event-stream",
                )

            response_data, _, _ = await _handle_request_with_thinking_retry(
                request_body, current_payload, cred_mgr, stream=False
            )
            anthropic_response = _convert_antigravity_response_to_anthropic_message(
                response_data,
                model=original_model,  # report the originally requested model
                message_id=message_id,
                fallback_input_tokens=estimated_tokens,
            )
            return JSONResponse(content=anthropic_response)
        except Exception as e:
            last_error = e
            error_str = str(e)
            has_next_model = attempt_idx + 1 < len(attempt_models)
            # Downgrade only when another model exists and the error qualifies.
            if has_next_model and _should_fallback(error_str):
                log.warning(f"[ANTHROPIC] {mode}请求失败,尝试降级: {error_str[:200]}")
                continue
            log.error(f"[ANTHROPIC] {mode}请求失败,无法降级: {error_str[:200]}")
            break

    # Every attempt failed: answer 503 so the gateway can route elsewhere.
    log.error(f"[ANTHROPIC] 所有{mode}请求尝试均失败: {last_error}")
    log.warning("[ANTHROPIC FALLBACK] 所有降级模型均已尝试,返回 503 触发 Gateway fallback 到 Copilot")
    return _anthropic_error(
        status_code=503,
        message="All Antigravity quota pools exhausted. Gateway should route to Copilot.",
        error_type="api_error",
    )
@router.post("/antigravity/v1/messages/count_tokens")
async def anthropic_messages_count_tokens(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
):
    """
    Anthropic Messages API compatible token-counting endpoint.

    Used by clients such as claude-cli as a pre-flight check. The response
    mirrors Anthropic's shape: ``{"input_tokens": <int>}``. The count is a
    local estimate; if estimation fails the error is logged and 0 is returned.
    """
    from config import get_api_password

    expected_password = await get_api_password()
    if _extract_api_token(request, credentials) != expected_password:
        return _anthropic_error(status_code=403, message="密码错误", error_type="authentication_error")

    try:
        payload = await request.json()
    except Exception as e:
        return _anthropic_error(
            status_code=400, message=f"JSON 解析失败: {str(e)}", error_type="invalid_request_error"
        )
    if not isinstance(payload, dict):
        return _anthropic_error(
            status_code=400, message="请求体必须为 JSON object", error_type="invalid_request_error"
        )

    _debug_log_request_payload(request, payload)

    requested_model = payload.get("model")
    message_list = payload.get("messages")
    if not (requested_model and isinstance(message_list, list)):
        return _anthropic_error(
            status_code=400,
            message="缺少必填字段:model / messages",
            error_type="invalid_request_error",
        )

    try:
        peer = request.client
        client_host = peer.host if peer else "unknown"
        client_port = peer.port if peer else "unknown"
    except Exception:
        client_host = client_port = "unknown"

    # Compact summary of the "thinking" setting, used only for the log line.
    has_thinking = "thinking" in payload
    thinking_cfg = payload.get("thinking")
    if has_thinking and isinstance(thinking_cfg, dict):
        thinking_summary = {
            "type": thinking_cfg.get("type"),
            "budget_tokens": thinking_cfg.get("budget_tokens"),
        }
    elif has_thinking:
        thinking_summary = thinking_cfg
    else:
        thinking_summary = None

    user_agent = request.headers.get("user-agent", "")
    log.info(
        f"[ANTHROPIC] /messages/count_tokens 收到请求: client={client_host}:{client_port}, "
        f"model={requested_model}, messages={len(message_list or [])}, "
        f"thinking_present={has_thinking}, thinking={thinking_summary}, ua={user_agent}"
    )

    # Simple estimate; fall back to 0 when the estimator raises.
    try:
        input_tokens = estimate_input_tokens(payload)
    except Exception as e:
        log.error(f"[ANTHROPIC] token 估算失败: {e}")
        input_tokens = 0

    return JSONResponse(content={"input_tokens": input_tokens})