diff --git a/app/clients/__init__.py b/app/clients/__init__.py index f24ea1f..3afecaf 100644 --- a/app/clients/__init__.py +++ b/app/clients/__init__.py @@ -1,5 +1,5 @@ from .base_client import BaseClient -from .deepseek_client import DeepSeekClient +from .deepseek_client import DeepSeekClient, DropContent from .claude_client import ClaudeClient -__all__ = ['BaseClient', 'DeepSeekClient', 'ClaudeClient'] +__all__ = ['BaseClient', 'DeepSeekClient', 'ClaudeClient', 'DropContent'] diff --git a/app/clients/base_client.py b/app/clients/base_client.py index ad607b1..38a579e 100644 --- a/app/clients/base_client.py +++ b/app/clients/base_client.py @@ -38,6 +38,13 @@ def __init__( self.api_url = api_url self.timeout = timeout or self.DEFAULT_TIMEOUT self.proxy = proxy + self.active_http_response = None + + def reset_state(self) -> None: + if self.active_http_response is not None and not self.active_http_response.closed: + logger.info("关闭 deepseek 连接") + self.active_http_response.close() + self.active_http_response = None async def _make_request( self, headers: dict, data: dict, timeout: Optional[aiohttp.ClientTimeout] = None @@ -81,6 +88,8 @@ async def _make_request( timeout=request_timeout, proxy=proxy_url ) as response: + self.active_http_response = response + # 检查响应状态 if not response.ok: error_text = await response.text() diff --git a/app/clients/deepseek_client.py b/app/clients/deepseek_client.py index e92e6a8..170020b 100644 --- a/app/clients/deepseek_client.py +++ b/app/clients/deepseek_client.py @@ -157,3 +157,6 @@ async def stream_chat( logger.error(f"JSON 解析错误: {e}") except Exception as e: logger.error(f"处理 chunk 时发生错误: {e}") + +class DropContent(Exception): + pass diff --git a/app/deepclaude/deepclaude.py b/app/deepclaude/deepclaude.py index 82ff7ff..42e6f6c 100644 --- a/app/deepclaude/deepclaude.py +++ b/app/deepclaude/deepclaude.py @@ -9,6 +9,7 @@ from app.clients import ClaudeClient, DeepSeekClient from app.utils.logger import logger +from app.clients import DropContent class 
DeepClaude: @@ -82,10 +83,9 @@ async def chat_completions_with_stream( # 队列,用于传递 DeepSeek 推理内容给 Claude claude_queue = asyncio.Queue() - # 用于存储 DeepSeek 的推理累积内容 - reasoning_content = [] - async def process_deepseek(): + # 用于存储 DeepSeek 的推理累积内容 + reasoning_content = [] logger.info(f"开始处理 DeepSeek 流,使用模型:{deepseek_model}") try: async for content_type, content in self.deepseek_client.stream_chat( @@ -113,15 +113,17 @@ async def process_deepseek(): f"data: {json.dumps(response)}\n\n".encode("utf-8") ) elif content_type == "content": - # 当收到 content 类型时,将完整的推理内容发送到 claude_queue,并结束 DeepSeek 流处理 - logger.info( - f"DeepSeek 推理完成,收集到的推理内容长度:{len(''.join(reasoning_content))}" - ) await claude_queue.put("".join(reasoning_content)) - break + raise DropContent("DeepSeek 思考推理部分完成") + + except DropContent as e: + logger.info(str(e)) + self.deepseek_client.reset_state() + except Exception as e: logger.error(f"处理 DeepSeek 流时发生错误: {e}") await claude_queue.put("") + # 用 None 标记 DeepSeek 任务结束 logger.info("DeepSeek 任务处理完成,标记结束") await output_queue.put(None) diff --git a/app/openai_composite/openai_composite.py b/app/openai_composite/openai_composite.py index f53fe05..5d35ba3 100644 --- a/app/openai_composite/openai_composite.py +++ b/app/openai_composite/openai_composite.py @@ -8,6 +8,7 @@ from app.clients import DeepSeekClient from app.clients.openai_compatible_client import OpenAICompatibleClient from app.utils.logger import logger +from app.clients import DropContent class OpenAICompatibleComposite: @@ -77,10 +78,10 @@ async def chat_completions_with_stream( # 队列,用于传递 DeepSeek 推理内容 reasoning_queue = asyncio.Queue() - # 用于存储 DeepSeek 的推理累积内容 - reasoning_content = [] - async def process_deepseek(): + logger.info(f"1. 
第一阶段推理请求:{deepseek_model}\ndata:{messages}\n") + # 用于存储 DeepSeek 的推理累积内容 + reasoning_content = [] logger.info(f"开始处理 DeepSeek 流,使用模型:{deepseek_model}") try: async for content_type, content in self.deepseek_client.stream_chat( @@ -108,15 +109,17 @@ async def process_deepseek(): f"data: {json.dumps(response)}\n\n".encode("utf-8") ) elif content_type == "content": - # 当收到 content 类型时,将完整的推理内容发送到 reasoning_queue - logger.info( - f"DeepSeek 推理完成,收集到的推理内容长度:{len(''.join(reasoning_content))}" - ) await reasoning_queue.put("".join(reasoning_content)) - break + raise DropContent("DeepSeek 思考推理部分完成") + + except DropContent as e: + logger.info(str(e)) + self.deepseek_client.reset_state() + except Exception as e: logger.error(f"处理 DeepSeek 流时发生错误: {e}") await reasoning_queue.put("") + # 标记 DeepSeek 任务结束 logger.info("DeepSeek 任务处理完成,标记结束") await output_queue.put(None) @@ -138,6 +141,8 @@ async def process_openai(): Here's my another model's reasoning process:\n{reasoning}\n\n Based on this reasoning, provide your response directly to me:""" + logger.info(f"2. 第二阶段推理请求:{target_model}\ndata:{combined_content}\n") + # 检查过滤后的消息列表是否为空 if not openai_messages: raise ValueError("消息列表为空,无法处理请求")