Skip to content

Commit e09062e

Browse files
authored
Optimize the opera log storage logic through queue (#750)
* ✨ feat: 操作日志中间件添加批量插入功能 * Delete GEMINI.md * 🌈 style: 修复格式化错误 * 🐞 fix: 通过asyncio.wait_for兼容py3.10中asyncio.timeout不存在 * 🦄 refactor: 重新组织操作日志批量插入代码逻辑 * 优化代码实现 * 恢复默认配置 * 恢复默认 .gitignore 文件 * 更新队列批处理逻辑
1 parent 8c00492 commit e09062e

File tree

6 files changed

+79
-3
lines changed

6 files changed

+79
-3
lines changed

backend/app/admin/crud/crud_opera_log.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,21 @@ async def create(self, db: AsyncSession, obj: CreateOperaLogParam) -> None:
3737
创建操作日志
3838
3939
:param db: 数据库会话
40-
:param obj: 创建操作日志参数
40+
:param obj: 操作日志创建参数
4141
:return:
4242
"""
4343
await self.create_model(db, obj)
4444

45+
async def bulk_create(self, db: AsyncSession, objs: list[CreateOperaLogParam]) -> None:
46+
"""
47+
批量创建操作日志
48+
49+
:param db: 数据库会话
50+
:param objs: 操作日志创建参数列表
51+
:return:
52+
"""
53+
await self.create_models(db, objs)
54+
4555
async def delete(self, db: AsyncSession, pks: list[int]) -> int:
4656
"""
4757
批量删除操作日志

backend/app/admin/service/opera_log_service.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,17 @@ async def create(*, obj: CreateOperaLogParam) -> None:
3333
async with async_db_session.begin() as db:
3434
await opera_log_dao.create(db, obj)
3535

36+
@staticmethod
37+
async def bulk_create(*, objs: list[CreateOperaLogParam]) -> None:
38+
"""
39+
批量创建操作日志
40+
41+
:param objs: 操作日志创建参数列表
42+
:return:
43+
"""
44+
async with async_db_session.begin() as db:
45+
await opera_log_dao.bulk_create(db, objs)
46+
3647
@staticmethod
3748
async def delete(*, obj: DeleteOperaLogParam) -> int:
3849
"""

backend/common/queue.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
import asyncio
4+
5+
from asyncio import Queue
6+
7+
8+
async def batch_dequeue(queue: Queue, max_items: int, timeout: float) -> list:
9+
"""
10+
从异步队列中获取多个项目
11+
12+
:param queue: 用于获取项目的 `asyncio.Queue` 队列
13+
:param max_items: 从队列中获取的最大项目数量
14+
:param timeout: 总的等待超时时间(秒)
15+
:return:
16+
"""
17+
items = []
18+
19+
async def collector():
20+
while len(items) < max_items:
21+
item = await queue.get()
22+
items.append(item)
23+
24+
try:
25+
await asyncio.wait_for(collector(), timeout=timeout)
26+
except asyncio.TimeoutError:
27+
pass
28+
29+
return items

backend/core/conf.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ class Settings(BaseSettings):
179179
'new_password',
180180
'confirm_password',
181181
]
182+
OPERA_LOG_QUEUE_BATCH_CONSUME_SIZE: int = 100
183+
OPERA_LOG_QUEUE_TIMEOUT: int = 60 # 1 分钟
182184

183185
# Plugin 配置
184186
PLUGIN_PIP_CHINA: bool = True

backend/core/registrar.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# -*- coding: utf-8 -*-
33
import os
44

5+
from asyncio import create_task
56
from contextlib import asynccontextmanager
67
from typing import AsyncGenerator
78

@@ -47,6 +48,8 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]:
4748
prefix=settings.REQUEST_LIMITER_REDIS_PREFIX,
4849
http_callback=http_limit_callback,
4950
)
51+
# 创建操作日志任务
52+
create_task(OperaLogMiddleware.consumer())
5053

5154
yield
5255

backend/middleware/opera_log_middleware.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# -*- coding: utf-8 -*-
33
import time
44

5-
from asyncio import create_task
5+
from asyncio import Queue
66
from typing import Any
77

88
from asgiref.sync import sync_to_async
@@ -15,6 +15,7 @@
1515
from backend.app.admin.service.opera_log_service import opera_log_service
1616
from backend.common.enums import OperaLogCipherType, StatusType
1717
from backend.common.log import log
18+
from backend.common.queue import batch_dequeue
1819
from backend.core.conf import settings
1920
from backend.utils.encrypt import AESCipher, ItsDCipher, Md5Cipher
2021
from backend.utils.trace_id import get_request_trace_id
@@ -23,6 +24,8 @@
2324
class OperaLogMiddleware(BaseHTTPMiddleware):
2425
"""操作日志中间件"""
2526

27+
opera_log_queue: Queue = Queue(maxsize=100000)
28+
2629
async def dispatch(self, request: Request, call_next: Any) -> Response:
2730
"""
2831
处理请求并记录操作日志
@@ -107,7 +110,7 @@ async def dispatch(self, request: Request, call_next: Any) -> Response:
107110
cost_time=elapsed, # 可能和日志存在微小差异(可忽略)
108111
opera_time=request.state.start_time,
109112
)
110-
create_task(opera_log_service.create(obj=opera_log_in)) # noqa: ignore
113+
await self.opera_log_queue.put(opera_log_in)
111114

112115
# 错误抛出
113116
if error:
@@ -190,3 +193,21 @@ def desensitization(args: dict[str, Any]) -> dict[str, Any]:
190193
args[key] = '******'
191194

192195
return args
196+
197+
@classmethod
198+
async def consumer(cls) -> None:
199+
"""操作日志消费者"""
200+
while True:
201+
logs = await batch_dequeue(
202+
cls.opera_log_queue,
203+
max_items=settings.OPERA_LOG_QUEUE_BATCH_CONSUME_SIZE,
204+
timeout=settings.OPERA_LOG_QUEUE_TIMEOUT,
205+
)
206+
if logs:
207+
try:
208+
if settings.DATABASE_ECHO:
209+
log.info('自动执行【操作日志批量创建】任务...')
210+
await opera_log_service.bulk_create(objs=logs)
211+
finally:
212+
if not cls.opera_log_queue.empty():
213+
cls.opera_log_queue.task_done()

0 commit comments

Comments
 (0)