From c4db50988b55f36f8daa4e401286760b5b6680bf Mon Sep 17 00:00:00 2001 From: wangchaoqun Date: Mon, 4 Aug 2025 17:15:52 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E2=9C=A8=20feat:=20=E6=93=8D=E4=BD=9C?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=B8=AD=E9=97=B4=E4=BB=B6=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E6=8F=92=E5=85=A5=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 + GEMINI.md | 60 +++++++++++++++++++ backend/app/admin/crud/crud_opera_log.py | 11 ++++ .../app/admin/service/opera_log_service.py | 38 +++++++++++- backend/common/queue.py | 38 ++++++++++++ backend/core/registrar.py | 5 ++ backend/middleware/opera_log_middleware.py | 3 +- 7 files changed, 154 insertions(+), 3 deletions(-) create mode 100644 GEMINI.md create mode 100644 backend/common/queue.py diff --git a/.gitignore b/.gitignore index 779ee46e..6d1d1dc4 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ venv/ .python-version .ruff_cache/ .pytest_cache/ +.env +docker-compose-dev.yml \ No newline at end of file diff --git a/GEMINI.md b/GEMINI.md new file mode 100644 index 00000000..eaef7f9e --- /dev/null +++ b/GEMINI.md @@ -0,0 +1,60 @@ +# FastAPI Best Architecture 项目架构分析 + +## 1. 总体概述 + +本项目是一个基于 FastAPI 的企业级后端架构解决方案,名为 **FastAPI Best Architecture (FBA)**。它不仅仅是一个项目骨架,更是一个集成了众多最佳实践和核心功能的半成品应用,旨在提供一个高性能、高扩展性、高安全性的后端基础。 + +其核心设计理念是 **模块化** 和 **插件化**,将复杂的功能解耦到不同的组件和插件中,并通过一个中心化的“注册器”进行组装。 + +## 2. 核心技术栈 + +- **Web 框架**: FastAPI +- **编程语言**: Python 3.10+ +- **数据库 ORM**: SQLAlchemy 2.0 (完全异步) +- **数据库支持**: MySQL, PostgreSQL +- **数据库迁移**: Alembic +- **数据验证/序列化**: Pydantic V2, msgspec +- **异步任务队列**: Celery (支持 Redis / RabbitMQ) +- **实时通信**: Socket.IO +- **代码规范与工具链**: + - **包管理**: uv + - **代码检查/格式化**: Ruff + - **项目构建**: Hatch +- **容器化**: Docker + +## 3. 架构设计 + +项目采用了一种官方称之为“伪三层架构”的设计模式,模仿了 Java Web 开发中的经典分层思想,使得代码职责清晰,易于维护。 + +- **`api` (视图层)**: 负责处理 HTTP 请求,参数校验,并调用 `service` 层。类似于 Controller。 +- **`schema` (数据传输层)**: 定义 Pydantic 模型,用于 API 的请求体、响应体以及不同层之间的数据传输。类似于 DTO (Data Transfer Object)。 +- **`service` (业务逻辑层)**: 封装核心业务逻辑,处理复杂的业务流程,是应用的核心。 +- **`crud` (数据访问层)**: 负责与数据库的直接交互(增删改查),将业务逻辑与数据操作解耦。类似于 DAO (Data Access Object)。 +- **`model` (模型层)**: 定义 SQLAlchemy 的数据库模型。类似于 Entity。 + +## 4. 启动与装配流程 + +应用的启动和组装过程体现了其高度模块化的设计: + +1. **入口 (`backend/main.py`)**: + - 非常简洁,主要负责 **动态加载插件**。 + - 在启动时,它会自动扫描 `plugin` 目录,并为每个插件安装其声明在 `requirements.txt` 中的依赖。 + - 完成插件准备后,调用核心注册器 `register_app()` 来创建和配置 FastAPI 应用。 + +2. **注册中心 (`backend/core/registrar.py`)**: + - 这是整个应用的“装配车间”。 + - `register_app()` 函数是核心,它按顺序执行一系列注册操作,将所有组件组装到 FastAPI 实例中。 + - **生命周期管理 (`register_init`)**: 通过 FastAPI 的 `lifespan` 事件,在应用启动时自动创建数据库表、初始化 API 限流器;在应用关闭时,优雅地释放 Redis 等连接资源。 + - **中间件注册 (`register_middleware`)**: 按精心设计的顺序加载中间件,实现了请求追踪、访问日志、CORS、JWT 认证和操作日志等功能。 + - **路由注册 (`register_router`)**: 通过 `build_final_router()` 动态地从核心应用和所有插件中收集 API 路由,并统一加载。 + - **其他组件**: 依次注册日志系统、Socket.IO、静态文件服务、分页功能和全局异常处理器。 + +## 5. 核心功能特性 + +- **完备的 RBAC**: 内置了基于角色的访问控制系统,包含用户、角色、菜单、权限等功能。 +- **强大的配置系统 (`backend/core/conf.py`)**: 使用 `pydantic-settings` 管理配置,支持多环境 (`dev`/`pro`),类型安全,配置项全面。 +- **插件化扩展**: 可以通过开发新的插件来轻松扩展系统功能,而无需修改核心代码。 +- **全栈异步**: 从 Web 请求、中间件到数据库操作,均采用异步(async/await)实现,性能卓越。 +- **分布式日志追踪**: 通过 `asgi-correlation-id` 为每个请求生成唯一的 `Trace ID`,并贯穿整个日志系统,极大地方便了微服务环境下的问题排查。 +- **操作日志**: 自动记录用户的操作行为,并对密码等敏感信息进行加密或脱敏处理。 +- **开发者体验**: 提供了 CLI 工具、美观的启动日志、自动化的依赖管理和详细的 API 文档,提升了开发效率。 diff --git a/backend/app/admin/crud/crud_opera_log.py b/backend/app/admin/crud/crud_opera_log.py index 93aaa050..98fccc5e 100644 --- a/backend/app/admin/crud/crud_opera_log.py +++ b/backend/app/admin/crud/crud_opera_log.py @@ -42,6 +42,17 @@ async def create(self, db: AsyncSession, obj: CreateOperaLogParam) -> None: """ await self.create_model(db, obj) + async def batch_create(self, db: AsyncSession, obj_list: list[CreateOperaLogParam]) -> None: + """ + 批量创建操作日志 + + :param db: 数据库会话 + :param obj_list: 创建操作日志参数列表 + :return: + """ + db.add_all([OperaLog(**obj.model_dump()) for obj in obj_list]) + await db.flush() + async def delete(self, db: AsyncSession, pks: list[int]) -> int: """ 批量删除操作日志 diff --git a/backend/app/admin/service/opera_log_service.py b/backend/app/admin/service/opera_log_service.py index c06ce899..742621c4 100644 --- a/backend/app/admin/service/opera_log_service.py +++ b/backend/app/admin/service/opera_log_service.py @@ -4,6 +4,8 @@ from backend.app.admin.crud.crud_opera_log import opera_log_dao from backend.app.admin.schema.opera_log import CreateOperaLogParam, DeleteOperaLogParam +from backend.common.log import log +from backend.common.queue import get_many_from_queue, opera_log_queue from backend.database.db import async_db_session @@ -25,7 +27,7 @@ async def get_select(*, username: str | None, status: int | None, ip: str | None @staticmethod async def create(*, obj: CreateOperaLogParam) -> None: """ - 创建操作日志 + 创建操作日志(同步) :param obj: 操作日志创建参数 :return: @@ -33,6 +35,40 @@ async def create(*, obj: CreateOperaLogParam) -> None: async with async_db_session.begin() as db: await opera_log_dao.create(db, obj) + @staticmethod + async def create_in_queue(*, obj: CreateOperaLogParam) -> None: + """ + 创建操作日志(入队) + + :param obj: 操作日志创建参数 + :return: + """ + await opera_log_queue.put(obj) + + @staticmethod + async def batch_create_consumer() -> None: + """ + 批量创建操作日志消费者 + + :return: + """ + while True: + try: + logs = await get_many_from_queue(opera_log_queue, max_items=100, timeout=1) + if logs: + log.info(f"处理日志: {len(logs)} 条.", ) + async with async_db_session.begin() as db: + await opera_log_dao.batch_create(db, logs) + else: + log.debug("无日志可处理") + + except Exception as e: + log.error(f'批量创建操作日志失败: {e}') + finally: + # 防止队列阻塞 + if not opera_log_queue.empty(): + opera_log_queue.task_done() + @staticmethod async def delete(*, obj: DeleteOperaLogParam) -> int: """ diff --git a/backend/common/queue.py b/backend/common/queue.py new file mode 100644 index 00000000..3a311276 --- /dev/null +++ b/backend/common/queue.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import asyncio + +from asyncio import Queue +from typing import List + +# 操作日志队列 +opera_log_queue: Queue = Queue() + + +async def get_many_from_queue(queue: Queue, max_items: int, timeout: float) -> List: + """ + 在指定的超时时间内,从异步队列中批量获取项目。 + + 此函数会尝试从给定的 `asyncio.Queue` 中获取最多 `max_items` 个项目。 + 它会为整个获取过程设置一个总的 `timeout` 秒数的超时限制。如果在超时 + 时间内未能收集到 `max_items` 个项目,函数将返回当前已成功获取的所有项目。 + + Args: + queue: 用于获取项目的 `asyncio.Queue` 队列。 + max_items: 希望从队列中获取的最大项目数量。 + timeout: 总的等待超时时间(秒)。 + + Returns: + 一个从队列中获取到的项目列表。如果发生超时, + 列表中的项目数量可能会少于 `max_items`。 + """ + results = [] + try: + # 设置一个总的超时范围 + async with asyncio.timeout(timeout): + while len(results) < max_items: + item = await queue.get() + results.append(item) + except asyncio.TimeoutError: + pass # 超时后返回已有的 items + return results diff --git a/backend/core/registrar.py b/backend/core/registrar.py index 439a2420..072b07d8 100644 --- a/backend/core/registrar.py +++ b/backend/core/registrar.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import os +from asyncio import create_task from contextlib import asynccontextmanager from typing import AsyncGenerator @@ -14,8 +15,10 @@ from starlette.middleware.authentication import AuthenticationMiddleware from starlette.staticfiles import StaticFiles +from backend.app.admin.service.opera_log_service import opera_log_service from backend.common.exception.exception_handler import register_exception from backend.common.log import set_custom_logfile, setup_logging +from backend.common.queue import opera_log_queue from backend.core.conf import settings from backend.core.path_conf import STATIC_DIR, UPLOAD_DIR from backend.database.db import create_table @@ -47,6 +50,8 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]: prefix=settings.REQUEST_LIMITER_REDIS_PREFIX, http_callback=http_limit_callback, ) + # 启动操作日志消费者 + app.state.opera_log_consumer = create_task(opera_log_service.batch_create_consumer()) yield diff --git a/backend/middleware/opera_log_middleware.py b/backend/middleware/opera_log_middleware.py index ce773548..a1e293f4 100644 --- a/backend/middleware/opera_log_middleware.py +++ b/backend/middleware/opera_log_middleware.py @@ -2,7 +2,6 @@ # -*- coding: utf-8 -*- import time -from asyncio import create_task from typing import Any from asgiref.sync import sync_to_async @@ -108,7 +107,7 @@ async def dispatch(self, request: Request, call_next: Any) -> Response: cost_time=elapsed, # 可能和日志存在微小差异(可忽略) opera_time=request.state.start_time, ) - create_task(opera_log_service.create(obj=opera_log_in)) # noqa: ignore + await opera_log_service.create_in_queue(obj=opera_log_in) # 错误抛出 if error: From 1b1b0f688fbcd7772c2bed1744be2a31424a86f0 Mon Sep 17 00:00:00 2001 From: IAseven <38178039+IAseven@users.noreply.github.com> Date: Mon, 4 Aug 2025 17:31:50 +0800 Subject: [PATCH 2/9] Delete GEMINI.md --- GEMINI.md | 60 ------------------------------------------------------- 1 file changed, 60 deletions(-) delete mode 100644 GEMINI.md diff --git a/GEMINI.md b/GEMINI.md deleted file mode 100644 index eaef7f9e..00000000 --- a/GEMINI.md +++ /dev/null @@ -1,60 +0,0 @@ -# FastAPI Best Architecture 项目架构分析 - -## 1. 总体概述 - -本项目是一个基于 FastAPI 的企业级后端架构解决方案,名为 **FastAPI Best Architecture (FBA)**。它不仅仅是一个项目骨架,更是一个集成了众多最佳实践和核心功能的半成品应用,旨在提供一个高性能、高扩展性、高安全性的后端基础。 - -其核心设计理念是 **模块化** 和 **插件化**,将复杂的功能解耦到不同的组件和插件中,并通过一个中心化的“注册器”进行组装。 - -## 2. 核心技术栈 - -- **Web 框架**: FastAPI -- **编程语言**: Python 3.10+ -- **数据库 ORM**: SQLAlchemy 2.0 (完全异步) -- **数据库支持**: MySQL, PostgreSQL -- **数据库迁移**: Alembic -- **数据验证/序列化**: Pydantic V2, msgspec -- **异步任务队列**: Celery (支持 Redis / RabbitMQ) -- **实时通信**: Socket.IO -- **代码规范与工具链**: - - **包管理**: uv - - **代码检查/格式化**: Ruff - - **项目构建**: Hatch -- **容器化**: Docker - -## 3. 架构设计 - -项目采用了一种官方称之为“伪三层架构”的设计模式,模仿了 Java Web 开发中的经典分层思想,使得代码职责清晰,易于维护。 - -- **`api` (视图层)**: 负责处理 HTTP 请求,参数校验,并调用 `service` 层。类似于 Controller。 -- **`schema` (数据传输层)**: 定义 Pydantic 模型,用于 API 的请求体、响应体以及不同层之间的数据传输。类似于 DTO (Data Transfer Object)。 -- **`service` (业务逻辑层)**: 封装核心业务逻辑,处理复杂的业务流程,是应用的核心。 -- **`crud` (数据访问层)**: 负责与数据库的直接交互(增删改查),将业务逻辑与数据操作解耦。类似于 DAO (Data Access Object)。 -- **`model` (模型层)**: 定义 SQLAlchemy 的数据库模型。类似于 Entity。 - -## 4. 启动与装配流程 - -应用的启动和组装过程体现了其高度模块化的设计: - -1. **入口 (`backend/main.py`)**: - - 非常简洁,主要负责 **动态加载插件**。 - - 在启动时,它会自动扫描 `plugin` 目录,并为每个插件安装其声明在 `requirements.txt` 中的依赖。 - - 完成插件准备后,调用核心注册器 `register_app()` 来创建和配置 FastAPI 应用。 - -2. **注册中心 (`backend/core/registrar.py`)**: - - 这是整个应用的“装配车间”。 - - `register_app()` 函数是核心,它按顺序执行一系列注册操作,将所有组件组装到 FastAPI 实例中。 - - **生命周期管理 (`register_init`)**: 通过 FastAPI 的 `lifespan` 事件,在应用启动时自动创建数据库表、初始化 API 限流器;在应用关闭时,优雅地释放 Redis 等连接资源。 - - **中间件注册 (`register_middleware`)**: 按精心设计的顺序加载中间件,实现了请求追踪、访问日志、CORS、JWT 认证和操作日志等功能。 - - **路由注册 (`register_router`)**: 通过 `build_final_router()` 动态地从核心应用和所有插件中收集 API 路由,并统一加载。 - - **其他组件**: 依次注册日志系统、Socket.IO、静态文件服务、分页功能和全局异常处理器。 - -## 5. 核心功能特性 - -- **完备的 RBAC**: 内置了基于角色的访问控制系统,包含用户、角色、菜单、权限等功能。 -- **强大的配置系统 (`backend/core/conf.py`)**: 使用 `pydantic-settings` 管理配置,支持多环境 (`dev`/`pro`),类型安全,配置项全面。 -- **插件化扩展**: 可以通过开发新的插件来轻松扩展系统功能,而无需修改核心代码。 -- **全栈异步**: 从 Web 请求、中间件到数据库操作,均采用异步(async/await)实现,性能卓越。 -- **分布式日志追踪**: 通过 `asgi-correlation-id` 为每个请求生成唯一的 `Trace ID`,并贯穿整个日志系统,极大地方便了微服务环境下的问题排查。 -- **操作日志**: 自动记录用户的操作行为,并对密码等敏感信息进行加密或脱敏处理。 -- **开发者体验**: 提供了 CLI 工具、美观的启动日志、自动化的依赖管理和详细的 API 文档,提升了开发效率。 From 174052aa42f8f6bc54b73a9309a6ab98b74a6309 Mon Sep 17 00:00:00 2001 From: wangchaoqun Date: Tue, 5 Aug 2025 09:49:13 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=F0=9F=8C=88=20style:=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 ++- backend/app/admin/service/opera_log_service.py | 6 ++++-- backend/core/registrar.py | 1 - 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 6d1d1dc4..0186615f 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,5 @@ venv/ .ruff_cache/ .pytest_cache/ .env -docker-compose-dev.yml \ No newline at end of file +docker-compose-dev.yml +GEMINI.md \ No newline at end of file diff --git a/backend/app/admin/service/opera_log_service.py b/backend/app/admin/service/opera_log_service.py index 742621c4..68158f1c 100644 --- a/backend/app/admin/service/opera_log_service.py +++ b/backend/app/admin/service/opera_log_service.py @@ -56,11 +56,13 @@ async def batch_create_consumer() -> None: try: logs = await get_many_from_queue(opera_log_queue, max_items=100, timeout=1) if logs: - log.info(f"处理日志: {len(logs)} 条.", ) + log.info( + f'处理日志: {len(logs)} 条.', + ) async with async_db_session.begin() as db: await opera_log_dao.batch_create(db, logs) else: - log.debug("无日志可处理") + log.debug('无日志可处理') except Exception as e: log.error(f'批量创建操作日志失败: {e}') diff --git a/backend/core/registrar.py b/backend/core/registrar.py index 072b07d8..234670bd 100644 --- a/backend/core/registrar.py +++ b/backend/core/registrar.py @@ -18,7 +18,6 @@ from backend.app.admin.service.opera_log_service import opera_log_service from backend.common.exception.exception_handler import register_exception from backend.common.log import set_custom_logfile, setup_logging -from backend.common.queue import opera_log_queue from backend.core.conf import settings from backend.core.path_conf import STATIC_DIR, UPLOAD_DIR from backend.database.db import create_table From 9656a700372f255d37911b9cff31647bc70823a0 Mon Sep 17 00:00:00 2001 From: wangchaoqun Date: Tue, 5 Aug 2025 10:15:23 +0800 Subject: [PATCH 4/9] =?UTF-8?q?=F0=9F=90=9E=20fix:=20=E9=80=9A=E8=BF=87asy?= =?UTF-8?q?ncio.wait=5Ffor=E5=85=BC=E5=AE=B9py3.10=E4=B8=ADasyncio.timeout?= =?UTF-8?q?=E4=B8=8D=E5=AD=98=E5=9C=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/admin/service/opera_log_service.py | 1 + backend/common/queue.py | 14 ++++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/backend/app/admin/service/opera_log_service.py b/backend/app/admin/service/opera_log_service.py index 68158f1c..62a2a51a 100644 --- a/backend/app/admin/service/opera_log_service.py +++ b/backend/app/admin/service/opera_log_service.py @@ -54,6 +54,7 @@ async def batch_create_consumer() -> None: """ while True: try: + # TODO max_items timeout Queue.maxsize 应该设置为可配置, 在 setting ? logs = await get_many_from_queue(opera_log_queue, max_items=100, timeout=1) if logs: log.info( diff --git a/backend/common/queue.py b/backend/common/queue.py index 3a311276..4ff48f4e 100644 --- a/backend/common/queue.py +++ b/backend/common/queue.py @@ -6,7 +6,7 @@ from typing import List # 操作日志队列 -opera_log_queue: Queue = Queue() +opera_log_queue: Queue = Queue(maxsize=100000) async def get_many_from_queue(queue: Queue, max_items: int, timeout: float) -> List: @@ -27,12 +27,14 @@ async def get_many_from_queue(queue: Queue, max_items: int, timeout: float) -> L 列表中的项目数量可能会少于 `max_items`。 """ results = [] + + async def collector(): + while len(results) < max_items: + item = await queue.get() + results.append(item) + try: - # 设置一个总的超时范围 - async with asyncio.timeout(timeout): - while len(results) < max_items: - item = await queue.get() - results.append(item) + await asyncio.wait_for(collector(), timeout=timeout) except asyncio.TimeoutError: pass # 超时后返回已有的 items return results From 6b91abcd109d860c8500bf9b16ed71c111b688bb Mon Sep 17 00:00:00 2001 From: wangchaoqun Date: Tue, 5 Aug 2025 19:25:03 +0800 Subject: [PATCH 5/9] =?UTF-8?q?=F0=9F=A6=84=20refactor:=20=E9=87=8D?= =?UTF-8?q?=E6=96=B0=E7=BB=84=E7=BB=87=E6=93=8D=E4=BD=9C=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E6=8F=92=E5=85=A5=E4=BB=A3=E7=A0=81=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/admin/crud/crud_opera_log.py | 3 +- .../app/admin/service/opera_log_service.py | 38 +++---------------- backend/common/queue.py | 28 +++++++------- backend/core/registrar.py | 3 +- backend/middleware/opera_log_middleware.py | 25 +++++++++++- 5 files changed, 44 insertions(+), 53 deletions(-) diff --git a/backend/app/admin/crud/crud_opera_log.py b/backend/app/admin/crud/crud_opera_log.py index 98fccc5e..dc10fa6c 100644 --- a/backend/app/admin/crud/crud_opera_log.py +++ b/backend/app/admin/crud/crud_opera_log.py @@ -50,8 +50,7 @@ async def batch_create(self, db: AsyncSession, obj_list: list[CreateOperaLogPara :param obj_list: 创建操作日志参数列表 :return: """ - db.add_all([OperaLog(**obj.model_dump()) for obj in obj_list]) - await db.flush() + await self.create_models(db, obj_list) async def delete(self, db: AsyncSession, pks: list[int]) -> int: """ diff --git a/backend/app/admin/service/opera_log_service.py b/backend/app/admin/service/opera_log_service.py index 62a2a51a..f1e9edb9 100644 --- a/backend/app/admin/service/opera_log_service.py +++ b/backend/app/admin/service/opera_log_service.py @@ -4,8 +4,6 @@ from backend.app.admin.crud.crud_opera_log import opera_log_dao from backend.app.admin.schema.opera_log import CreateOperaLogParam, DeleteOperaLogParam -from backend.common.log import log -from backend.common.queue import get_many_from_queue, opera_log_queue from backend.database.db import async_db_session @@ -27,7 +25,7 @@ async def get_select(*, username: str | None, status: int | None, ip: str | None @staticmethod async def create(*, obj: CreateOperaLogParam) -> None: """ - 创建操作日志(同步) + 创建操作日志 :param obj: 操作日志创建参数 :return: @@ -36,41 +34,15 @@ async def create(*, obj: CreateOperaLogParam) -> None: await opera_log_dao.create(db, obj) @staticmethod - async def create_in_queue(*, obj: CreateOperaLogParam) -> None: + async def batch_create(*, obj_list: list[CreateOperaLogParam]) -> None: """ - 创建操作日志(入队) + 批量创建操作日志 :param obj: 操作日志创建参数 :return: """ - await opera_log_queue.put(obj) - - @staticmethod - async def batch_create_consumer() -> None: - """ - 批量创建操作日志消费者 - - :return: - """ - while True: - try: - # TODO max_items timeout Queue.maxsize 应该设置为可配置, 在 setting ? - logs = await get_many_from_queue(opera_log_queue, max_items=100, timeout=1) - if logs: - log.info( - f'处理日志: {len(logs)} 条.', - ) - async with async_db_session.begin() as db: - await opera_log_dao.batch_create(db, logs) - else: - log.debug('无日志可处理') - - except Exception as e: - log.error(f'批量创建操作日志失败: {e}') - finally: - # 防止队列阻塞 - if not opera_log_queue.empty(): - opera_log_queue.task_done() + async with async_db_session.begin() as db: + await opera_log_dao.batch_create(db, obj_list) @staticmethod async def delete(*, obj: DeleteOperaLogParam) -> int: diff --git a/backend/common/queue.py b/backend/common/queue.py index 4ff48f4e..0f33f88c 100644 --- a/backend/common/queue.py +++ b/backend/common/queue.py @@ -5,26 +5,24 @@ from asyncio import Queue from typing import List -# 操作日志队列 -opera_log_queue: Queue = Queue(maxsize=100000) - async def get_many_from_queue(queue: Queue, max_items: int, timeout: float) -> List: """ 在指定的超时时间内,从异步队列中批量获取项目。 - 此函数会尝试从给定的 `asyncio.Queue` 中获取最多 `max_items` 个项目。 - 它会为整个获取过程设置一个总的 `timeout` 秒数的超时限制。如果在超时 - 时间内未能收集到 `max_items` 个项目,函数将返回当前已成功获取的所有项目。 - - Args: - queue: 用于获取项目的 `asyncio.Queue` 队列。 - max_items: 希望从队列中获取的最大项目数量。 - timeout: 总的等待超时时间(秒)。 - - Returns: - 一个从队列中获取到的项目列表。如果发生超时, - 列表中的项目数量可能会少于 `max_items`。 + 此函数会尝试从给定的 ``asyncio.Queue`` 中获取最多 ``max_items`` 个项目。 + 它会为整个获取过程设置一个总的 ``timeout`` 秒数的超时限制。 + 如果在超时时间内未能收集到 ``max_items`` 个项目, + 函数将返回当前已成功获取的所有项目。 + + :param queue: 用于获取项目的 ``asyncio.Queue`` 队列。 + :type queue: asyncio.Queue + :param max_items: 希望从队列中获取的最大项目数量。 + :type max_items: int + :param timeout: 总的等待超时时间(秒)。 + :type timeout: float + :return: 一个从队列中获取到的项目列表。如果发生超时,列表中的项目数量可能会少于 ``max_items``。 + :rtype: List """ results = [] diff --git a/backend/core/registrar.py b/backend/core/registrar.py index 234670bd..94e6cb03 100644 --- a/backend/core/registrar.py +++ b/backend/core/registrar.py @@ -15,7 +15,6 @@ from starlette.middleware.authentication import AuthenticationMiddleware from starlette.staticfiles import StaticFiles -from backend.app.admin.service.opera_log_service import opera_log_service from backend.common.exception.exception_handler import register_exception from backend.common.log import set_custom_logfile, setup_logging from backend.core.conf import settings @@ -50,7 +49,7 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]: http_callback=http_limit_callback, ) # 启动操作日志消费者 - app.state.opera_log_consumer = create_task(opera_log_service.batch_create_consumer()) + app.state.opera_log_consumer = create_task(OperaLogMiddleware.batch_create_consumer()) yield diff --git a/backend/middleware/opera_log_middleware.py b/backend/middleware/opera_log_middleware.py index a1e293f4..44a55f37 100644 --- a/backend/middleware/opera_log_middleware.py +++ b/backend/middleware/opera_log_middleware.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import time +from asyncio import Queue from typing import Any from asgiref.sync import sync_to_async @@ -14,6 +15,7 @@ from backend.app.admin.service.opera_log_service import opera_log_service from backend.common.enums import OperaLogCipherType, StatusType from backend.common.log import log +from backend.common.queue import get_many_from_queue from backend.core.conf import settings from backend.utils.encrypt import AESCipher, ItsDCipher, Md5Cipher from backend.utils.trace_id import get_request_trace_id @@ -22,6 +24,9 @@ class OperaLogMiddleware(BaseHTTPMiddleware): """操作日志中间件""" + # 操作日志队列, 指定默认队列长度为100000 + opera_log_queue: Queue = Queue(maxsize=100000) + async def dispatch(self, request: Request, call_next: Any) -> Response: """ 处理请求并记录操作日志 @@ -107,7 +112,7 @@ async def dispatch(self, request: Request, call_next: Any) -> Response: cost_time=elapsed, # 可能和日志存在微小差异(可忽略) opera_time=request.state.start_time, ) - await opera_log_service.create_in_queue(obj=opera_log_in) + await self.opera_log_queue.put(opera_log_in) # 错误抛出 if error: @@ -190,3 +195,21 @@ def desensitization(args: dict[str, Any]) -> dict[str, Any] | None: case _: args[arg_type][key] = '******' return args + + @staticmethod + async def batch_create_consumer() -> None: + """批量创建操作日志消费者""" + while True: + opera_log_queue = OperaLogMiddleware.opera_log_queue + logs = await get_many_from_queue(opera_log_queue, max_items=100, timeout=1) + if len(logs) < 1: + continue + log.info(f'处理日志: {len(logs)} 条.') + try: + await opera_log_service.batch_create(obj_list=logs) + except Exception as e: + log.error(f'批量创建操作日志失败: {e}, logs: {logs}') + finally: + # 防止队列阻塞 + if not opera_log_queue.empty(): + opera_log_queue.task_done() From 14fac9755d8dd15919b94b0a97669ee777e46812 Mon Sep 17 00:00:00 2001 From: Wu Clan Date: Wed, 6 Aug 2025 23:58:24 +0800 Subject: [PATCH 6/9] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/admin/crud/crud_opera_log.py | 8 ++-- .../app/admin/service/opera_log_service.py | 6 +-- backend/common/queue.py | 46 ++++++++----------- backend/core/conf.py | 4 +- backend/core/registrar.py | 4 +- backend/middleware/opera_log_middleware.py | 35 +++++++------- 6 files changed, 49 insertions(+), 54 deletions(-) diff --git a/backend/app/admin/crud/crud_opera_log.py b/backend/app/admin/crud/crud_opera_log.py index dc10fa6c..6acd3d40 100644 --- a/backend/app/admin/crud/crud_opera_log.py +++ b/backend/app/admin/crud/crud_opera_log.py @@ -37,20 +37,20 @@ async def create(self, db: AsyncSession, obj: CreateOperaLogParam) -> None: 创建操作日志 :param db: 数据库会话 - :param obj: 创建操作日志参数 + :param obj: 操作日志创建参数 :return: """ await self.create_model(db, obj) - async def batch_create(self, db: AsyncSession, obj_list: list[CreateOperaLogParam]) -> None: + async def bulk_create(self, db: AsyncSession, objs: list[CreateOperaLogParam]) -> None: """ 批量创建操作日志 :param db: 数据库会话 - :param obj_list: 创建操作日志参数列表 + :param objs: 操作日志创建参数列表 :return: """ - await self.create_models(db, obj_list) + await self.create_models(db, objs) async def delete(self, db: AsyncSession, pks: list[int]) -> int: """ diff --git a/backend/app/admin/service/opera_log_service.py b/backend/app/admin/service/opera_log_service.py index f1e9edb9..c6d5c9ec 100644 --- a/backend/app/admin/service/opera_log_service.py +++ b/backend/app/admin/service/opera_log_service.py @@ -34,15 +34,15 @@ async def create(*, obj: CreateOperaLogParam) -> None: await opera_log_dao.create(db, obj) @staticmethod - async def batch_create(*, obj_list: list[CreateOperaLogParam]) -> None: + async def bulk_create(*, objs: list[CreateOperaLogParam]) -> None: """ 批量创建操作日志 - :param obj: 操作日志创建参数 + :param objs: 操作日志创建参数列表 :return: """ async with async_db_session.begin() as db: - await opera_log_dao.batch_create(db, obj_list) + await opera_log_dao.bulk_create(db, objs) @staticmethod async def delete(*, obj: DeleteOperaLogParam) -> int: diff --git a/backend/common/queue.py b/backend/common/queue.py index 0f33f88c..e7bb1be5 100644 --- a/backend/common/queue.py +++ b/backend/common/queue.py @@ -3,36 +3,30 @@ import asyncio from asyncio import Queue -from typing import List -async def get_many_from_queue(queue: Queue, max_items: int, timeout: float) -> List: +async def batch_dequeue(queue: Queue, max_items: int, timeout: float) -> list: """ - 在指定的超时时间内,从异步队列中批量获取项目。 + 从异步队列中获取多个项目 - 此函数会尝试从给定的 ``asyncio.Queue`` 中获取最多 ``max_items`` 个项目。 - 它会为整个获取过程设置一个总的 ``timeout`` 秒数的超时限制。 - 如果在超时时间内未能收集到 ``max_items`` 个项目, - 函数将返回当前已成功获取的所有项目。 - - :param queue: 用于获取项目的 ``asyncio.Queue`` 队列。 - :type queue: asyncio.Queue - :param max_items: 希望从队列中获取的最大项目数量。 - :type max_items: int - :param timeout: 总的等待超时时间(秒)。 - :type timeout: float - :return: 一个从队列中获取到的项目列表。如果发生超时,列表中的项目数量可能会少于 ``max_items``。 - :rtype: List + :param queue: 用于获取项目的 `asyncio.Queue` 队列 + :param max_items: 从队列中获取的最大项目数量 + :param timeout: 总的等待超时时间(秒) + :return: """ - results = [] + items = [] + + loop = asyncio.get_event_loop() + end_time = loop.time() + timeout - async def collector(): - while len(results) < max_items: - item = await queue.get() - results.append(item) + while len(items) < max_items: + remaining = end_time - loop.time() + if remaining <= 0: + break + try: + item = await asyncio.wait_for(queue.get(), timeout=remaining) + items.append(item) + except asyncio.TimeoutError: + break - try: - await asyncio.wait_for(collector(), timeout=timeout) - except asyncio.TimeoutError: - pass # 超时后返回已有的 items - return results + return items diff --git a/backend/core/conf.py b/backend/core/conf.py index a86027ef..fc04721c 100644 --- a/backend/core/conf.py +++ b/backend/core/conf.py @@ -52,7 +52,7 @@ class Settings(BaseSettings): FASTAPI_STATIC_FILES: bool = True # 数据库 - DATABASE_ECHO: bool | Literal['debug'] = False + DATABASE_ECHO: bool | Literal['debug'] = True DATABASE_POOL_ECHO: bool | Literal['debug'] = False DATABASE_SCHEMA: str = 'fba' DATABASE_CHARSET: str = 'utf8mb4' @@ -177,6 +177,8 @@ class Settings(BaseSettings): 'new_password', 'confirm_password', ] + OPERA_LOG_QUEUE_MAX: int = 100 + OPERA_LOG_QUEUE_TIMEOUT: int = 60 # 1 分钟 # Plugin 配置 PLUGIN_PIP_CHINA: bool = True diff --git a/backend/core/registrar.py b/backend/core/registrar.py index 94e6cb03..b1173168 100644 --- a/backend/core/registrar.py +++ b/backend/core/registrar.py @@ -48,8 +48,8 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]: prefix=settings.REQUEST_LIMITER_REDIS_PREFIX, http_callback=http_limit_callback, ) - # 启动操作日志消费者 - app.state.opera_log_consumer = create_task(OperaLogMiddleware.batch_create_consumer()) + # 创建操作日志任务 + create_task(OperaLogMiddleware.consumer()) yield diff --git a/backend/middleware/opera_log_middleware.py b/backend/middleware/opera_log_middleware.py index 44a55f37..a3eca735 100644 --- a/backend/middleware/opera_log_middleware.py +++ b/backend/middleware/opera_log_middleware.py @@ -15,7 +15,7 @@ from backend.app.admin.service.opera_log_service import opera_log_service from backend.common.enums import OperaLogCipherType, StatusType from backend.common.log import log -from backend.common.queue import get_many_from_queue +from backend.common.queue import batch_dequeue from backend.core.conf import settings from backend.utils.encrypt import AESCipher, ItsDCipher, Md5Cipher from backend.utils.trace_id import get_request_trace_id @@ -24,7 +24,6 @@ class OperaLogMiddleware(BaseHTTPMiddleware): """操作日志中间件""" - # 操作日志队列, 指定默认队列长度为100000 opera_log_queue: Queue = Queue(maxsize=100000) async def dispatch(self, request: Request, call_next: Any) -> Response: @@ -196,20 +195,20 @@ def desensitization(args: dict[str, Any]) -> dict[str, Any] | None: args[arg_type][key] = '******' return args - @staticmethod - async def batch_create_consumer() -> None: - """批量创建操作日志消费者""" + @classmethod + async def consumer(cls) -> None: + """操作日志消费者""" while True: - opera_log_queue = OperaLogMiddleware.opera_log_queue - logs = await get_many_from_queue(opera_log_queue, max_items=100, timeout=1) - if len(logs) < 1: - continue - log.info(f'处理日志: {len(logs)} 条.') - try: - await opera_log_service.batch_create(obj_list=logs) - except Exception as e: - log.error(f'批量创建操作日志失败: {e}, logs: {logs}') - finally: - # 防止队列阻塞 - if not opera_log_queue.empty(): - opera_log_queue.task_done() + logs = await batch_dequeue( + cls.opera_log_queue, + max_items=settings.OPERA_LOG_QUEUE_MAX, + timeout=settings.OPERA_LOG_QUEUE_TIMEOUT, + ) + if logs: + try: + if settings.DATABASE_ECHO: + log.info('自动执行【操作日志批量创建】任务...') + await opera_log_service.bulk_create(objs=logs) + finally: + if not cls.opera_log_queue.empty(): + cls.opera_log_queue.task_done() From d0c1208b3db5210e9856c4bc489067b7e426670e Mon Sep 17 00:00:00 2001 From: Wu Clan Date: Wed, 6 Aug 2025 23:59:21 +0800 Subject: [PATCH 7/9] =?UTF-8?q?=E6=81=A2=E5=A4=8D=E9=BB=98=E8=AE=A4?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/core/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/core/conf.py b/backend/core/conf.py index fc04721c..0ea94be0 100644 --- a/backend/core/conf.py +++ b/backend/core/conf.py @@ -52,7 +52,7 @@ class Settings(BaseSettings): FASTAPI_STATIC_FILES: bool = True # 数据库 - DATABASE_ECHO: bool | Literal['debug'] = True + DATABASE_ECHO: bool | Literal['debug'] = False DATABASE_POOL_ECHO: bool | Literal['debug'] = False DATABASE_SCHEMA: str = 'fba' DATABASE_CHARSET: str = 'utf8mb4' From cd303c8dfc1aeb521caaa94aa9a43ddd0ed25511 Mon Sep 17 00:00:00 2001 From: Wu Clan Date: Thu, 7 Aug 2025 00:01:16 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E6=81=A2=E5=A4=8D=E9=BB=98=E8=AE=A4=20.git?= =?UTF-8?q?ignore=20=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 --- 1 file changed, 3 deletions(-) diff --git a/.gitignore b/.gitignore index 0186615f..779ee46e 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,3 @@ venv/ .python-version .ruff_cache/ .pytest_cache/ -.env -docker-compose-dev.yml -GEMINI.md \ No newline at end of file From e90636110f45198b00ef8e1b9b2b69865ac2e75a Mon Sep 17 00:00:00 2001 From: Wu Clan Date: Thu, 7 Aug 2025 17:33:08 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E9=98=9F=E5=88=97?= =?UTF-8?q?=E6=89=B9=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/common/queue.py | 19 ++++++++----------- backend/core/conf.py | 2 +- backend/middleware/opera_log_middleware.py | 2 +- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/backend/common/queue.py b/backend/common/queue.py index e7bb1be5..7c65a722 100644 --- a/backend/common/queue.py +++ b/backend/common/queue.py @@ -16,17 +16,14 @@ async def batch_dequeue(queue: Queue, max_items: int, timeout: float) -> list: """ items = [] - loop = asyncio.get_event_loop() - end_time = loop.time() + timeout - - while len(items) < max_items: - remaining = end_time - loop.time() - if remaining <= 0: - break - try: - item = await asyncio.wait_for(queue.get(), timeout=remaining) + async def collector(): + while len(items) < max_items: + item = await queue.get() items.append(item) - except asyncio.TimeoutError: - break + + try: + await asyncio.wait_for(collector(), timeout=timeout) + except asyncio.TimeoutError: + pass return items diff --git a/backend/core/conf.py b/backend/core/conf.py index 0ea94be0..a57298e6 100644 --- a/backend/core/conf.py +++ b/backend/core/conf.py @@ -177,7 +177,7 @@ class Settings(BaseSettings): 'new_password', 'confirm_password', ] - OPERA_LOG_QUEUE_MAX: int = 100 + OPERA_LOG_QUEUE_BATCH_CONSUME_SIZE: int = 100 OPERA_LOG_QUEUE_TIMEOUT: int = 60 # 1 分钟 # Plugin 配置 diff --git a/backend/middleware/opera_log_middleware.py b/backend/middleware/opera_log_middleware.py index a3eca735..263815cf 100644 --- a/backend/middleware/opera_log_middleware.py +++ b/backend/middleware/opera_log_middleware.py @@ -201,7 +201,7 @@ async def consumer(cls) -> None: while True: logs = await batch_dequeue( cls.opera_log_queue, - max_items=settings.OPERA_LOG_QUEUE_MAX, + max_items=settings.OPERA_LOG_QUEUE_BATCH_CONSUME_SIZE, timeout=settings.OPERA_LOG_QUEUE_TIMEOUT, ) if logs: