diff --git a/.gitignore b/.gitignore
index 4e6507d..2e08828 100644
--- a/.gitignore
+++ b/.gitignore
@@ -63,7 +63,7 @@ config/setting.toml
 config/setting_warp.toml
 config/setting_warp_example.toml
-# Test files
-test_*
+# Temporary test files in the repository root
+/test_*
 tests/__pycache__/
 *.har
diff --git a/README.md b/README.md
index b91ea0c..177f8b9 100644
--- a/README.md
+++ b/README.md
@@ -115,6 +115,14 @@ python main.py
 - **Username**: `admin`
 - **Password**: `admin`
 
+## 📈 Monitoring Endpoints
+
+- `GET /health`: public health check; returns a summary covering whether the service is alive, plus counts of active tokens, tokens expiring soon, already-expired tokens, and tokens disabled by 429 rate-limit bans
+- `GET /metrics`: Prometheus metrics endpoint
+- `GET /api/tokens`: admin endpoint; returns per-token status fields such as `at_expires`, `at_expired`, `at_expiring_within_1h`, `ban_reason`, and `consecutive_error_count`
+
+Prometheus can scrape `/metrics` directly (see the probe sketch below). When deploying to Kubernetes, prefer scraping only from inside the cluster, and restrict external access to `/metrics` separately at the Ingress/Gateway layer.
+
 ### Model Test Page
 
 Visit **http://localhost:8000/test** to open the built-in model test page, which supports:
diff --git a/requirements.txt b/requirements.txt
index c420d96..a8e8fe4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,3 +10,4 @@ python-multipart==0.0.20
 python-dateutil==2.8.2
 playwright>=1.40.0
 nodriver>=0.48.0
+prometheus-client==0.22.1
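With the `prometheus-client` pin in place, the two public endpoints documented above can be polled by any external watchdog. Below is a minimal probe sketch (not part of the patch), assuming the default `http://localhost:8000` base URL from the README and the `/health` field names produced by `build_public_health_snapshot` later in this diff:

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: default host/port from the README


def check_health() -> bool:
    """Poll the public /health endpoint and warn about token problems."""
    with urllib.request.urlopen(f"{BASE_URL}/health", timeout=5) as resp:
        snapshot = json.load(resp)
    if not snapshot.get("has_active_tokens"):
        print("warning: no active tokens")
    expiring = snapshot.get("tokens_expiring_within_1h", 0)
    if expiring:
        print(f"warning: {expiring} token(s) expire within the hour")
    return bool(snapshot.get("backend_running"))


def fetch_metrics() -> str:
    """Fetch the Prometheus exposition text from /metrics."""
    with urllib.request.urlopen(f"{BASE_URL}/metrics", timeout=5) as resp:
        return resp.read().decode("utf-8")


if __name__ == "__main__":
    print("healthy:", check_health())
    print("exports flow2api_up:", "flow2api_up" in fetch_metrics())
```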
if row.get("last_error_at") else None, + "ban_reason": row.get("ban_reason"), + "banned_at": to_iso(row.get("banned_at")) if row.get("banned_at") else None, } for row in token_rows] # 直接返回数组,兼容前端 @@ -1232,11 +1258,9 @@ async def logout(token: str = Depends(verify_admin_token)): async def health_check(): """Public health check endpoint - no auth required""" try: - stats = await db.get_dashboard_stats() - has_active_tokens = stats.get("active_tokens", 0) > 0 + return await build_public_health_snapshot(db) except Exception: return {"backend_running": True, "has_active_tokens": False} - return {"backend_running": True, "has_active_tokens": has_active_tokens} @router.get("/api/stats") diff --git a/src/core/database.py b/src/core/database.py index 71dc8c1..02a1e1d 100644 --- a/src/core/database.py +++ b/src/core/database.py @@ -3,7 +3,7 @@ import aiosqlite import json from contextlib import asynccontextmanager -from datetime import datetime +from datetime import date, datetime from typing import Optional, List, Dict, Any from pathlib import Path from .models import Token, TokenStats, Task, RequestLog, AdminConfig, ProxyConfig, GenerationConfig, CacheConfig, Project, CaptchaConfig, PluginConfig, CallLogicConfig @@ -32,6 +32,10 @@ async def _configure_connection(self, db): await db.execute(f"PRAGMA busy_timeout = {self._busy_timeout_ms}") await db.execute("PRAGMA foreign_keys = ON") + def _current_stats_date(self) -> str: + """Return the logical date used by daily token statistics.""" + return date.today().isoformat() + @asynccontextmanager async def _connect(self, *, write: bool = False): """Open a configured SQLite connection and optionally serialize writes.""" @@ -901,16 +905,22 @@ async def get_all_tokens_with_stats(self) -> List[Dict[str, Any]]: """Get all tokens with merged statistics in one query""" async with self._connect() as db: db.row_factory = aiosqlite.Row + today = self._current_stats_date() cursor = await db.execute(""" SELECT t.*, COALESCE(ts.image_count, 0) AS image_count, COALESCE(ts.video_count, 0) AS video_count, - COALESCE(ts.error_count, 0) AS error_count + COALESCE(ts.error_count, 0) AS error_count, + COALESCE(CASE WHEN ts.today_date = ? THEN ts.today_image_count ELSE 0 END, 0) AS today_image_count, + COALESCE(CASE WHEN ts.today_date = ? THEN ts.today_video_count ELSE 0 END, 0) AS today_video_count, + COALESCE(CASE WHEN ts.today_date = ? THEN ts.today_error_count ELSE 0 END, 0) AS today_error_count, + COALESCE(ts.consecutive_error_count, 0) AS consecutive_error_count, + ts.last_error_at AS last_error_at FROM tokens t LEFT JOIN token_stats ts ON ts.token_id = t.id ORDER BY t.created_at DESC - """) + """, (today, today, today)) rows = await cursor.fetchall() return [dict(row) for row in rows] @@ -918,6 +928,7 @@ async def get_dashboard_stats(self) -> Dict[str, int]: """Get dashboard counters with aggregated SQL queries""" async with self._connect() as db: db.row_factory = aiosqlite.Row + today = self._current_stats_date() token_cursor = await db.execute(""" SELECT @@ -932,11 +943,11 @@ async def get_dashboard_stats(self) -> Dict[str, int]: COALESCE(SUM(image_count), 0) AS total_images, COALESCE(SUM(video_count), 0) AS total_videos, COALESCE(SUM(error_count), 0) AS total_errors, - COALESCE(SUM(today_image_count), 0) AS today_images, - COALESCE(SUM(today_video_count), 0) AS today_videos, - COALESCE(SUM(today_error_count), 0) AS today_errors + COALESCE(SUM(CASE WHEN today_date = ? THEN today_image_count ELSE 0 END), 0) AS today_images, + COALESCE(SUM(CASE WHEN today_date = ? 
diff --git a/src/core/database.py b/src/core/database.py
index 71dc8c1..02a1e1d 100644
--- a/src/core/database.py
+++ b/src/core/database.py
@@ -3,7 +3,7 @@
 import aiosqlite
 import json
 from contextlib import asynccontextmanager
-from datetime import datetime
+from datetime import date, datetime
 from typing import Optional, List, Dict, Any
 from pathlib import Path
 from .models import Token, TokenStats, Task, RequestLog, AdminConfig, ProxyConfig, GenerationConfig, CacheConfig, Project, CaptchaConfig, PluginConfig, CallLogicConfig
@@ -32,6 +32,10 @@ async def _configure_connection(self, db):
         await db.execute(f"PRAGMA busy_timeout = {self._busy_timeout_ms}")
         await db.execute("PRAGMA foreign_keys = ON")
 
+    def _current_stats_date(self) -> str:
+        """Return the logical date used by daily token statistics."""
+        return date.today().isoformat()
+
     @asynccontextmanager
     async def _connect(self, *, write: bool = False):
         """Open a configured SQLite connection and optionally serialize writes."""
@@ -901,16 +905,22 @@ async def get_all_tokens_with_stats(self) -> List[Dict[str, Any]]:
         """Get all tokens with merged statistics in one query"""
         async with self._connect() as db:
             db.row_factory = aiosqlite.Row
+            today = self._current_stats_date()
             cursor = await db.execute("""
                 SELECT
                     t.*,
                     COALESCE(ts.image_count, 0) AS image_count,
                     COALESCE(ts.video_count, 0) AS video_count,
-                    COALESCE(ts.error_count, 0) AS error_count
+                    COALESCE(ts.error_count, 0) AS error_count,
+                    COALESCE(CASE WHEN ts.today_date = ? THEN ts.today_image_count ELSE 0 END, 0) AS today_image_count,
+                    COALESCE(CASE WHEN ts.today_date = ? THEN ts.today_video_count ELSE 0 END, 0) AS today_video_count,
+                    COALESCE(CASE WHEN ts.today_date = ? THEN ts.today_error_count ELSE 0 END, 0) AS today_error_count,
+                    COALESCE(ts.consecutive_error_count, 0) AS consecutive_error_count,
+                    ts.last_error_at AS last_error_at
                 FROM tokens t
                 LEFT JOIN token_stats ts ON ts.token_id = t.id
                 ORDER BY t.created_at DESC
-            """)
+            """, (today, today, today))
             rows = await cursor.fetchall()
             return [dict(row) for row in rows]
 
@@ -918,6 +928,7 @@ async def get_dashboard_stats(self) -> Dict[str, int]:
         """Get dashboard counters with aggregated SQL queries"""
         async with self._connect() as db:
             db.row_factory = aiosqlite.Row
+            today = self._current_stats_date()
 
             token_cursor = await db.execute("""
                 SELECT
@@ -932,11 +943,11 @@
                     COALESCE(SUM(image_count), 0) AS total_images,
                     COALESCE(SUM(video_count), 0) AS total_videos,
                     COALESCE(SUM(error_count), 0) AS total_errors,
-                    COALESCE(SUM(today_image_count), 0) AS today_images,
-                    COALESCE(SUM(today_video_count), 0) AS today_videos,
-                    COALESCE(SUM(today_error_count), 0) AS today_errors
+                    COALESCE(SUM(CASE WHEN today_date = ? THEN today_image_count ELSE 0 END), 0) AS today_images,
+                    COALESCE(SUM(CASE WHEN today_date = ? THEN today_video_count ELSE 0 END), 0) AS today_videos,
+                    COALESCE(SUM(CASE WHEN today_date = ? THEN today_error_count ELSE 0 END), 0) AS today_errors
                 FROM token_stats
-            """)
+            """, (today, today, today))
             stats_row = await stats_cursor.fetchone()
 
             token_data = dict(token_row) if token_row else {}
@@ -987,9 +998,8 @@ async def update_token(self, token_id: int, **kwargs):
         params = []
 
         for key, value in kwargs.items():
-            if value is not None:
-                updates.append(f"{key} = ?")
-                params.append(value)
+            updates.append(f"{key} = ?")
+            params.append(value)
 
         if updates:
             params.append(token_id)
@@ -1114,19 +1124,20 @@ async def get_token_stats(self, token_id: int) -> Optional[TokenStats]:
     async def increment_image_count(self, token_id: int):
         """Increment image generation count with daily reset"""
-        from datetime import date
         async with self._connect(write=True) as db:
-            today = str(date.today())
+            today = self._current_stats_date()
 
             # Get current stats
             cursor = await db.execute("SELECT today_date FROM token_stats WHERE token_id = ?", (token_id,))
             row = await cursor.fetchone()
 
-            # If date changed, reset today's count
+            # If date changed, reset all daily counters before recording today's image usage.
             if row and row[0] != today:
                 await db.execute("""
                     UPDATE token_stats
                     SET image_count = image_count + 1,
                         today_image_count = 1,
+                        today_video_count = 0,
+                        today_error_count = 0,
                         today_date = ?
                     WHERE token_id = ?
                 """, (today, token_id))
@@ -1143,19 +1154,20 @@ async def increment_image_count(self, token_id: int):
     async def increment_video_count(self, token_id: int):
         """Increment video generation count with daily reset"""
-        from datetime import date
         async with self._connect(write=True) as db:
-            today = str(date.today())
+            today = self._current_stats_date()
 
             # Get current stats
             cursor = await db.execute("SELECT today_date FROM token_stats WHERE token_id = ?", (token_id,))
             row = await cursor.fetchone()
 
-            # If date changed, reset today's count
+            # If date changed, reset all daily counters before recording today's video usage.
             if row and row[0] != today:
                 await db.execute("""
                     UPDATE token_stats
                     SET video_count = video_count + 1,
+                        today_image_count = 0,
                         today_video_count = 1,
+                        today_error_count = 0,
                         today_date = ?
                     WHERE token_id = ?
                 """, (today, token_id))
@@ -1178,19 +1190,20 @@ async def increment_error_count(self, token_id: int):
         - consecutive_error_count: Consecutive errors (reset on success/enable)
         - today_error_count: Today's errors (reset on date change)
         """
-        from datetime import date
         async with self._connect(write=True) as db:
-            today = str(date.today())
+            today = self._current_stats_date()
 
             # Get current stats
             cursor = await db.execute("SELECT today_date FROM token_stats WHERE token_id = ?", (token_id,))
             row = await cursor.fetchone()
 
-            # If date changed, reset today's error count
+            # If date changed, reset all daily counters before recording today's error.
             if row and row[0] != today:
                 await db.execute("""
                     UPDATE token_stats
                     SET error_count = error_count + 1,
                         consecutive_error_count = consecutive_error_count + 1,
+                        today_image_count = 0,
+                        today_video_count = 0,
                         today_error_count = 1,
                         today_date = ?,
                         last_error_at = CURRENT_TIMESTAMP
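The `today_date = ?` guard in the queries above means a stale row from a previous day simply reads as zero until the next write rolls it over; nothing has to rewrite rows at midnight. A self-contained illustration of the pattern (hypothetical minimal table, not the real schema):

```python
import sqlite3
from datetime import date

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE token_stats (token_id INTEGER, today_error_count INTEGER, today_date TEXT)")
conn.execute("INSERT INTO token_stats VALUES (1, 9, '2000-01-01')")  # stale counter from an old day

today = date.today().isoformat()
(count,) = conn.execute(
    "SELECT COALESCE(CASE WHEN today_date = ? THEN today_error_count ELSE 0 END, 0) FROM token_stats",
    (today,),
).fetchone()
assert count == 0  # yesterday's errors no longer leak into today's dashboard
```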
diff --git a/src/core/monitoring.py b/src/core/monitoring.py
new file mode 100644
index 0000000..e2bc432
--- /dev/null
+++ b/src/core/monitoring.py
@@ -0,0 +1,528 @@
+"""Prometheus monitoring helpers for Flow2API."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import time
+import urllib.error
+import urllib.request
+from datetime import datetime, timezone
+from typing import Any, Optional
+
+from prometheus_client import (
+    CONTENT_TYPE_LATEST,
+    CollectorRegistry,
+    Counter,
+    Gauge,
+    Histogram,
+    generate_latest,
+)
+
+from .config import config
+
+_PROCESS_START_TIME = time.time()
+
+
+def _to_utc_datetime(value: Any) -> Optional[datetime]:
+    if value is None:
+        return None
+
+    if isinstance(value, datetime):
+        if value.tzinfo is None:
+            return value.replace(tzinfo=timezone.utc)
+        return value.astimezone(timezone.utc)
+
+    if isinstance(value, str):
+        raw = value.strip()
+        if not raw:
+            return None
+        try:
+            parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
+        except Exception:
+            return None
+        if parsed.tzinfo is None:
+            return parsed.replace(tzinfo=timezone.utc)
+        return parsed.astimezone(timezone.utc)
+
+    return None
+
+
+def _to_timestamp(value: Any) -> float:
+    dt = _to_utc_datetime(value)
+    if dt is None:
+        return 0.0
+    return float(dt.timestamp())
+
+
+async def _probe_remote_browser_health(base_url: str, timeout_seconds: float = 3.0) -> tuple[bool, float]:
+    normalized = (base_url or "").strip().rstrip("/")
+    if not normalized:
+        return False, 0.0
+
+    url = f"{normalized}/api/v1/health"
+    started_at = time.perf_counter()
+
+    def do_request() -> tuple[int, str]:
+        request = urllib.request.Request(
+            url,
+            headers={"Accept": "application/json"},
+            method="GET",
+        )
+        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
+        with opener.open(request, timeout=max(0.5, float(timeout_seconds))) as response:
+            status_code = int(getattr(response, "status", 0) or response.getcode() or 0)
+            body = response.read()
+            charset = response.headers.get_content_charset() or "utf-8"
+            return status_code, body.decode(charset, errors="replace")
+
+    try:
+        status_code, body_text = await asyncio.to_thread(do_request)
+        ok = 200 <= status_code < 300
+        if ok and body_text:
+            try:
+                payload = json.loads(body_text)
+            except Exception:
+                payload = None
+            if isinstance(payload, dict) and payload.get("ok") is False:
+                ok = False
+        latency = time.perf_counter() - started_at
+        return ok, latency
+    except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, OSError):
+        latency = time.perf_counter() - started_at
+        return False, latency
+    except Exception:
+        latency = time.perf_counter() - started_at
+        return False, latency
+
+
+MAIN_REGISTRY = CollectorRegistry(auto_describe=True)
+
+MAIN_UP = Gauge(
+    "flow2api_up",
+    "Whether the Flow2API service process is running.",
+    registry=MAIN_REGISTRY,
+)
+MAIN_PROCESS_START_TIME = Gauge(
+    "flow2api_process_start_time_seconds",
+    "Flow2API process start time since unix epoch in seconds.",
+    registry=MAIN_REGISTRY,
+)
+GENERATION_REQUESTS_TOTAL = Counter(
+    "flow2api_generation_requests_total",
+    "Logical generation request outcomes handled by Flow2API.",
+    ["generation_type", "result"],
+    registry=MAIN_REGISTRY,
+)
+GENERATION_DURATION_SECONDS = Histogram(
+    "flow2api_generation_duration_seconds",
+    "Generation request duration in seconds.",
+    ["generation_type", "result"],
+    buckets=(0.1, 0.3, 0.5, 1, 2, 5, 10, 30, 60, 120, 300, 600, 1800),
+    registry=MAIN_REGISTRY,
+)
+TOKEN_REFRESH_TOTAL = Counter(
+    "flow2api_token_refresh_total",
+    "Token refresh attempts grouped by kind and result.",
+    ["kind", "result"],
+    registry=MAIN_REGISTRY,
+)
+TOKENS_TOTAL = Gauge(
+    "flow2api_tokens_total",
+    "Total number of configured tokens.",
+    registry=MAIN_REGISTRY,
+)
+TOKENS_ACTIVE = Gauge(
+    "flow2api_tokens_active",
+    "Number of active tokens.",
+    registry=MAIN_REGISTRY,
+)
+TOKENS_INACTIVE = Gauge(
+    "flow2api_tokens_inactive",
+    "Number of inactive tokens.",
+    registry=MAIN_REGISTRY,
+)
+TOKENS_MISSING_AT = Gauge(
+    "flow2api_tokens_missing_at",
+    "Number of tokens without an access token.",
+    registry=MAIN_REGISTRY,
+)
+TOKENS_EXPIRED = Gauge(
+    "flow2api_tokens_expired",
+    "Number of tokens whose access token is already expired.",
+    registry=MAIN_REGISTRY,
+)
+TOKENS_EXPIRING_SOON = Gauge(
+    "flow2api_tokens_expiring_within_hour",
+    "Number of tokens whose access token will expire within the next hour.",
+    registry=MAIN_REGISTRY,
+)
+TOKENS_BANNED_429 = Gauge(
+    "flow2api_tokens_banned_429",
+    "Number of tokens currently disabled because of 429 rate limit bans.",
+    registry=MAIN_REGISTRY,
+)
+TOKENS_CREDITS_TOTAL = Gauge(
+    "flow2api_token_credits_total",
+    "Sum of credits across all tokens.",
+    registry=MAIN_REGISTRY,
+)
+ACTIVE_TOKENS_CREDITS_TOTAL = Gauge(
+    "flow2api_active_token_credits_total",
+    "Sum of credits across active tokens.",
+    registry=MAIN_REGISTRY,
+)
+TOKENS_ERROR_TOTAL = Gauge(
+    "flow2api_token_error_total",
+    "Sum of historical token errors across all tokens.",
+    registry=MAIN_REGISTRY,
+)
+TOKENS_TODAY_ERROR_TOTAL = Gauge(
+    "flow2api_token_today_error_total",
+    "Sum of today's token errors across all tokens.",
+    registry=MAIN_REGISTRY,
+)
+IMAGE_INFLIGHT_TOTAL = Gauge(
+    "flow2api_image_inflight_total",
+    "Total in-flight image requests tracked by the concurrency manager.",
+    registry=MAIN_REGISTRY,
+)
+VIDEO_INFLIGHT_TOTAL = Gauge(
+    "flow2api_video_inflight_total",
+    "Total in-flight video requests tracked by the concurrency manager.",
+    registry=MAIN_REGISTRY,
+)
+DASHBOARD_TOTAL_IMAGES = Gauge(
+    "flow2api_dashboard_total_images",
+    "Dashboard total image count from persisted token statistics.",
+    registry=MAIN_REGISTRY,
+)
+DASHBOARD_TOTAL_VIDEOS = Gauge(
+    "flow2api_dashboard_total_videos",
+    "Dashboard total video count from persisted token statistics.",
+    registry=MAIN_REGISTRY,
+)
+DASHBOARD_TOTAL_ERRORS = Gauge(
+    "flow2api_dashboard_total_errors",
+    "Dashboard total error count from persisted token statistics.",
+    registry=MAIN_REGISTRY,
+)
+DASHBOARD_TODAY_IMAGES = Gauge(
+    "flow2api_dashboard_today_images",
+    "Dashboard today image count from persisted token statistics.",
+    registry=MAIN_REGISTRY,
+)
+DASHBOARD_TODAY_VIDEOS = Gauge(
+    "flow2api_dashboard_today_videos",
+    "Dashboard today video count from persisted token statistics.",
+    registry=MAIN_REGISTRY,
+)
+DASHBOARD_TODAY_ERRORS = Gauge(
+    "flow2api_dashboard_today_errors",
+    "Dashboard today error count from persisted token statistics.",
+    registry=MAIN_REGISTRY,
+)
+REMOTE_BROWSER_CONFIGURED = Gauge(
+    "flow2api_remote_browser_configured",
+    "Whether remote_browser mode has a target base URL configured.",
+    registry=MAIN_REGISTRY,
+)
+REMOTE_BROWSER_TARGET_UP = Gauge(
+    "flow2api_remote_browser_target_up",
"Whether the configured remote_browser target responded successfully.", + registry=MAIN_REGISTRY, +) +REMOTE_BROWSER_TARGET_LATENCY_SECONDS = Gauge( + "flow2api_remote_browser_target_latency_seconds", + "Probe latency of the configured remote_browser target in seconds.", + registry=MAIN_REGISTRY, +) +TOKEN_ACTIVE = Gauge( + "flow2api_token_active", + "Whether a token is active.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_AT_EXPIRES_TIMESTAMP = Gauge( + "flow2api_token_at_expires_timestamp_seconds", + "AT expiration time for a token since unix epoch in seconds.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_EXPIRED = Gauge( + "flow2api_token_expired", + "Whether a token access token is expired.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_EXPIRING_SOON = Gauge( + "flow2api_token_expiring_within_hour", + "Whether a token access token will expire within the next hour.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_MISSING_AT = Gauge( + "flow2api_token_missing_at", + "Whether a token is missing an access token.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_BANNED = Gauge( + "flow2api_token_banned", + "Whether a token is banned.", + ["token_id", "reason"], + registry=MAIN_REGISTRY, +) +TOKEN_CREDITS = Gauge( + "flow2api_token_credits", + "Current credits for a token.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_ERROR_TOTAL = Gauge( + "flow2api_token_error_count", + "Historical total error count for a token.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_TODAY_ERROR_TOTAL = Gauge( + "flow2api_token_today_error_count", + "Today's error count for a token.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_CONSECUTIVE_ERROR_TOTAL = Gauge( + "flow2api_token_consecutive_error_count", + "Current consecutive error count for a token.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_LAST_USED_TIMESTAMP = Gauge( + "flow2api_token_last_used_timestamp_seconds", + "Last-used timestamp for a token since unix epoch in seconds.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_LAST_ERROR_TIMESTAMP = Gauge( + "flow2api_token_last_error_timestamp_seconds", + "Last-error timestamp for a token since unix epoch in seconds.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_IMAGE_INFLIGHT = Gauge( + "flow2api_token_image_inflight", + "Current in-flight image requests for a token.", + ["token_id"], + registry=MAIN_REGISTRY, +) +TOKEN_VIDEO_INFLIGHT = Gauge( + "flow2api_token_video_inflight", + "Current in-flight video requests for a token.", + ["token_id"], + registry=MAIN_REGISTRY, +) + +MAIN_UP.set(1.0) +MAIN_PROCESS_START_TIME.set(_PROCESS_START_TIME) + + +def record_generation_result(generation_type: str, result: str, duration_seconds: Optional[float]) -> None: + normalized_type = generation_type if generation_type in {"image", "video"} else "unknown" + normalized_result = result if result in {"success", "failed", "cancelled", "no_token", "invalid"} else "unknown" + GENERATION_REQUESTS_TOTAL.labels( + generation_type=normalized_type, + result=normalized_result, + ).inc() + if duration_seconds is not None and duration_seconds >= 0: + GENERATION_DURATION_SECONDS.labels( + generation_type=normalized_type, + result=normalized_result, + ).observe(float(duration_seconds)) + + +def record_token_refresh(kind: str, result: str) -> None: + normalized_kind = kind if kind in {"at", "st"} else "unknown" + normalized_result = result if result in {"success", "failure"} else "unknown" + TOKEN_REFRESH_TOTAL.labels(kind=normalized_kind, 
+    TOKEN_REFRESH_TOTAL.labels(kind=normalized_kind, result=normalized_result).inc()
+
+
+async def update_main_runtime_metrics(db: Any, concurrency_manager: Optional[Any] = None) -> None:
+    rows = await db.get_all_tokens_with_stats()
+    now = datetime.now(timezone.utc)
+
+    TOKEN_ACTIVE.clear()
+    TOKEN_AT_EXPIRES_TIMESTAMP.clear()
+    TOKEN_EXPIRED.clear()
+    TOKEN_EXPIRING_SOON.clear()
+    TOKEN_MISSING_AT.clear()
+    TOKEN_BANNED.clear()
+    TOKEN_CREDITS.clear()
+    TOKEN_ERROR_TOTAL.clear()
+    TOKEN_TODAY_ERROR_TOTAL.clear()
+    TOKEN_CONSECUTIVE_ERROR_TOTAL.clear()
+    TOKEN_LAST_USED_TIMESTAMP.clear()
+    TOKEN_LAST_ERROR_TIMESTAMP.clear()
+    TOKEN_IMAGE_INFLIGHT.clear()
+    TOKEN_VIDEO_INFLIGHT.clear()
+
+    total_tokens = len(rows)
+    active_tokens = 0
+    inactive_tokens = 0
+    missing_at_tokens = 0
+    expired_tokens = 0
+    expiring_soon_tokens = 0
+    banned_429_tokens = 0
+    total_credits = 0
+    active_total_credits = 0
+    total_errors = 0
+    total_today_errors = 0
+    total_image_inflight = 0
+    total_video_inflight = 0
+
+    for row in rows:
+        token_id = str(row.get("id") or "")
+        if not token_id:
+            continue
+
+        is_active = bool(row.get("is_active"))
+        at_value = str(row.get("at") or "").strip()
+        at_expires = _to_utc_datetime(row.get("at_expires"))
+        ban_reason = str(row.get("ban_reason") or "").strip() or "none"
+        credits = int(row.get("credits") or 0)
+        error_count = int(row.get("error_count") or 0)
+        today_error_count = int(row.get("today_error_count") or 0)
+        consecutive_error_count = int(row.get("consecutive_error_count") or 0)
+        last_used_at = _to_timestamp(row.get("last_used_at"))
+        last_error_at = _to_timestamp(row.get("last_error_at"))
+
+        expired = False
+        expiring_soon = False
+        if at_expires is not None:
+            expired = at_expires <= now
+            expiring_soon = not expired and (at_expires - now).total_seconds() < 3600
+
+        if is_active:
+            active_tokens += 1
+            active_total_credits += credits
+        else:
+            inactive_tokens += 1
+
+        if not at_value:
+            missing_at_tokens += 1
+        if expired:
+            expired_tokens += 1
+        if expiring_soon:
+            expiring_soon_tokens += 1
+        if (not is_active) and ban_reason == "429_rate_limit":
+            banned_429_tokens += 1
+
+        total_credits += credits
+        total_errors += error_count
+        total_today_errors += today_error_count
+
+        image_inflight = 0
+        video_inflight = 0
+        if concurrency_manager is not None:
+            image_inflight = int(await concurrency_manager.get_image_inflight(int(token_id)))
+            video_inflight = int(await concurrency_manager.get_video_inflight(int(token_id)))
+        total_image_inflight += image_inflight
+        total_video_inflight += video_inflight
+
+        TOKEN_ACTIVE.labels(token_id=token_id).set(1.0 if is_active else 0.0)
+        TOKEN_AT_EXPIRES_TIMESTAMP.labels(token_id=token_id).set(_to_timestamp(at_expires))
+        TOKEN_EXPIRED.labels(token_id=token_id).set(1.0 if expired else 0.0)
+        TOKEN_EXPIRING_SOON.labels(token_id=token_id).set(1.0 if expiring_soon else 0.0)
+        TOKEN_MISSING_AT.labels(token_id=token_id).set(1.0 if not at_value else 0.0)
+        TOKEN_BANNED.labels(token_id=token_id, reason=ban_reason).set(
+            1.0 if (not is_active and ban_reason != "none") else 0.0
+        )
+        TOKEN_CREDITS.labels(token_id=token_id).set(float(credits))
+        TOKEN_ERROR_TOTAL.labels(token_id=token_id).set(float(error_count))
+        TOKEN_TODAY_ERROR_TOTAL.labels(token_id=token_id).set(float(today_error_count))
+        TOKEN_CONSECUTIVE_ERROR_TOTAL.labels(token_id=token_id).set(float(consecutive_error_count))
+        TOKEN_LAST_USED_TIMESTAMP.labels(token_id=token_id).set(last_used_at)
+        TOKEN_LAST_ERROR_TIMESTAMP.labels(token_id=token_id).set(last_error_at)
+        TOKEN_IMAGE_INFLIGHT.labels(token_id=token_id).set(float(image_inflight))
+        TOKEN_VIDEO_INFLIGHT.labels(token_id=token_id).set(float(video_inflight))
+
+    TOKENS_TOTAL.set(float(total_tokens))
+    TOKENS_ACTIVE.set(float(active_tokens))
+    TOKENS_INACTIVE.set(float(inactive_tokens))
+    TOKENS_MISSING_AT.set(float(missing_at_tokens))
+    TOKENS_EXPIRED.set(float(expired_tokens))
+    TOKENS_EXPIRING_SOON.set(float(expiring_soon_tokens))
+    TOKENS_BANNED_429.set(float(banned_429_tokens))
+    TOKENS_CREDITS_TOTAL.set(float(total_credits))
+    ACTIVE_TOKENS_CREDITS_TOTAL.set(float(active_total_credits))
+    TOKENS_ERROR_TOTAL.set(float(total_errors))
+    TOKENS_TODAY_ERROR_TOTAL.set(float(total_today_errors))
+    IMAGE_INFLIGHT_TOTAL.set(float(total_image_inflight))
+    VIDEO_INFLIGHT_TOTAL.set(float(total_video_inflight))
+
+    dashboard_stats = await db.get_dashboard_stats()
+    DASHBOARD_TOTAL_IMAGES.set(float(dashboard_stats.get("total_images") or 0))
+    DASHBOARD_TOTAL_VIDEOS.set(float(dashboard_stats.get("total_videos") or 0))
+    DASHBOARD_TOTAL_ERRORS.set(float(dashboard_stats.get("total_errors") or 0))
+    DASHBOARD_TODAY_IMAGES.set(float(dashboard_stats.get("today_images") or 0))
+    DASHBOARD_TODAY_VIDEOS.set(float(dashboard_stats.get("today_videos") or 0))
+    DASHBOARD_TODAY_ERRORS.set(float(dashboard_stats.get("today_errors") or 0))
+
+    remote_browser_base_url = (config.remote_browser_base_url or "").strip()
+    remote_browser_configured = config.captcha_method == "remote_browser" and bool(remote_browser_base_url)
+    REMOTE_BROWSER_CONFIGURED.set(1.0 if remote_browser_configured else 0.0)
+
+    if remote_browser_configured:
+        remote_browser_up, remote_browser_latency = await _probe_remote_browser_health(remote_browser_base_url)
+        REMOTE_BROWSER_TARGET_UP.set(1.0 if remote_browser_up else 0.0)
+        REMOTE_BROWSER_TARGET_LATENCY_SECONDS.set(float(remote_browser_latency))
+    else:
+        REMOTE_BROWSER_TARGET_UP.set(0.0)
+        REMOTE_BROWSER_TARGET_LATENCY_SECONDS.set(0.0)
+
+
+async def render_main_metrics(db: Any, concurrency_manager: Optional[Any] = None) -> bytes:
+    await update_main_runtime_metrics(db, concurrency_manager=concurrency_manager)
+    return generate_latest(MAIN_REGISTRY)
+
+
+async def build_public_health_snapshot(db: Any) -> dict[str, Any]:
+    rows = await db.get_all_tokens_with_stats()
+    now = datetime.now(timezone.utc)
+
+    active_tokens = 0
+    missing_at_tokens = 0
+    expired_tokens = 0
+    expiring_soon_tokens = 0
+    banned_429_tokens = 0
+
+    for row in rows:
+        if bool(row.get("is_active")):
+            active_tokens += 1
+        if not str(row.get("at") or "").strip():
+            missing_at_tokens += 1
+
+        at_expires = _to_utc_datetime(row.get("at_expires"))
+        if at_expires is not None:
+            if at_expires <= now:
+                expired_tokens += 1
+            elif (at_expires - now).total_seconds() < 3600:
+                expiring_soon_tokens += 1
+
+        if (not bool(row.get("is_active"))) and str(row.get("ban_reason") or "").strip() == "429_rate_limit":
+            banned_429_tokens += 1
+
+    return {
+        "backend_running": True,
+        "has_active_tokens": active_tokens > 0,
+        "total_tokens": len(rows),
+        "active_tokens": active_tokens,
+        "tokens_missing_at": missing_at_tokens,
+        "tokens_expired": expired_tokens,
+        "tokens_expiring_within_1h": expiring_soon_tokens,
+        "banned_429_tokens": banned_429_tokens,
+        "captcha_method": config.captcha_method,
+        "remote_browser_configured": (
+            config.captcha_method == "remote_browser"
+            and bool((config.remote_browser_base_url or "").strip())
+        ),
+    }
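Because every metric above is registered on the dedicated `MAIN_REGISTRY` rather than the process-global default registry, the exposition text can be rendered in isolation, without HTTP. A quick smoke check (a sketch, assuming the `src.core.monitoring` import path from this diff):

```python
from prometheus_client import generate_latest

from src.core.monitoring import MAIN_REGISTRY

# MAIN_UP and MAIN_PROCESS_START_TIME are set at import time, so they
# appear in the output even before any request has been served.
payload = generate_latest(MAIN_REGISTRY).decode("utf-8")
assert "flow2api_up 1.0" in payload
assert "flow2api_process_start_time_seconds" in payload
```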
diff --git a/src/main.py b/src/main.py
index b46ceb9..ef77c3f 100644
--- a/src/main.py
+++ b/src/main.py
@@ -1,6 +1,6 @@
 """FastAPI application initialization"""
 from fastapi import FastAPI
-from fastapi.responses import HTMLResponse, FileResponse
+from fastapi.responses import HTMLResponse, FileResponse, Response
 from fastapi.staticfiles import StaticFiles
 from fastapi.middleware.cors import CORSMiddleware
 from contextlib import asynccontextmanager
@@ -8,6 +8,7 @@
 from .core.config import config
 from .core.database import Database
+from .core.monitoring import CONTENT_TYPE_LATEST, render_main_metrics
 from .services.flow_client import FlowClient
 from .services.proxy_manager import ProxyManager
 from .services.token_manager import TokenManager
@@ -236,3 +237,10 @@ async def test_page():
     if test_file.exists():
         return FileResponse(str(test_file))
     return HTMLResponse(content="Test Page Not Found", status_code=404)
+
+
+@app.get("/metrics")
+async def metrics():
+    """Prometheus metrics endpoint for the main Flow2API service."""
+    payload = await render_main_metrics(db, concurrency_manager=concurrency_manager)
+    return Response(content=payload, media_type=CONTENT_TYPE_LATEST)
diff --git a/src/services/generation_handler.py b/src/services/generation_handler.py
index 509f0b6..0fd1019 100644
--- a/src/services/generation_handler.py
+++ b/src/services/generation_handler.py
@@ -7,6 +7,7 @@
 from typing import Optional, AsyncGenerator, List, Dict, Any
 from ..core.logger import debug_logger
 from ..core.config import config
+from ..core.monitoring import record_generation_result
 from ..core.models import Task, RequestLog
 from ..core.account_tiers import (
     PAYGATE_TIER_NOT_PAID,
@@ -809,6 +810,7 @@ async def handle_generation(
         if model not in MODEL_CONFIG:
             error_msg = f"Unsupported model: {model}"
             debug_logger.log_error(error_msg)
+            record_generation_result("unknown", "invalid", time.time() - start_time)
             yield self._create_error_response(error_msg, status_code=400)
             return
@@ -873,6 +875,7 @@
             if not error_msg:
                 error_msg = self._get_no_token_error_message(generation_type)
             debug_logger.log_error(f"[GENERATION] {error_msg}")
+            record_generation_result(generation_type, "no_token", time.time() - start_time)
             await self._log_request(
                 token_id=None,
                 operation=request_operation,
@@ -917,6 +920,7 @@
         if not token:
             error_msg = "Token AT is invalid or refresh failed"
             debug_logger.log_error(f"[GENERATION] {error_msg}")
+            record_generation_result(generation_type, "failed", time.time() - start_time)
             if stream:
                 yield self._create_stream_chunk(f"❌ {error_msg}\n")
             yield self._create_error_response(error_msg, status_code=503)
@@ -929,6 +933,7 @@
             required_tier = get_required_paygate_tier_for_model(model)
             error_msg = "The current model requires a " + get_paygate_tier_label(required_tier) + " account: " + model
             debug_logger.log_error(f"[GENERATION] {error_msg}")
+            record_generation_result(generation_type, "failed", time.time() - start_time)
             if stream:
                 yield self._create_stream_chunk(f"❌ {error_msg}\n")
             yield self._create_error_response(error_msg, status_code=403)
@@ -985,6 +990,7 @@
             if token:
                 await self.token_manager.record_error(token.id)
             duration = time.time() - start_time
+            record_generation_result(generation_type, "failed", duration)
             perf_trace["status"] = "failed"
             perf_trace["total_ms"] = int(duration * 1000)
             perf_trace["error"] = error_msg
@@ -1016,6 +1022,7 @@
         # 7. Log the successful request
         duration = time.time() - start_time
+        record_generation_result(generation_type, "success", duration)
         perf_trace["status"] = "success"
         perf_trace["total_ms"] = int(duration * 1000)
         # Keep a fuller prompt in the log, so the admin page does not only show overly short content
@@ -1064,6 +1071,7 @@
         error_msg = "Generation cancelled: client connection closed"
         debug_logger.log_warning(f"[GENERATION] ⚠️ {error_msg}")
         duration = time.time() - start_time
+        record_generation_result(generation_type or "unknown", "cancelled", duration)
         perf_trace["status"] = "failed"
         perf_trace["total_ms"] = int(duration * 1000)
         perf_trace["error"] = error_msg
@@ -1089,6 +1097,7 @@
         # Persist the final failure state before returning the error response, so the log does not stay stuck at 102.
         duration = time.time() - start_time
+        record_generation_result(generation_type or "unknown", "failed", duration)
         perf_trace["status"] = "failed"
         perf_trace["total_ms"] = int(duration * 1000)
         perf_trace["error"] = error_msg
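Note that these call sites may pass values like `generation_type or "unknown"` before the type is resolved; `record_generation_result` folds anything outside its known label sets into `unknown`, which keeps the label cardinality of the counters bounded. For example:

```python
from src.core.monitoring import record_generation_result, record_token_refresh

record_generation_result("image", "success", 2.5)  # counted as image/success, 2.5s observed
record_generation_result("audio", "weird", None)   # folded into unknown/unknown; no duration observed
record_token_refresh("st", "failure")              # counted as st/failure
```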
debug_logger.log_error(f"[ST_REFRESH] Token {token_id}: 刷新 ST 失败 - {str(e)}") + record_token_refresh("st", "failure") return None async def ensure_project_exists(self, token_id: int) -> str: diff --git a/tests/test_daily_stats_reset.py b/tests/test_daily_stats_reset.py new file mode 100644 index 0000000..db68692 --- /dev/null +++ b/tests/test_daily_stats_reset.py @@ -0,0 +1,85 @@ +import tempfile +import unittest + +from src.core.database import Database +from src.core.models import Token + + +class DailyStatsResetTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self._temp_dir = tempfile.TemporaryDirectory() + self.db = Database(db_path=f"{self._temp_dir.name}/flow.db") + await self.db.init_db() + self.token_id = await self.db.add_token( + Token( + st="st-test", + at="at-test", + email="tester@example.com", + name="tester", + ) + ) + + async def asyncTearDown(self): + self._temp_dir.cleanup() + + async def test_dashboard_stats_ignore_stale_previous_day_counts(self): + async with self.db._connect(write=True) as conn: + await conn.execute( + """ + UPDATE token_stats + SET today_image_count = 9, + today_video_count = 4, + today_error_count = 2, + today_date = '2000-01-01' + WHERE token_id = ? + """, + (self.token_id,), + ) + await conn.commit() + + stats = await self.db.get_dashboard_stats() + token_rows = await self.db.get_all_tokens_with_stats() + + self.assertEqual(stats["today_images"], 0) + self.assertEqual(stats["today_videos"], 0) + self.assertEqual(stats["today_errors"], 0) + self.assertEqual(token_rows[0]["today_image_count"], 0) + self.assertEqual(token_rows[0]["today_video_count"], 0) + self.assertEqual(token_rows[0]["today_error_count"], 0) + + async def test_cross_day_video_increment_resets_other_daily_counters(self): + async with self.db._connect(write=True) as conn: + await conn.execute( + """ + UPDATE token_stats + SET image_count = 12, + video_count = 3, + error_count = 5, + today_image_count = 7, + today_video_count = 2, + today_error_count = 1, + today_date = '2000-01-01' + WHERE token_id = ? + """, + (self.token_id,), + ) + await conn.commit() + + await self.db.increment_video_count(self.token_id) + + stats = await self.db.get_dashboard_stats() + token_rows = await self.db.get_all_tokens_with_stats() + token_row = token_rows[0] + + self.assertEqual(stats["today_images"], 0) + self.assertEqual(stats["today_videos"], 1) + self.assertEqual(stats["today_errors"], 0) + self.assertEqual(token_row["image_count"], 12) + self.assertEqual(token_row["video_count"], 4) + self.assertEqual(token_row["today_image_count"], 0) + self.assertEqual(token_row["today_video_count"], 1) + self.assertEqual(token_row["today_error_count"], 0) + + +if __name__ == "__main__": + unittest.main()