From 962427a6645872171f78e126fa01d4f95e555944 Mon Sep 17 00:00:00 2001 From: welsir <1824379011@qq.com> Date: Mon, 25 Aug 2025 00:51:58 +0800 Subject: [PATCH 1/8] =?UTF-8?q?=E4=BC=9A=E8=AF=9D=E7=B3=BB=E7=BB=9F?= =?UTF-8?q?=E6=9E=84=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/UserInteractionController.java | 246 +++++++++--------- 1 file changed, 127 insertions(+), 119 deletions(-) diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java index 63533c1..0e83fee 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java @@ -1,5 +1,7 @@ package io.github.timemachinelab.controller; +import io.github.timemachinelab.core.fingerprint.FingerprintService; +import io.github.timemachinelab.core.fingerprint.UserFingerprint; import io.github.timemachinelab.core.session.application.MessageProcessingService; import io.github.timemachinelab.core.session.application.SessionManagementService; import io.github.timemachinelab.core.session.application.SseNotificationService; @@ -16,8 +18,10 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.annotation.Resource; +import javax.servlet.http.HttpServletRequest; import javax.validation.Valid; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -25,7 +29,7 @@ /** * 用户交互控制器 * 提供用户交互相关的API接口 - * + * * @author suifeng * @date 2025/1/20 */ @@ -40,91 +44,68 @@ public class UserInteractionController { private SessionManagementService sessionManagementService; @Resource private SseNotificationService sseNotificationService; + @Resource + private FingerprintService fingerprintService; /** * 建立SSE连接 + * 1. 生成指纹(如果不存在),返回空的sessionList + * 2. 如果生成的指纹已经存在,获取对应的sessionList返回 + * @param request HTTP请求对象 + * @return SSE连接 */ @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public SseEmitter streamConversation(@RequestParam(required = false) String sessionId, - @RequestParam String userId) { - log.info("建立SSE连接 - 会话ID: {}, 用户ID: {}", sessionId, userId); + public SseEmitter streamConversation(HttpServletRequest request) { + log.info("建立SSE连接 - 来源IP: {}", request.getRemoteAddr()); - boolean isNewSession = false; - ConversationSession session; - try { - if (sessionId == null || sessionId.isEmpty()) { - // 新建会话 - session = sessionManagementService.createNewSession(userId); - sessionId = session.getSessionId(); - isNewSession = true; - log.info("创建新会话 - 用户ID: {}, 会话ID: {}", userId, sessionId); - } else { - // 使用现有会话 - session = sessionManagementService.validateAndGetSession(userId, sessionId); - if (session == null) { - log.warn("会话不存在或无效 - 用户ID: {}, 会话ID: {}", userId, sessionId); - // 创建新会话作为fallback - session = sessionManagementService.createNewSession(userId != null ? userId : "anonymous_" + UUID.randomUUID().toString().substring(0, 8)); - sessionId = session.getSessionId(); - isNewSession = true; - } - } - + // 1. 自动生成或获取用户指纹 + UserFingerprint userFingerprint = fingerprintService.getOrCreateUserFingerprint(request); + String fingerprint = userFingerprint.getFingerprint(); + + log.info("用户指纹: {}, 是否新用户: {}, 访问次数: {}", + fingerprint, + userFingerprint.getVisitCount() == 1, + userFingerprint.getVisitCount()); + + // 2. 使用指纹作为用户ID + + // 3. 获取用户的会话ID列表 + List sessionList = sessionManagementService.getUserSessionIds(fingerprint); + log.info("用户指纹: {} 对应的会话ID列表: {}", fingerprint, sessionList); + + // 4. 创建SSE连接(使用指纹作为连接标识) SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); - sseNotificationService.registerSseConnection(sessionId, emitter); - - // 连接建立时发送会话信息 + sseNotificationService.registerSseConnection(fingerprint, emitter); + + // 5. 连接建立时发送用户信息和sessionList Map connectionData = new ConcurrentHashMap<>(); - connectionData.put("sessionId", sessionId); - connectionData.put("userId", session.getUserId()); - connectionData.put("isNewSession", isNewSession); + connectionData.put("fingerprintId", fingerprint); + connectionData.put("sessionList", sessionList); // 空集合或现有会话列表 + connectionData.put("isNewUser", userFingerprint.getVisitCount() == 1); + connectionData.put("displayName", userFingerprint.getDisplayName()); + connectionData.put("visitCount", userFingerprint.getVisitCount()); connectionData.put("timestamp", System.currentTimeMillis()); - - // 根据会话状态返回nodeId - if (isNewSession) { - // 新会话返回根节点ID - connectionData.put("nodeId", "1"); - log.info("新会话返回根节点ID: 1 - 会话: {}", sessionId); - } else if (session.getQaTree() != null && session.getQaTree().getRoot() != null) { - // 已存在会话,返回根节点ID(因为qaTree只有根节点) - String rootNodeId = session.getQaTree().getRoot().getId(); - connectionData.put("nodeId", rootNodeId); - log.info("已存在会话返回根节点ID: {} - 会话: {}", rootNodeId, sessionId); - - // 返回qaTree - try { - String qaTreeJson = io.github.timemachinelab.util.QaTreeSerializeUtil.serialize(session.getQaTree()); - connectionData.put("qaTree", qaTreeJson); - } catch (Exception e) { - log.error("序列化qaTree失败: {}", e.getMessage()); - } - } else { - // 兜底情况,返回根节点ID - connectionData.put("nodeId", "1"); - log.info("兜底返回根节点ID: 1 - 会话: {}", sessionId); - } - - sseNotificationService.sendWelcomeMessage(sessionId, connectionData); - + + sseNotificationService.sendWelcomeMessage(fingerprint, connectionData); + // 设置连接事件处理 - String finalSessionId = sessionId; emitter.onCompletion(() -> { - log.info("SSE连接完成: {}", finalSessionId); + log.info("SSE连接完成: {}", fingerprint); }); emitter.onTimeout(() -> { - log.info("SSE连接超时: {}", finalSessionId); - sseNotificationService.removeSseConnection(finalSessionId); + log.info("SSE连接超时: {}", fingerprint); + sseNotificationService.removeSseConnection(fingerprint); }); - + emitter.onError((ex) -> { - log.error("SSE连接错误: {} - {}", finalSessionId, ex.getMessage()); - sseNotificationService.removeSseConnection(finalSessionId); + log.error("SSE连接错误: {} - {}", fingerprint, ex.getMessage()); + sseNotificationService.removeSseConnection(fingerprint); }); - + return emitter; - + } catch (Exception e) { log.error("建立SSE连接失败: {}", e.getMessage()); SseEmitter errorEmitter = new SseEmitter(Long.MAX_VALUE); @@ -141,14 +122,14 @@ public SseEmitter streamConversation(@RequestParam(required = false) String sess /** * 重试接口 - * + * * @param request 重试请求参数 * @return 重试结果 */ @PostMapping("/retry") public ResponseEntity> retry(@Valid @RequestBody RetryRequest request) { try { - log.info("收到重试请求 - nodeId: {}, sessionId: {}, whyretry: {}", + log.info("收到重试请求 - nodeId: {}, sessionId: {}, whyretry: {}", request.getNodeId(), request.getSessionId(), request.getWhyretry()); // 使用应用服务验证节点存在性 @@ -157,7 +138,7 @@ public ResponseEntity> retry(@Valid @RequestBody RetryR log.warn("节点不存在 - nodeId: {}, sessionId: {}", request.getNodeId(), request.getSessionId()); return ResponseEntity.badRequest().body(ApiResult.error("指定的节点不存在")); } - + // 使用应用服务获取问题内容 String question = sessionManagementService.getNodeQuestion(request.getSessionId(), request.getNodeId()); if (question == null) { @@ -171,14 +152,14 @@ public ResponseEntity> retry(@Valid @RequestBody RetryR log.warn("会话不存在 - sessionId: {}", request.getSessionId()); return ResponseEntity.badRequest().body(ApiResult.error("会话不存在")); } - + // 移除要重试的节点(AI会基于parentId重新创建节点) boolean nodeRemoved = sessionManagementService.removeNode(request.getSessionId(), request.getNodeId()); if (!nodeRemoved) { - log.warn("移除节点失败,但继续处理重试 - sessionId: {}, nodeId: {}", + log.warn("移除节点失败,但继续处理重试 - sessionId: {}, nodeId: {}", request.getSessionId(), request.getNodeId()); } - + // 使用MessageProcessingService处理重试消息 String processedMessage = messageProcessingService.processRetryMessage( request.getSessionId(), @@ -186,10 +167,10 @@ public ResponseEntity> retry(@Valid @RequestBody RetryR request.getWhyretry(), session ); - + // 发送处理后的消息给AI服务 messageProcessingService.processAndSendMessage(session, processedMessage); - + // 构建响应数据 RetryResponse response = RetryResponse.builder() .nodeId(request.getNodeId()) @@ -197,12 +178,12 @@ public ResponseEntity> retry(@Valid @RequestBody RetryR .whyretry(request.getWhyretry()) .processTime(System.currentTimeMillis()) .build(); - - log.info("重试请求处理成功 - nodeId: {}, sessionId: {}", + + log.info("重试请求处理成功 - nodeId: {}, sessionId: {}", request.getNodeId(), request.getSessionId()); - + return ResponseEntity.ok(ApiResult.success("重试请求处理成功", response)); - + } catch (Exception e) { log.error("重试请求处理失败: {}", e.getMessage(), e); return ResponseEntity.badRequest().body(ApiResult.serverError("重试请求处理失败: " + e.getMessage())); @@ -210,87 +191,114 @@ public ResponseEntity> retry(@Valid @RequestBody RetryR } /** - * 处理统一答案请求 + * 处理统一答案请求(基于用户指纹) * 支持单选、多选、输入框、表单等多种问题类型的回答 + * 逻辑: 如果没有带sessionId,默认就是新建对话,不需要传入NodeId + * 如果带了sessionId就不是新建会话,需要nodeId + * + * @param request 统一答案请求 + * @param httpRequest HTTP请求对象 + * @return 处理结果 */ @PostMapping("/message") - public ResponseEntity processAnswer(@Validated @RequestBody UnifiedAnswerRequest request) { + public ResponseEntity processAnswer(@Validated @RequestBody UnifiedAnswerRequest request, + HttpServletRequest httpRequest) { try { log.info("接收到答案请求 - 会话ID: {}, 节点ID: {}, 问题类型: {}", request.getSessionId(), request.getNodeId(), request.getQuestionType()); - // 2. 会话管理和验证 - String userId = request.getUserId(); - if (userId == null || userId.trim().isEmpty()) { - log.warn("缺少必需的userId参数"); - return ResponseEntity.badRequest().body("userId参数是必需的"); + // 1. 获取和验证用户指纹 + String fingerprint = request.getUserId(); // 假设前端传递的userId实际上是指纹 + if (fingerprint == null || fingerprint.trim().isEmpty()) { + log.warn("缺少必需的用户指纹参数"); + return ResponseEntity.badRequest().body("用户指纹参数是必需的"); } - // 3. 验证会话是否存在 - ConversationSession session = sessionManagementService.validateAndGetSession(userId, request.getSessionId()); - if (session == null) { - log.warn("会话不存在或无效 - 用户ID: {}, 会话ID: {}", userId, request.getSessionId()); - return ResponseEntity.badRequest().body("会话不存在或无效"); + // 2. 验证指纹是否存在 + UserFingerprint userFingerprint = fingerprintService.getUserFingerprintByFingerprint(fingerprint); + if (userFingerprint == null) { + log.warn("无效的用户指纹: {}", fingerprint); + // 尝试重新生成指纹 + userFingerprint = fingerprintService.getOrCreateUserFingerprint(httpRequest); + fingerprint = userFingerprint.getFingerprint(); + log.info("为无效指纹重新生成: {}", fingerprint); } - // 4. nodeId验证逻辑 - String nodeId = request.getNodeId(); - if (nodeId == null || nodeId.trim().isEmpty()) { - // nodeId为空,表示这是新建会话的第一个问题 - if (session.getQaTree() != null && session.getQaTree().getRoot() != null) { - log.warn("会话已存在qaTree,但nodeId为空 - 会话: {}", session.getSessionId()); - return ResponseEntity.badRequest().body("现有会话必须提供nodeId"); + // 4. 根据sessionId是否为空判断新建还是继续会话 + String sessionId = request.getSessionId(); + boolean isNewConversation = (sessionId == null || sessionId.trim().isEmpty()); + + ConversationSession session; + + if (isNewConversation) { + // 新建对话,不需要nodeId + log.info("新建对话 - 用户指纹: {}", fingerprint); + + // 创建新会话 + session = sessionManagementService.createNewSession(fingerprint); + sessionId = session.getSessionId(); + + log.info("已创建新会话 - 用户指纹: {}, 会话ID: {}", fingerprint, sessionId); + + // 新对话不需要验证nodeId,直接跳过验证 + } else { + // 继续现有对话,需要验证sessionId和nodeId + log.info("继续现有对话 - 用户指纹: {}, 会话ID: {}", fingerprint, sessionId); + + // 验证会话是否存在 + session = sessionManagementService.validateAndGetSession(fingerprint, sessionId); + if (session == null) { + log.warn("会话不存在或无效 - 用户指纹: {}, 会话ID: {}", fingerprint, sessionId); + return ResponseEntity.badRequest().body("会话不存在或无效"); } - log.info("新建会话的第一个问题 - 会话: {}", session.getSessionId()); - } else if ("1".equals(nodeId)) { - // nodeId为'1',表示这是根节点的回答 - if (session.getQaTree() == null || session.getQaTree().getRoot() == null) { - log.info("根节点回答,但qaTree未初始化 - 会话: {}", session.getSessionId()); - // 允许继续处理,后续会创建qaTree - } else { - log.info("根节点回答 - 会话: {}", session.getSessionId()); + + // 验证nodeId + String nodeId = request.getNodeId(); + if (nodeId == null || nodeId.trim().isEmpty()) { + log.warn("继续会话必须提供nodeId - 会话ID: {}", sessionId); + return ResponseEntity.badRequest().body("继续会话必须提供nodeId"); } - } else { - // nodeId不为空且不是'root',验证是否属于该会话 - if (!sessionManagementService.validateNodeId(session.getSessionId(), nodeId)) { - log.warn("无效的节点ID - 会话: {}, 节点: {}", session.getSessionId(), nodeId); + + // 验证nodeId是否属于该会话 + if (!sessionManagementService.validateNodeId(sessionId, nodeId)) { + log.warn("无效的节点ID - 会话: {}, 节点: {}", sessionId, nodeId); return ResponseEntity.badRequest().body("无效的节点ID"); } - log.info("更新现有节点 - 会话: {}, 节点: {}", session.getSessionId(), nodeId); + + log.info("验证通过 - 会话: {}, 节点: {}", sessionId, nodeId); } - // 3. 验证答案格式 + // 5. 验证答案格式 if (!messageProcessingService.validateAnswer(request)) { log.warn("答案格式验证失败: {}", request); return ResponseEntity.badRequest().body("答案格式不正确"); } - // 答案更新逻辑已在MessageProcessingService中处理 - - // 4. 处理答案并转换为消息 + // 6. 处理答案并转换为消息 String processedMessage = messageProcessingService.preprocessMessage( null, // 没有额外的原始消息 request, session ); - // 5. 发送处理后的消息给AI服务 + // 7. 发送处理后的消息给AI服务 messageProcessingService.processAndSendMessage(session, processedMessage); - return ResponseEntity.ok("答案处理成功"); } catch (Exception e) { - log.error("处理答案失败 - 会话ID: {}, 错误: {}", request.getSessionId(), e.getMessage(), e); + log.error("处理答案失败 - 会话ID: {}, 错误: {}", + (request.getSessionId() != null ? request.getSessionId() : "新建会话"), + e.getMessage(), e); return ResponseEntity.internalServerError().body("答案处理失败: " + e.getMessage()); } } @GetMapping("/gen-prompt") public void genPrompt(@RequestParam String sessionId) { - + } From 5c390f1bd8df4c9266238afbc26efc328bb478bb Mon Sep 17 00:00:00 2001 From: welsir <1824379011@qq.com> Date: Mon, 25 Aug 2025 08:11:56 +0800 Subject: [PATCH 2/8] =?UTF-8?q?=E4=BC=9A=E8=AF=9D=E7=B3=BB=E7=BB=9F?= =?UTF-8?q?=E6=9E=84=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/UserInteractionController.java | 29 +- .../timemachinelab/core/qatree/QaTree.java | 16 + .../core/qatree/QaTreeDomain.java | 10 + .../application/SessionManagementService.java | 11 + .../application/SseNotificationService.java | 299 ++++++++++++------ .../impl/DefaultMessageProcessingService.java | 7 +- .../web/dto/GenPromptRequest.java | 7 +- .../web/dto/UnifiedAnswerRequest.java | 6 +- .../src/services/conversationApi.ts | 14 +- 9 files changed, 269 insertions(+), 130 deletions(-) diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java index d72e48d..0253537 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java @@ -213,21 +213,16 @@ public ResponseEntity processAnswer(@Validated @RequestBody UnifiedAnswe request.getNodeId(), request.getQuestionType()); - // 1. 获取和验证用户指纹 - String fingerprint = request.getUserId(); // 假设前端传递的userId实际上是指纹 - if (fingerprint == null || fingerprint.trim().isEmpty()) { - log.warn("缺少必需的用户指纹参数"); - return ResponseEntity.badRequest().body("用户指纹参数是必需的"); - } - - // 2. 验证指纹是否存在 - UserFingerprint userFingerprint = fingerprintService.getUserFingerprintByFingerprint(fingerprint); - if (userFingerprint == null) { - log.warn("无效的用户指纹: {}", fingerprint); - // 尝试重新生成指纹 - userFingerprint = fingerprintService.getOrCreateUserFingerprint(httpRequest); - fingerprint = userFingerprint.getFingerprint(); - log.info("为无效指纹重新生成: {}", fingerprint); + // 1. 从请求头获取和验证用户指纹(与SSE连接保持一致) + UserFingerprint userFingerprint = fingerprintService.getOrCreateUserFingerprint(httpRequest); + String fingerprint = userFingerprint.getFingerprint(); + + log.info("用户指纹: {}, 访问次数: {}", fingerprint, userFingerprint.getVisitCount()); + + // 2. 验证请求体中的userId是否与指纹匹配(可选验证) + String requestUserId = request.getUserId(); + if (requestUserId != null && !requestUserId.equals(fingerprint)) { + log.warn("请求体中的userId({})与请求头指纹({})不匹配,以请求头指纹为准", requestUserId, fingerprint); } // 4. 根据sessionId是否为空判断新建还是继续会话 @@ -282,7 +277,7 @@ public ResponseEntity processAnswer(@Validated @RequestBody UnifiedAnswe // 6. 处理答案并转换为消息 String processedMessage = messageProcessingService.preprocessMessage( - null, // 没有额外的原始消息 + null, request, session ); @@ -301,7 +296,7 @@ public ResponseEntity processAnswer(@Validated @RequestBody UnifiedAnswe } @PostMapping("/gen-prompt") - public ResponseEntity genPrompt(@RequestBody GenPromptRequest request) { + public ResponseEntity genPrompt(@RequestBody GenPromptRequest request) { GenPromptOperation.GpResponse gpResponse = new GenPromptOperation.GpResponse(); gpResponse.setGenPrompt(AllPrompt.GEN_PROMPT_AGENT_PROMPT); sseNotificationService.sendWelcomeMessage(request.getSessionId(), JSON.toJSONString(gpResponse)); diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/qatree/QaTree.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/qatree/QaTree.java index 9cd794b..e183831 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/qatree/QaTree.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/qatree/QaTree.java @@ -30,6 +30,22 @@ public QaTreeNode getNodeById(String id) { return nodeMap.get(id); } + /** + * 获取节点映射表(用于调试和验证) + * @return 节点映射表 + */ + public Map getNodeMap() { + return new HashMap<>(nodeMap); // 返回副本保证封装性 + } + + /** + * 获取节点数量 + * @return 节点数量 + */ + public int getNodeCount() { + return nodeMap.size(); + } + /** * 移除指定节点及其所有子节点 * @param nodeId 要移除的节点ID diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/qatree/QaTreeDomain.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/qatree/QaTreeDomain.java index 037740c..e9497f9 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/qatree/QaTreeDomain.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/qatree/QaTreeDomain.java @@ -122,6 +122,16 @@ public boolean nodeExists(QaTree tree, String nodeId) { return tree.getNodeById(nodeId) != null; } + /** + * 验证节点是否存在(别名方法) + * @param tree QA树 + * @param nodeId 节点ID + * @return 节点是否存在 + */ + public boolean containsNode(QaTree tree, String nodeId) { + return nodeExists(tree, nodeId); + } + /** * 移除指定节点及其所有子节点 * @param tree QA树 diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionManagementService.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionManagementService.java index 715799c..b5aea76 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionManagementService.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionManagementService.java @@ -92,6 +92,17 @@ public List getUserSessions(String userId) { .collect(Collectors.toList()); } + /** + * 获取用户的所有会话ID列表 + * + * @param userId 用户ID + * @return 会话ID列表 + */ + public List getUserSessionIds(String userId) { + List sessionIds = userSessionMap.get(userId); + return sessionIds != null ? new ArrayList<>(sessionIds) : new ArrayList<>(); + } + /** * 获取用户最新的会话(最后创建的会话) * diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseNotificationService.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseNotificationService.java index 571566d..c7ac83b 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseNotificationService.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseNotificationService.java @@ -17,41 +17,52 @@ /** * SSE通知服务 * 负责管理SSE连接和发送消息给客户端 - * + * * @author suifeng * 日期: 2025/1/27 */ @Service @Slf4j public class SseNotificationService { - + @Resource private SessionManagementService sessionManagementService; @Resource private QaTreeDomain qaTreeDomain; - - // SSE连接管理 + + // SSE连接管理 - 基于用户指纹的一对一关系 private final Map sseEmitters = new ConcurrentHashMap<>(); - + /** * 注册SSE连接 - * - * @param sessionId 会话ID + * + * @param fingerprint 用户指纹 * @param emitter SSE发射器 */ - public void registerSseConnection(String sessionId, SseEmitter emitter) { - sseEmitters.put(sessionId, emitter); - log.info("SSE连接已注册 - 会话: {}", sessionId); + public void registerSseConnection(String fingerprint, SseEmitter emitter) { + // 如果用户已有连接,先移除旧连接 + SseEmitter oldEmitter = sseEmitters.get(fingerprint); + if (oldEmitter != null) { + try { + oldEmitter.complete(); + log.info("关闭旧SSE连接 - 用户指纹: {}", fingerprint); + } catch (Exception e) { + log.warn("关闭旧SSE连接失败 - 用户指纹: {}, 错误: {}", fingerprint, e.getMessage()); + } + } + + sseEmitters.put(fingerprint, emitter); + log.info("SSE连接已注册 - 用户指纹: {}", fingerprint); } - + /** * 移除SSE连接 - * - * @param sessionId 会话ID + * + * @param fingerprint 用户指纹 */ - public void removeSseConnection(String sessionId) { - sseEmitters.remove(sessionId); - log.info("SSE连接已移除 - 会话: {}", sessionId); + public void removeSseConnection(String fingerprint) { + sseEmitters.remove(fingerprint); + log.info("SSE连接已移除 - 用户指纹: {}", fingerprint); } /** @@ -61,123 +72,227 @@ public void removeSseConnection(String sessionId) { * @param response 消息响应对象 */ public void sendSseMessage(String sessionId, QuestionGenerationOperation.QuestionGenerationResponse response) { - SseEmitter emitter = sseEmitters.get(sessionId); - if (emitter != null) { - try { - String currentNodeId = null; + // 通过sessionId找到对应的用户指纹 + ConversationSession session = sessionManagementService.getSessionById(sessionId); + + // 验证输入参数 + if (session == null) { + log.warn("会话不存在 - 会话ID: {}", sessionId); + return; + } + + if (response == null) { + log.warn("AI响应对象为空 - 会话: {}", sessionId); + // 发送错误消息 + sendErrorMessage(session, "AI响应对象为空", "RESPONSE_NULL"); + return; + } + + String parentId = response.getParentId(); + if (parentId == null || parentId.trim().isEmpty()) { + log.warn("父节点ID为空 - 会话: {}", sessionId); + // 发送错误消息 + sendErrorMessage(session, "父节点ID为空", "PARENT_ID_EMPTY"); + return; + } + + // 验证QaTree是否存在 + QaTree qaTree = session.getQaTree(); + if (qaTree == null) { + log.warn("QaTree不存在 - 会话: {}, 无法验证父节点: {}", sessionId, parentId); + // 发送错误消息 + sendErrorMessage(session, "QaTree不存在", "QATREE_NOT_FOUND"); + return; + } + + // 验证父节点是否存在 + if (!qaTreeDomain.containsNode(qaTree, parentId)) { + log.warn("父节点不存在 - 会话: {}, parentId: {}, 当前QaTree节点数量: {}", + sessionId, parentId, qaTree.getNodeCount()); + + // 记录QaTree的所有节点ID以便调试 + if (qaTree.getNodeCount() > 0) { + log.info("当前QaTree中的所有节点ID: {}", qaTree.getNodeMap().keySet()); + } + + // 发送错误消息 + sendErrorMessage(session, "父节点不存在", "PARENT_NODE_NOT_FOUND"); + return; + } + + // 获取用户指纹和SSE连接 + String fingerprint = session.getUserId(); // userId就是指纹 + SseEmitter emitter = sseEmitters.get(fingerprint); + if (emitter == null) { + log.warn("SSE连接不存在 - 会话: {}, 用户指纹: {}, 当前连接数: {}", + sessionId, fingerprint, sseEmitters.size()); + // 可以考虑将消息缓存或记录到数据库,以便用户重连后可以收到 + log.info("考虑将消息缓存以便后续处理 - 问题: {}", + response.getQuestion() != null ? response.getQuestion().getQuestion() : "null"); + // 发送错误消息 + sendErrorMessage(session, "SSE连接不存在", "SSE_CONNECTION_NOT_FOUND"); + return; + } + + try { + String currentNodeId = null; + + log.info("开始处理AI问题 - 会话: {}, 父节点: {}, 问题类型: {}", + sessionId, parentId, response.getQuestion() != null ? response.getQuestion().getType() : "null"); + + // 1. 先将AI生成的新问题添加到QaTree(只填入question,answer留空) + if (response.getQuestion() != null) { + // 使用QaTreeDomain添加新节点,answer字段会自动为空 + qaTreeDomain.appendNode(qaTree, parentId, response.getQuestion(), session); - // 1. 先将AI生成的新问题添加到QaTree(只填入question,answer留空) - ConversationSession session = sessionManagementService.getSessionById(sessionId); - if (session != null && session.getQaTree() != null && response.getQuestion() != null) { - // 使用QaTreeDomain添加新节点,answer字段会自动为空 - // appendNode方法内部会调用session.getNextNodeId()获取新节点ID - QaTree qaTree = qaTreeDomain.appendNode( - session.getQaTree(), - response.getParentId(), - response.getQuestion(), - session - ); - - // 获取刚刚创建的节点ID(当前计数器的值) - currentNodeId = String.valueOf(session.getNodeIdCounter().get()); - - log.info("AI问题已添加到QaTree - 会话: {}, 父节点: {}, 新节点ID: {}, 问题类型: {}", - sessionId, response.getParentId(), currentNodeId, response.getQuestion().getType()); - } else { - log.warn("无法添加问题到QaTree - 会话: {}, session存在: {}, qaTree存在: {}, question存在: {}", - sessionId, session != null, - session != null && session.getQaTree() != null, - response.getQuestion() != null); - } + // 获取刚刚创建的节点ID(当前计数器的值) + currentNodeId = String.valueOf(session.getNodeIdCounter().get()); - // 2. 创建修改后的响应对象,包含currentNodeId和parentNodeId - Map modifiedResponse = new HashMap<>(); - modifiedResponse.put("question", response.getQuestion()); - modifiedResponse.put("currentNodeId", currentNodeId != null ? currentNodeId : response.getParentId()); - modifiedResponse.put("parentNodeId", response.getParentId()); + log.info("AI问题已添加到QaTree - 会话: {}, 父节点: {}, 新节点ID: {}, 问题类型: {}", + sessionId, parentId, currentNodeId, response.getQuestion().getType()); + } else { + log.warn("AI响应中问题对象为空 - 会话: {}", sessionId); + } + + // 2. 创建修改后的响应对象,包含currentNodeId和parentNodeId + Map modifiedResponse = new HashMap<>(); + modifiedResponse.put("question", response.getQuestion()); + modifiedResponse.put("currentNodeId", currentNodeId != null ? currentNodeId : parentId); + modifiedResponse.put("parentNodeId", parentId); + + // 3. 发送SSE消息给前端 + emitter.send(SseEmitter.event() + .name("message") + .data(modifiedResponse)); + log.info("SSE消息发送成功 - 会话: {}, 当前节点ID: {}", sessionId, currentNodeId); + + } catch (IOException e) { + log.error("SSE消息发送失败 - 会话: {}, 用户指纹: {}, 错误: {}", sessionId, fingerprint, e.getMessage()); + sseEmitters.remove(fingerprint); + // 发送错误消息 + sendErrorMessage(session, "SSE消息发送失败: " + e.getMessage(), "SSE_SEND_FAILED"); + } catch (Exception e) { + log.error("添加问题到QaTree失败 - 会话: {}, 错误: {}", sessionId, e.getMessage()); + // 即使QaTree更新失败,仍然发送SSE消息给前端 + try { + Map fallbackResponse = new HashMap<>(); + fallbackResponse.put("question", response.getQuestion()); + fallbackResponse.put("currentNodeId", parentId); // 使用parentId作为fallback + fallbackResponse.put("parentNodeId", parentId); - // 3. 发送SSE消息给前端 emitter.send(SseEmitter.event() .name("message") - .data(modifiedResponse)); - log.info("SSE消息发送成功 - 会话: {}, 当前节点ID: {}", sessionId, currentNodeId); - } catch (IOException e) { - log.error("SSE消息发送失败 - 会话: {}, 错误: {}", sessionId, e.getMessage()); - sseEmitters.remove(sessionId); - } catch (Exception e) { - log.error("添加问题到QaTree失败 - 会话: {}, 错误: {}", sessionId, e.getMessage()); - // 即使QaTree更新失败,仍然发送SSE消息给前端 - try { - Map fallbackResponse = new HashMap<>(); - fallbackResponse.put("question", response.getQuestion()); - fallbackResponse.put("currentNodeId", response.getParentId()); // 使用parentId作为fallback - fallbackResponse.put("parentNodeId", response.getParentId()); - - emitter.send(SseEmitter.event() - .name("message") - .data(fallbackResponse)); - log.info("SSE消息发送成功(QaTree更新失败但消息已发送) - 会话: {}", sessionId); - } catch (IOException ioException) { - log.error("SSE消息发送失败 - 会话: {}, 错误: {}", sessionId, ioException.getMessage()); - sseEmitters.remove(sessionId); - } + .data(fallbackResponse)); + log.info("SSE消息发送成功(QaTree更新失败但消息已发送) - 会话: {}", sessionId); + } catch (IOException ioException) { + log.error("SSE消息发送失败 - 会话: {}, 用户指纹: {}, 错误: {}", sessionId, fingerprint, ioException.getMessage()); + sseEmitters.remove(fingerprint); + // 发送错误消息 + sendErrorMessage(session, "SSE消息发送失败: " + ioException.getMessage(), "SSE_SEND_FAILED"); } - } else { - log.warn("SSE连接不存在 - 会话: {}", sessionId); } } - + /** * 获取SSE连接状态 - * + * * @return 连接状态信息 */ public Map getSseStatus() { Map status = new ConcurrentHashMap<>(); - status.put("connectedSessions", sseEmitters.keySet()); + status.put("connectedFingerprints", sseEmitters.keySet()); // 改为显示指纹列表 status.put("totalConnections", sseEmitters.size()); status.put("timestamp", System.currentTimeMillis()); return status; } - + /** * 发送欢迎消息 - * - * @param sessionId 会话ID + * + * @param fingerprint 用户指纹 * @param message 欢迎消息内容 */ - public void sendWelcomeMessage(String sessionId, String message) { - SseEmitter emitter = sseEmitters.get(sessionId); + public void sendWelcomeMessage(String fingerprint, String message) { + SseEmitter emitter = sseEmitters.get(fingerprint); if (emitter != null) { try { emitter.send(SseEmitter.event() .name("connected") .data(message)); - log.info("欢迎消息发送成功 - 会话: {}", sessionId); + log.info("欢迎消息发送成功 - 用户指纹: {}", fingerprint); } catch (IOException e) { - log.error("欢迎消息发送失败 - 会话: {}, 错误: {}", sessionId, e.getMessage()); - sseEmitters.remove(sessionId); + log.error("欢迎消息发送失败 - 用户指纹: {}, 错误: {}", fingerprint, e.getMessage()); + sseEmitters.remove(fingerprint); } } } - + /** * 发送连接数据 - * - * @param sessionId 会话ID + * + * @param fingerprint 用户指纹 * @param connectionData 连接数据 */ - public void sendWelcomeMessage(String sessionId, Map connectionData) { - SseEmitter emitter = sseEmitters.get(sessionId); + public void sendWelcomeMessage(String fingerprint, Map connectionData) { + SseEmitter emitter = sseEmitters.get(fingerprint); if (emitter != null) { try { emitter.send(SseEmitter.event() .name("connected") .data(connectionData)); - log.info("连接数据发送成功 - 会话: {}", sessionId); + log.info("连接数据发送成功 - 用户指纹: {}", fingerprint); + } catch (IOException e) { + log.error("连接数据发送失败 - 用户指纹: {}, 错误: {}", fingerprint, e.getMessage()); + sseEmitters.remove(fingerprint); + } + } + } + + /** + * 发送错误消息给客户端 + * 确保错误时能够立即响应,不会卡住 + * + * @param session 会话对象 + * @param errorMessage 错误消息 + * @param errorCode 错误代码 + */ + private void sendErrorMessage(ConversationSession session, String errorMessage, String errorCode) { + if (session == null) { + log.warn("无法发送错误消息,会话对象为空"); + return; + } + + String fingerprint = session.getUserId(); + sendErrorMessage(fingerprint, errorMessage, errorCode); + } + + /** + * 发送错误消息给客户端 + * 确保错误时能够立即响应,不会卡住 + * + * @param fingerprint 用户指纹 + * @param errorMessage 错误消息 + * @param errorCode 错误代码 + */ + private void sendErrorMessage(String fingerprint, String errorMessage, String errorCode) { + SseEmitter emitter = sseEmitters.get(fingerprint); + if (emitter != null) { + try { + Map errorResponse = new HashMap<>(); + errorResponse.put("error", true); + errorResponse.put("errorCode", errorCode); + errorResponse.put("errorMessage", errorMessage); + errorResponse.put("timestamp", System.currentTimeMillis()); + + emitter.send(SseEmitter.event() + .name("error") + .data(errorResponse)); + + log.info("错误消息已发送 - 用户指纹: {}, 错误代码: {}, 错误消息: {}", fingerprint, errorCode, errorMessage); } catch (IOException e) { - log.error("连接数据发送失败 - 会话: {}, 错误: {}", sessionId, e.getMessage()); - sseEmitters.remove(sessionId); + log.error("发送错误消息失败 - 用户指纹: {}, 错误: {}", fingerprint, e.getMessage()); + // 如果发送错误消息失败,移除连接 + sseEmitters.remove(fingerprint); } } } diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/impl/DefaultMessageProcessingService.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/impl/DefaultMessageProcessingService.java index 0463485..1a2564f 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/impl/DefaultMessageProcessingService.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/impl/DefaultMessageProcessingService.java @@ -76,10 +76,6 @@ private void updateQaTreeWithAnswer(ConversationSession session, UnifiedAnswerRe } String nodeId = request.getNodeId(); - // 如果nodeId为'1'(根节点),使用根节点ID - if ("1".equals(nodeId) && qaTree.getRoot() != null) { - nodeId = qaTree.getRoot().getId(); - } // 根据问题类型准备答案数据 Object answerData = prepareAnswerData(request); @@ -118,6 +114,9 @@ private Object prepareAnswerData(UnifiedAnswerRequest request) { public String preprocessMessage(String originalMessage, UnifiedAnswerRequest answerRequest,ConversationSession conversationSession) { try { + updateQaTreeWithAnswer(conversationSession, answerRequest); + + // 构建输入JSON JSONObject object = new JSONObject(); object.put("prompt",AllPrompt.GLOBAL_PROMPT); object.put("tree", QaTreeSerializeUtil.serialize(conversationSession.getQaTree())); diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/GenPromptRequest.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/GenPromptRequest.java index a5dbf9e..8c728ce 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/GenPromptRequest.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/GenPromptRequest.java @@ -13,13 +13,10 @@ @AllArgsConstructor public class GenPromptRequest { - - /** - * 会话ID - */ - @NotBlank(message = "会话ID不能为空") private String sessionId; + private String nodeId; + /** * 原始答案数据 diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/UnifiedAnswerRequest.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/UnifiedAnswerRequest.java index 12755ee..c3ddc38 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/UnifiedAnswerRequest.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/UnifiedAnswerRequest.java @@ -25,7 +25,6 @@ public class UnifiedAnswerRequest { /** * 会话ID */ - @NotBlank(message = "会话ID不能为空") private String sessionId; /** @@ -49,10 +48,13 @@ public class UnifiedAnswerRequest { private Object answer; /** - * 额外的上下文信息 + * 额外的上下文信息 用户画像 */ private Map context; + private String userPortrait; + private String aiModel; + /** * 用户ID */ diff --git a/prompto-lab-ui/src/services/conversationApi.ts b/prompto-lab-ui/src/services/conversationApi.ts index 7a6838b..ed3175f 100644 --- a/prompto-lab-ui/src/services/conversationApi.ts +++ b/prompto-lab-ui/src/services/conversationApi.ts @@ -211,17 +211,11 @@ export const processAnswer = async (request: UnifiedAnswerRequest): Promise void, onError?: (error: Event) => void): EventSource => { - // 构建查询参数 - const params = new URLSearchParams() - if (sessionId) { - params.append('sessionId', sessionId) - } - params.append('userId', userId) - - const url = `${USER_INTERACTION_BASE}/sse?${params.toString()}` +export const connectUserInteractionSSE = (onMessage: (response: any) => void, onError?: (error: Event) => void): EventSource => { + // 后端SSE接口不需要参数,会自动通过HttpServletRequest生成或获取用户指纹 + const url = `${USER_INTERACTION_BASE}/sse` const eventSource = new EventSource(url) // 监听连接建立事件 From 62b3b6ab9dcf0d7da59e3a851d9ae56996ea96fb Mon Sep 17 00:00:00 2001 From: welsir <1824379011@qq.com> Date: Mon, 25 Aug 2025 22:04:19 +0800 Subject: [PATCH 3/8] 123 --- .../controller/GlobalExceptionHandler.java | 75 ++ .../controller/SSEDemoController.java | 170 ----- .../controller/UserInteractionController.java | 324 ++++----- .../core/fingerprint/FingerprintService.java | 246 +++++++ .../core/fingerprint/UserFingerprint.java | 64 ++ .../application/MessageProcessingService.java | 11 +- .../application/RetryProcessingService.java | 120 ++++ .../application/SessionManagementService.java | 2 +- .../application/SessionProcessingService.java | 99 +++ .../application/SseNotificationService.java | 227 +----- .../application/SseValidationService.java | 138 ++++ .../impl/DefaultMessageProcessingService.java | 162 +++-- .../domain/entity/ConversationSession.java | 28 +- .../web/dto/GenPromptRequest.java | 4 - .../web/dto/UnifiedAnswerRequest.java | 10 - .../src/components/Chat/AIChatPage.vue | 647 ++++++++++++++++-- .../src/components/Chat/QuestionRenderer.vue | 46 +- prompto-lab-ui/src/services/apiUtils.ts | 46 +- .../src/services/conversationApi.ts | 38 +- .../src/services/userInteractionApi.ts | 81 +-- 20 files changed, 1693 insertions(+), 845 deletions(-) create mode 100644 prompto-lab-app/src/main/java/io/github/timemachinelab/controller/GlobalExceptionHandler.java delete mode 100644 prompto-lab-app/src/main/java/io/github/timemachinelab/controller/SSEDemoController.java create mode 100644 prompto-lab-app/src/main/java/io/github/timemachinelab/core/fingerprint/FingerprintService.java create mode 100644 prompto-lab-app/src/main/java/io/github/timemachinelab/core/fingerprint/UserFingerprint.java create mode 100644 prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/RetryProcessingService.java create mode 100644 prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionProcessingService.java create mode 100644 prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseValidationService.java diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/GlobalExceptionHandler.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/GlobalExceptionHandler.java new file mode 100644 index 0000000..e38d8c5 --- /dev/null +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/GlobalExceptionHandler.java @@ -0,0 +1,75 @@ +package io.github.timemachinelab.controller; + +import io.github.timemachinelab.core.session.application.SseValidationService; +import io.github.timemachinelab.core.session.application.SessionException; +import io.github.timemachinelab.entity.resp.ApiResult; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.RestControllerAdvice; + +import javax.annotation.Resource; +import javax.servlet.http.HttpServletRequest; + +/** + * 全局异常处理器 + * 统一处理应用中的异常 + * + * @author welsir + * @date 2025/1/20 + */ +@Slf4j +@RestControllerAdvice +public class GlobalExceptionHandler { + + @Resource + private SseValidationService sseValidationService; + + /** + * 处理SSE验证异常 + * + * @param e SSE验证异常 + * @param request HTTP请求对象 + * @return 错误响应 + */ + @ExceptionHandler(SessionException.class) + public ResponseEntity handleSessionException(SessionException e, HttpServletRequest request) { + log.warn("SSE验证失败: {}", e.getMessage()); + + // 尝试通过SSE发送错误消息 + sseValidationService.sendSseErrorIfConnected(request, e.getMessage()); + + // 根据请求路径返回不同格式的响应 + String requestPath = request.getRequestURI(); + if (requestPath.contains("/retry")) { + return ResponseEntity.badRequest().body(ApiResult.error(e.getMessage())); + } else { + return ResponseEntity.badRequest().body(e.getMessage()); + } + } + + + + /** + * 处理通用异常 + * + * @param e 异常 + * @param request HTTP请求对象 + * @return 错误响应 + */ + @ExceptionHandler(Exception.class) + public ResponseEntity handleGeneralException(Exception e, HttpServletRequest request) { + log.error("系统异常: {}", e.getMessage(), e); + + // 尝试通过SSE发送错误消息 + sseValidationService.sendSseErrorIfConnected(request, "系统异常,请重试"); + + // 根据请求路径返回不同格式的响应 + String requestPath = request.getRequestURI(); + if (requestPath.contains("/retry")) { + return ResponseEntity.internalServerError().body(ApiResult.error("系统异常: " + e.getMessage())); + } else { + return ResponseEntity.internalServerError().body("系统异常,请重试"); + } + } +} \ No newline at end of file diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/SSEDemoController.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/SSEDemoController.java deleted file mode 100644 index 0fa6ad3..0000000 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/SSEDemoController.java +++ /dev/null @@ -1,170 +0,0 @@ -package io.github.timemachinelab.controller; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.http.MediaType; -import org.springframework.web.bind.annotation.*; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.Map; - -@RestController -@RequestMapping("/api/demo") -@Slf4j -public class SSEDemoController { - - private final Map emitters = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - - /** - * 建立SSE连接 - */ - @GetMapping(value = "/sse/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public SseEmitter connect(@PathVariable String clientId) { - log.info("客户端连接SSE: {}", clientId); - - SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); - emitters.put(clientId, emitter); - - // 连接建立时发送欢迎消息 - try { - emitter.send(SseEmitter.event() - .name("connected") - .data("SSE连接已建立,客户端ID: " + clientId)); - } catch (IOException e) { - log.error("发送欢迎消息失败: {}", e.getMessage()); - } - - // 设置连接事件处理 - emitter.onCompletion(() -> { - log.info("SSE连接完成: {}", clientId); - emitters.remove(clientId); - }); - - emitter.onTimeout(() -> { - log.info("SSE连接超时: {}", clientId); - emitters.remove(clientId); - }); - - emitter.onError((ex) -> { - log.error("SSE连接错误: {} - {}", clientId, ex.getMessage()); - emitters.remove(clientId); - }); - - return emitter; - } - - /** - * 发送消息到指定客户端 - */ - @PostMapping("/send/{clientId}") - public String sendMessage(@PathVariable String clientId, @RequestParam String message) { - SseEmitter emitter = emitters.get(clientId); - - if (emitter == null) { - return "客户端未连接: " + clientId; - } - - try { - emitter.send(SseEmitter.event() - .name("message") - .data(message)); - - log.info("消息已发送到客户端 {}: {}", clientId, message); - return "消息发送成功"; - - } catch (IOException e) { - log.error("发送消息失败: {}", e.getMessage()); - emitters.remove(clientId); - return "发送失败: " + e.getMessage(); - } - } - - /** - * 广播消息到所有连接的客户端 - */ - @PostMapping("/broadcast") - public String broadcast(@RequestParam String message) { - int successCount = 0; - int failCount = 0; - - for (Map.Entry entry : emitters.entrySet()) { - try { - entry.getValue().send(SseEmitter.event() - .name("broadcast") - .data(message)); - successCount++; - } catch (IOException e) { - log.error("广播消息失败,客户端: {} - {}", entry.getKey(), e.getMessage()); - emitters.remove(entry.getKey()); - failCount++; - } - } - - return String.format("广播完成 - 成功: %d, 失败: %d", successCount, failCount); - } - - /** - * 真实流式数据传输 - SSE本身就是流式的 - * 这里演示连续推送数据流的场景 - */ - @PostMapping("/stream/{clientId}") - public String startStream(@PathVariable String clientId) { - SseEmitter emitter = emitters.get(clientId); - - if (emitter == null) { - return "客户端未连接: " + clientId; - } - - // 真实流式传输:连续推送数据,每秒一条,共10条 - // 这就是SSE的本质 - 服务器主动推送数据流 - scheduler.schedule(() -> { - for (int i = 1; i <= 10; i++) { - final int count = i; - scheduler.schedule(() -> { - try { - // 直接通过SSE流式推送数据 - emitter.send(SseEmitter.event() - .name("stream") - .data("实时数据流 #" + count + " - 时间戳: " + System.currentTimeMillis())); - - // 最后一条数据后发送完成通知 - if (count == 10) { - scheduler.schedule(() -> { - try { - emitter.send(SseEmitter.event() - .name("stream_complete") - .data("数据流传输完成")); - } catch (IOException e) { - log.error("发送完成通知失败: {}", e.getMessage()); - } - }, 1, TimeUnit.SECONDS); - } - - } catch (IOException e) { - log.error("流式数据推送失败: {}", e.getMessage()); - emitters.remove(clientId); - } - }, count, TimeUnit.SECONDS); - } - }, 0, TimeUnit.SECONDS); - - return "SSE数据流已开始推送"; - } - - /** - * 获取当前连接状态 - */ - @GetMapping("/status") - public Map getStatus() { - Map status = new ConcurrentHashMap<>(); - status.put("connectedClients", emitters.keySet()); - status.put("totalConnections", emitters.size()); - status.put("timestamp", System.currentTimeMillis()); - return status; - } -} \ No newline at end of file diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java index 075d2b9..a9115a3 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/controller/UserInteractionController.java @@ -1,14 +1,19 @@ package io.github.timemachinelab.controller; -import com.alibaba.fastjson2.JSON; -import io.github.timemachinelab.core.constant.AllPrompt; +import com.alibaba.fastjson.JSONObject; import io.github.timemachinelab.core.fingerprint.FingerprintService; import io.github.timemachinelab.core.fingerprint.UserFingerprint; +import io.github.timemachinelab.core.session.application.ConversationService; import io.github.timemachinelab.core.session.application.MessageProcessingService; import io.github.timemachinelab.core.session.application.SessionManagementService; import io.github.timemachinelab.core.session.application.SseNotificationService; +import io.github.timemachinelab.core.session.application.SseValidationService; +import io.github.timemachinelab.core.session.application.SessionException; +import io.github.timemachinelab.core.session.application.SseValidationService; +import io.github.timemachinelab.core.session.application.SessionProcessingService; +import io.github.timemachinelab.core.session.application.RetryProcessingService; + import io.github.timemachinelab.core.session.domain.entity.ConversationSession; -import io.github.timemachinelab.core.session.infrastructure.ai.GenPromptOperation; import io.github.timemachinelab.core.session.infrastructure.web.dto.GenPromptRequest; import io.github.timemachinelab.core.session.infrastructure.web.dto.UnifiedAnswerRequest; import io.github.timemachinelab.entity.req.RetryRequest; @@ -25,16 +30,16 @@ import javax.servlet.http.HttpServletRequest; import javax.validation.Valid; import java.io.IOException; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * 用户交互控制器 * 提供用户交互相关的API接口 * - * @author suifeng + * @author welsir * @date 2025/1/20 */ @Slf4j @@ -50,11 +55,20 @@ public class UserInteractionController { private SseNotificationService sseNotificationService; @Resource private FingerprintService fingerprintService; + @Resource + private ConversationService conversationService; + @Resource + private SseValidationService sseValidationService; + @Resource + private SessionProcessingService sessionProcessingService; + @Resource + private RetryProcessingService retryProcessingService; /** * 建立SSE连接 * 1. 生成指纹(如果不存在),返回空的sessionList * 2. 如果生成的指纹已经存在,获取对应的sessionList返回 + * * @param request HTTP请求对象 * @return SSE连接 */ @@ -91,219 +105,159 @@ public SseEmitter streamConversation(HttpServletRequest request) { connectionData.put("visitCount", userFingerprint.getVisitCount()); connectionData.put("timestamp", System.currentTimeMillis()); - sseNotificationService.sendWelcomeMessage(fingerprint, connectionData); - - // 设置连接事件处理 - emitter.onCompletion(() -> { - log.info("SSE连接完成: {}", fingerprint); - }); - - emitter.onTimeout(() -> { - log.info("SSE连接超时: {}", fingerprint); - sseNotificationService.removeSseConnection(fingerprint); - }); - - emitter.onError((ex) -> { - log.error("SSE连接错误: {} - {}", fingerprint, ex.getMessage()); - sseNotificationService.removeSseConnection(fingerprint); - }); - - return emitter; - - } catch (Exception e) { - log.error("建立SSE连接失败: {}", e.getMessage()); - SseEmitter errorEmitter = new SseEmitter(Long.MAX_VALUE); - try { - errorEmitter.send(SseEmitter.event() - .name("error") - .data("连接建立失败: " + e.getMessage())); - } catch (IOException ioException) { - log.error("发送错误消息失败: {}", ioException.getMessage()); - } - return errorEmitter; - } + // 将Map转换为JSONObject以生成标准JSON格式 + JSONObject jsonData = new JSONObject(connectionData); + sseNotificationService.sendSuccessMessage(fingerprint, jsonData.toJSONString()); + + // 设置连接事件处理 + emitter.onCompletion(() -> { + log.info("SSE连接完成: {}", fingerprint); + }); + + emitter.onTimeout(() -> { + log.info("SSE连接超时: {}", fingerprint); + sseNotificationService.removeSseConnection(fingerprint); + }); + + emitter.onError((ex) -> { + log.error("SSE连接错误: {} - {}", fingerprint, ex.getMessage()); + sseNotificationService.removeSseConnection(fingerprint); + }); + + return emitter; + + } catch (Exception e) { + log.error("建立SSE连接失败: {}", e.getMessage()); + SseEmitter errorEmitter = new SseEmitter(Long.MAX_VALUE); + try { + errorEmitter.send(SseEmitter.event() + .name("error") + .data("连接建立失败: " + e.getMessage())); + } catch (IOException ioException) { + log.error("发送错误消息失败: {}", ioException.getMessage()); + } + return errorEmitter; + } } /** * 重试接口 * - * @param request 重试请求参数 + * @param request 重试请求参数 + * @param httpRequest HTTP请求对象 * @return 重试结果 */ @PostMapping("/retry") - public ResponseEntity> retry(@Valid @RequestBody RetryRequest request) { - try { - log.info("收到重试请求 - nodeId: {}, sessionId: {}, whyretry: {}", - request.getNodeId(), request.getSessionId(), request.getWhyretry()); - - // 使用应用服务验证节点存在性 - //todo: 有可能水平越权 不传userId的话 - if (!sessionManagementService.validateNodeExists(request.getSessionId(), request.getNodeId())) { - log.warn("节点不存在 - nodeId: {}, sessionId: {}", request.getNodeId(), request.getSessionId()); - return ResponseEntity.badRequest().body(ApiResult.error("指定的节点不存在")); - } - - // 使用应用服务获取问题内容 - String question = sessionManagementService.getNodeQuestion(request.getSessionId(), request.getNodeId()); - if (question == null) { - log.warn("节点问题内容为空 - nodeId: {}, sessionId: {}", request.getNodeId(), request.getSessionId()); - return ResponseEntity.badRequest().body(ApiResult.error("节点问题内容为空")); - } - - // 获取会话对象 - ConversationSession session = sessionManagementService.getSessionById(request.getSessionId()); - if (session == null) { - log.warn("会话不存在 - sessionId: {}", request.getSessionId()); - return ResponseEntity.badRequest().body(ApiResult.error("会话不存在")); - } - - // 移除要重试的节点(AI会基于parentId重新创建节点) - boolean nodeRemoved = sessionManagementService.removeNode(request.getSessionId(), request.getNodeId()); - if (!nodeRemoved) { - log.warn("移除节点失败,但继续处理重试 - sessionId: {}, nodeId: {}", - request.getSessionId(), request.getNodeId()); - } - - // 使用MessageProcessingService处理重试消息 - String processedMessage = messageProcessingService.processRetryMessage( - request.getSessionId(), - request.getNodeId(), - request.getWhyretry(), - session - ); - - // 发送处理后的消息给AI服务 - messageProcessingService.processAndSendMessage(session, processedMessage); - - // 构建响应数据 - RetryResponse response = RetryResponse.builder() - .nodeId(request.getNodeId()) - .sessionId(request.getSessionId()) - .whyretry(request.getWhyretry()) - .processTime(System.currentTimeMillis()) - .build(); - - log.info("重试请求处理成功 - nodeId: {}, sessionId: {}", - request.getNodeId(), request.getSessionId()); - - return ResponseEntity.ok(ApiResult.success("重试请求处理成功", response)); - - } catch (Exception e) { - log.error("重试请求处理失败: {}", e.getMessage(), e); - return ResponseEntity.badRequest().body(ApiResult.serverError("重试请求处理失败: " + e.getMessage())); - } + public ResponseEntity> retry(@Valid @RequestBody RetryRequest request, HttpServletRequest httpRequest) + throws SessionException { + log.info("收到重试请求 - nodeId: {}, sessionId: {}, whyretry: {}", + request.getNodeId(), request.getSessionId(), request.getWhyretry()); + + // 1. 验证SSE连接并获取用户指纹 + String fingerprint = sseValidationService.validateAndGetFingerprint(httpRequest); + + // 2. 处理重试请求(验证和处理逻辑) + RetryProcessingService.RetryProcessingResult retryResult = + retryProcessingService.processRetryRequest(fingerprint, request); + + // 3. 发送处理后的消息给AI服务 + messageProcessingService.processAndSendMessage( + retryResult.getSession(), + retryResult.getProcessedMessage() + ); + + // 4. 构建响应数据 + RetryResponse response = retryProcessingService.buildRetryResponse(request); + + log.info("重试请求处理成功 - nodeId: {}, sessionId: {}", + request.getNodeId(), request.getSessionId()); + + return ResponseEntity.ok(ApiResult.success("重试请求处理成功", response)); } /** * 处理统一答案请求(基于用户指纹) * 支持单选、多选、输入框、表单等多种问题类型的回答 * 逻辑: 如果没有带sessionId,默认就是新建对话,不需要传入NodeId - * 如果带了sessionId就不是新建会话,需要nodeId + * 如果带了sessionId就不是新建会话 * - * @param request 统一答案请求 + * @param request 统一答案请求 * @param httpRequest HTTP请求对象 * @return 处理结果 */ @PostMapping("/message") public ResponseEntity processAnswer(@Validated @RequestBody UnifiedAnswerRequest request, - HttpServletRequest httpRequest) { - try { - log.info("接收到答案请求 - 会话ID: {}, 节点ID: {}, 问题类型: {}", - request.getSessionId(), - request.getNodeId(), - request.getQuestionType()); - - // 1. 从请求头获取和验证用户指纹(与SSE连接保持一致) - UserFingerprint userFingerprint = fingerprintService.getOrCreateUserFingerprint(httpRequest); - String fingerprint = userFingerprint.getFingerprint(); - - log.info("用户指纹: {}, 访问次数: {}", fingerprint, userFingerprint.getVisitCount()); - - // 2. 验证请求体中的userId是否与指纹匹配(可选验证) - String requestUserId = request.getUserId(); - if (requestUserId != null && !requestUserId.equals(fingerprint)) { - log.warn("请求体中的userId({})与请求头指纹({})不匹配,以请求头指纹为准", requestUserId, fingerprint); - } - - // 4. 根据sessionId是否为空判断新建还是继续会话 - String sessionId = request.getSessionId(); - boolean isNewConversation = (sessionId == null || sessionId.trim().isEmpty()); - - ConversationSession session; - - if (isNewConversation) { - // 新建对话,不需要nodeId - log.info("新建对话 - 用户指纹: {}", fingerprint); + HttpServletRequest httpRequest) throws SessionException { + log.info("接收到答案请求 - 会话ID: {}, 问题类型: {}", + request.getSessionId(), + request.getQuestionType()); + + // 1. 验证SSE连接并获取用户指纹 + String fingerprint = sseValidationService.validateAndGetFingerprint(httpRequest); + + // 2. 处理会话请求(新建或继续会话) + SessionProcessingService.SessionProcessingResult sessionResult = + sessionProcessingService.processSessionRequest(fingerprint, request); + + ConversationSession session = sessionResult.getSession(); + + // 3. 验证答案格式 + if (!messageProcessingService.validateAnswer(request)) { + log.warn("答案格式验证失败: {}", request); + return ResponseEntity.badRequest().body("答案格式不正确"); + } - // 创建新会话 - session = sessionManagementService.createNewSession(fingerprint); - sessionId = session.getSessionId(); + // 6. 处理答案并转换为消息 + String processedMessage = messageProcessingService.preprocessMessage( + null, + request, + session + ); - log.info("已创建新会话 - 用户指纹: {}, 会话ID: {}", fingerprint, sessionId); + // 7. 发送处理后的消息给AI服务 + messageProcessingService.processAndSendMessage(session, processedMessage); - // 新对话不需要验证nodeId,直接跳过验证 - } else { - // 继续现有对话,需要验证sessionId和nodeId - log.info("继续现有对话 - 用户指纹: {}, 会话ID: {}", fingerprint, sessionId); + return ResponseEntity.ok("答案处理成功"); + } - // 验证会话是否存在 - session = sessionManagementService.validateAndGetSession(fingerprint, sessionId); - if (session == null) { - log.warn("会话不存在或无效 - 用户指纹: {}, 会话ID: {}", fingerprint, sessionId); - return ResponseEntity.badRequest().body("会话不存在或无效"); - } + @PostMapping("/gen-prompt") + public ResponseEntity genPrompt(@RequestBody GenPromptRequest request, HttpServletRequest httpRequest) throws SessionException { + // 1. 验证SSE连接并获取用户指纹 + String fingerprint = sseValidationService.validateAndGetFingerprint(httpRequest); + log.info("处理genPrompt请求 - 指纹: {}, sessionId: {}", fingerprint, request.getSessionId()); + + // 2. 获取或创建会话 + ConversationSession session = sessionManagementService.getOrCreateSession(fingerprint, request.getSessionId()); + if (session == null) { + log.error("会话创建或获取失败 - 指纹: {}, sessionId: {}", fingerprint, request.getSessionId()); + sseNotificationService.sendErrorMessage(fingerprint, "会话创建失败,请重试"); + return ResponseEntity.internalServerError().body("会话处理失败"); + } - // 验证nodeId - String nodeId = request.getNodeId(); - if (nodeId == null || nodeId.trim().isEmpty()) { - log.warn("继续会话必须提供nodeId - 会话ID: {}", sessionId); - return ResponseEntity.badRequest().body("继续会话必须提供nodeId"); - } + // 4. 调用AI服务生成提示词 + conversationService.genPrompt(session.getSessionId(), response -> { + try { - // 验证nodeId是否属于该会话 - if (!sessionManagementService.validateNodeId(sessionId, nodeId)) { - log.warn("无效的节点ID - 会话: {}, 节点: {}", sessionId, nodeId); - return ResponseEntity.badRequest().body("无效的节点ID"); - } + // 5. 更新currentNode - 在AI回答后创建新节点 + String newNodeId = session.getNextNodeId(); + session.setCurrentNode(newNodeId); + session.setUpdateTime(LocalDateTime.now()); - log.info("验证通过 - 会话: {}, 节点: {}", sessionId, nodeId); - } + // 发送AI生成的提示词 + sseNotificationService.sendSuccessMessage(fingerprint, response.getGenPrompt()); - // 5. 验证答案格式 - if (!messageProcessingService.validateAnswer(request)) { - log.warn("答案格式验证失败: {}", request); - return ResponseEntity.badRequest().body("答案格式不正确"); + log.info("genPrompt处理完成 - 会话: {}, 新节点: {}", session.getSessionId(), newNodeId); + } catch (Exception e) { + log.error("SSE消息发送失败: {}", e.getMessage(), e); + // 确保SSE有响应,即使是错误消息 + sseNotificationService.sendErrorMessage(fingerprint, "提示词生成完成,但响应发送失败"); } + }); - // 6. 处理答案并转换为消息 - String processedMessage = messageProcessingService.preprocessMessage( - null, - request, - session - ); - - // 7. 发送处理后的消息给AI服务 - messageProcessingService.processAndSendMessage(session, processedMessage); - - return ResponseEntity.ok("答案处理成功"); - - } catch (Exception e) { - log.error("处理答案失败 - 会话ID: {}, 错误: {}", - (request.getSessionId() != null ? request.getSessionId() : "新建会话"), - e.getMessage(), e); - return ResponseEntity.internalServerError().body("答案处理失败: " + e.getMessage()); - } + return ResponseEntity.ok("提示词生成请求已处理"); } - @PostMapping("/gen-prompt") - public ResponseEntity genPrompt(@RequestBody GenPromptRequest request) { - GenPromptOperation.GpResponse gpResponse = new GenPromptOperation.GpResponse(); - gpResponse.setGenPrompt(AllPrompt.GEN_PROMPT_AGENT_PROMPT); - sseNotificationService.sendWelcomeMessage(request.getSessionId(), JSON.toJSONString(gpResponse)); - return ResponseEntity.ok("生成提示词"); - } - /** * 获取SSE连接状态 */ diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/fingerprint/FingerprintService.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/fingerprint/FingerprintService.java new file mode 100644 index 0000000..d4286ba --- /dev/null +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/fingerprint/FingerprintService.java @@ -0,0 +1,246 @@ +package io.github.timemachinelab.core.fingerprint; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.servlet.http.HttpServletRequest; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.LocalDateTime; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 用户指纹服务 + * 用于生成和管理用户指纹信息 + * + * @author welsir + * @date 2025/1/20 + */ +@Slf4j +@Service +public class FingerprintService { + + /** + * 用户指纹缓存 + */ + private final Map fingerprintCache = new ConcurrentHashMap<>(); + + /** + * 获取或创建用户指纹 + * + * @param request HTTP请求对象 + * @return 用户指纹信息 + */ + public UserFingerprint getOrCreateUserFingerprint(HttpServletRequest request) { + try { + // 生成指纹ID + String fingerprintId = generateFingerprint(request); + + // 检查缓存中是否已存在 + UserFingerprint existingFingerprint = fingerprintCache.get(fingerprintId); + + if (existingFingerprint != null) { + // 更新访问信息 + existingFingerprint.setVisitCount(existingFingerprint.getVisitCount() + 1); + existingFingerprint.setLastVisitTime(LocalDateTime.now()); + + log.debug("用户指纹已存在: {}, 访问次数: {}", fingerprintId, existingFingerprint.getVisitCount()); + return existingFingerprint; + } + + // 创建新的用户指纹 + UserFingerprint newFingerprint = UserFingerprint.builder() + .fingerprint(fingerprintId) + .displayName(generateDisplayName()) + .visitCount(1) + .firstVisitTime(LocalDateTime.now()) + .lastVisitTime(LocalDateTime.now()) + .ipAddress(getClientIpAddress(request)) + .userAgent(request.getHeader("User-Agent")) + .build(); + + // 存入缓存 + fingerprintCache.put(fingerprintId, newFingerprint); + + log.info("创建新用户指纹: {}, 显示名称: {}", fingerprintId, newFingerprint.getDisplayName()); + return newFingerprint; + + } catch (Exception e) { + log.error("生成用户指纹失败: {}", e.getMessage(), e); + // 返回默认指纹 + return createDefaultFingerprint(); + } + } + + /** + * 生成用户指纹ID + * + * @param request HTTP请求对象 + * @return 指纹ID + */ + private String generateFingerprint(HttpServletRequest request) { + try { + StringBuilder fingerprintData = new StringBuilder(); + + // 获取客户端IP + String clientIp = getClientIpAddress(request); + fingerprintData.append(clientIp); + + // 获取User-Agent + String userAgent = request.getHeader("User-Agent"); + if (userAgent != null) { + fingerprintData.append(userAgent); + } + + // 获取Accept-Language + String acceptLanguage = request.getHeader("Accept-Language"); + if (acceptLanguage != null) { + fingerprintData.append(acceptLanguage); + } + + // 使用MD5生成指纹 + MessageDigest md = MessageDigest.getInstance("MD5"); + byte[] digest = md.digest(fingerprintData.toString().getBytes()); + + StringBuilder hexString = new StringBuilder(); + for (byte b : digest) { + String hex = Integer.toHexString(0xff & b); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex); + } + + return hexString.toString(); + + } catch (NoSuchAlgorithmException e) { + log.error("MD5算法不可用,使用UUID作为指纹: {}", e.getMessage()); + return UUID.randomUUID().toString().replace("-", ""); + } + } + + /** + * 获取客户端真实IP地址 + * + * @param request HTTP请求对象 + * @return 客户端IP地址 + */ + private String getClientIpAddress(HttpServletRequest request) { + String[] headerNames = { + "X-Forwarded-For", + "X-Real-IP", + "Proxy-Client-IP", + "WL-Proxy-Client-IP", + "HTTP_CLIENT_IP", + "HTTP_X_FORWARDED_FOR" + }; + + for (String headerName : headerNames) { + String ip = request.getHeader(headerName); + if (ip != null && !ip.isEmpty() && !"unknown".equalsIgnoreCase(ip)) { + // 多个IP时取第一个 + if (ip.contains(",")) { + ip = ip.split(",")[0].trim(); + } + return ip; + } + } + + return request.getRemoteAddr(); + } + + /** + * 生成用户显示名称 + * + * @return 显示名称 + */ + private String generateDisplayName() { + String[] adjectives = {"聪明的", "勇敢的", "友善的", "创新的", "睿智的", "活跃的", "优雅的", "坚强的"}; + String[] nouns = {"探索者", "创造者", "思考者", "学习者", "建设者", "梦想家", "实践者", "先锋"}; + + int adjIndex = (int) (Math.random() * adjectives.length); + int nounIndex = (int) (Math.random() * nouns.length); + + return adjectives[adjIndex] + nouns[nounIndex]; + } + + /** + * 创建默认指纹(当生成失败时使用) + * + * @return 默认用户指纹 + */ + private UserFingerprint createDefaultFingerprint() { + String defaultId = UUID.randomUUID().toString().replace("-", ""); + + return UserFingerprint.builder() + .fingerprint(defaultId) + .displayName("匿名用户") + .visitCount(1) + .firstVisitTime(LocalDateTime.now()) + .lastVisitTime(LocalDateTime.now()) + .ipAddress("unknown") + .userAgent("unknown") + .build(); + } + + /** + * 获取已存在的用户指纹(不创建新指纹) + * + * @param request HTTP请求对象 + * @return 用户指纹信息,如果不存在则返回null + */ + public UserFingerprint getExistingUserFingerprint(HttpServletRequest request) { + try { + // 生成指纹ID + String fingerprintId = generateFingerprint(request); + + // 只从缓存中获取,不创建新的 + UserFingerprint existingFingerprint = fingerprintCache.get(fingerprintId); + + if (existingFingerprint != null) { + // 更新访问信息 + existingFingerprint.setVisitCount(existingFingerprint.getVisitCount() + 1); + existingFingerprint.setLastVisitTime(LocalDateTime.now()); + + log.debug("获取已存在用户指纹: {}, 访问次数: {}", fingerprintId, existingFingerprint.getVisitCount()); + return existingFingerprint; + } + + log.debug("用户指纹不存在: {}", fingerprintId); + return null; + + } catch (Exception e) { + log.error("获取用户指纹失败: {}", e.getMessage(), e); + return null; + } + } + + /** + * 获取指纹缓存统计信息 + * + * @return 缓存统计信息 + */ + public Map getCacheStats() { + Map stats = new ConcurrentHashMap<>(); + stats.put("totalFingerprints", fingerprintCache.size()); + stats.put("cacheSize", fingerprintCache.size()); + return stats; + } + + /** + * 清理过期的指纹缓存 + * 可以定期调用此方法清理长时间未访问的指纹 + */ + public void cleanupExpiredFingerprints() { + LocalDateTime expireTime = LocalDateTime.now().minusHours(24); // 24小时未访问则清理 + + fingerprintCache.entrySet().removeIf(entry -> { + UserFingerprint fingerprint = entry.getValue(); + return fingerprint.getLastVisitTime().isBefore(expireTime); + }); + + log.info("清理过期指纹缓存完成,当前缓存大小: {}", fingerprintCache.size()); + } +} \ No newline at end of file diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/fingerprint/UserFingerprint.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/fingerprint/UserFingerprint.java new file mode 100644 index 0000000..3ec0283 --- /dev/null +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/fingerprint/UserFingerprint.java @@ -0,0 +1,64 @@ +package io.github.timemachinelab.core.fingerprint; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.AllArgsConstructor; +import lombok.Builder; + +import java.time.LocalDateTime; + +/** + * 用户指纹信息 + * 用于标识和跟踪用户会话 + * + * @author welsir + * @date 2025/1/20 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class UserFingerprint { + + /** + * 用户指纹ID + */ + private String fingerprint; + + /** + * 用户显示名称 + */ + private String displayName; + + /** + * 访问次数 + */ + private Integer visitCount; + + /** + * 首次访问时间 + */ + private LocalDateTime firstVisitTime; + + /** + * 最后访问时间 + */ + private LocalDateTime lastVisitTime; + + /** + * 用户IP地址 + */ + private String ipAddress; + + /** + * 用户代理信息 + */ + private String userAgent; + + /** + * 是否为新用户 + */ + public boolean isNewUser() { + return visitCount != null && visitCount == 1; + } +} \ No newline at end of file diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/MessageProcessingService.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/MessageProcessingService.java index b415f1b..63f74ac 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/MessageProcessingService.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/MessageProcessingService.java @@ -11,16 +11,7 @@ * 日期: 2025/1/27 */ public interface MessageProcessingService { - - /** - * 处理统一答案请求 - * 将用户的回答转换为适合大模型处理的格式 - * - * @param request 统一答案请求 - * @return 处理后的消息内容 - */ - String processAnswer(UnifiedAnswerRequest request); - + /** * 预处理消息 * 在发送给大模型之前对消息进行加工 diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/RetryProcessingService.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/RetryProcessingService.java new file mode 100644 index 0000000..17f6859 --- /dev/null +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/RetryProcessingService.java @@ -0,0 +1,120 @@ +package io.github.timemachinelab.core.session.application; + +import io.github.timemachinelab.core.session.domain.entity.ConversationSession; +import io.github.timemachinelab.core.session.application.SessionException; +import io.github.timemachinelab.entity.req.RetryRequest; +import io.github.timemachinelab.entity.resp.RetryResponse; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * 重试处理服务 + * 封装重试请求的验证和处理逻辑 + * + * @author welsir + * @date 2025/1/20 + */ +@Slf4j +@Service +public class RetryProcessingService { + + @Resource + private SessionManagementService sessionManagementService; + + @Resource + private MessageProcessingService messageProcessingService; + + /** + * 重试处理结果 + */ + public static class RetryProcessingResult { + private final ConversationSession session; + private final String processedMessage; + + public RetryProcessingResult(ConversationSession session, String processedMessage) { + this.session = session; + this.processedMessage = processedMessage; + } + + public ConversationSession getSession() { + return session; + } + + public String getProcessedMessage() { + return processedMessage; + } + } + + /** + * 处理重试请求 + * 包含完整的验证和处理逻辑 + * + * @param fingerprint 用户指纹 + * @param request 重试请求 + * @return 重试处理结果 + * @throws SessionException 重试处理异常 + */ + public RetryProcessingResult processRetryRequest(String fingerprint, RetryRequest request) + throws SessionException { + + String sessionId = request.getSessionId(); + String whyRetry = request.getWhyretry(); + + log.info("处理重试请求 - 指纹: {}, sessionId: {}, 原因: {}", + fingerprint, sessionId, whyRetry); + + ConversationSession session = sessionManagementService.getSessionById(sessionId); + if (session == null) { + log.warn("会话不存在 - sessionId: {}", sessionId); + throw new SessionException("SSE连接验证失败: 会话不存在"); + } + String nodeId = session.getCurrentNode(); + // 1. 验证节点存在性 + if (!sessionManagementService.validateNodeExists(sessionId, nodeId)) { + log.warn("节点不存在 - nodeId: {}, sessionId: {}", nodeId, sessionId); + throw new SessionException("SSE连接验证失败: 指定的节点不存在"); + } + + // 2. 验证问题内容 + String question = sessionManagementService.getNodeQuestion(sessionId, nodeId); + if (question == null) { + log.warn("节点问题内容为空 - nodeId: {}, sessionId: {}", nodeId, sessionId); + throw new SessionException("SSE连接验证失败: 节点问题内容为空"); + } + + // 4. 移除要重试的节点(AI会基于parentId重新创建节点) + boolean nodeRemoved = sessionManagementService.removeNode(sessionId, nodeId); + if (!nodeRemoved) { + log.warn("移除节点失败 - sessionId: {}, nodeId: {}", sessionId, nodeId); + throw new RuntimeException("移除节点失败"); + } + + // 5. 使用MessageProcessingService处理重试消息 + String processedMessage = messageProcessingService.processRetryMessage( + sessionId, + nodeId, + whyRetry, + session + ); + + log.info("重试请求处理完成 - sessionId: {}, nodeId: {}", sessionId, nodeId); + return new RetryProcessingResult(session, processedMessage); + } + + /** + * 构建重试响应 + * + * @param request 重试请求 + * @return 重试响应 + */ + public RetryResponse buildRetryResponse(RetryRequest request) { + return RetryResponse.builder() + .nodeId(request.getNodeId()) + .sessionId(request.getSessionId()) + .whyretry(request.getWhyretry()) + .processTime(System.currentTimeMillis()) + .build(); + } +} \ No newline at end of file diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionManagementService.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionManagementService.java index b5aea76..3e96c0e 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionManagementService.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionManagementService.java @@ -134,7 +134,7 @@ public ConversationSession createNewSession(String userId) { // 设置QaTree到会话中 session.setQaTree(tree); - + session.setCurrentNode(tree.getRoot().getId()); // 建立映射关系 - 添加到用户的会话列表中 userSessionMap.computeIfAbsent(userId, k -> new ArrayList<>()).add(session.getSessionId()); sessions.put(session.getSessionId(), session); diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionProcessingService.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionProcessingService.java new file mode 100644 index 0000000..412299a --- /dev/null +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SessionProcessingService.java @@ -0,0 +1,99 @@ +package io.github.timemachinelab.core.session.application; + +import io.github.timemachinelab.core.session.domain.entity.ConversationSession; +import io.github.timemachinelab.core.session.infrastructure.web.dto.UnifiedAnswerRequest; +import io.github.timemachinelab.core.session.application.SessionException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * 会话处理服务 + * 封装新建/继续会话的判断逻辑和验证逻辑 + */ +@Slf4j +@Service +public class SessionProcessingService { + + @Resource + private SessionManagementService sessionManagementService; + + /** + * 会话处理结果 + */ + public static class SessionProcessingResult { + private final ConversationSession session; + private final boolean isNewConversation; + + public SessionProcessingResult(ConversationSession session, boolean isNewConversation) { + this.session = session; + this.isNewConversation = isNewConversation; + } + + public ConversationSession getSession() { + return session; + } + + public boolean isNewConversation() { + return isNewConversation; + } + } + + /** + * 处理会话请求,根据sessionId判断新建还是继续会话 + * 并进行相应的验证 + * + * @param fingerprint 用户指纹 + * @param request 统一答案请求 + * @return 会话处理结果 + * @throws SessionException 会话处理异常 + */ + public SessionProcessingResult processSessionRequest(String fingerprint, UnifiedAnswerRequest request) + throws SessionException { + + String sessionId = request.getSessionId(); + boolean isNewConversation = (sessionId == null || sessionId.trim().isEmpty()); + + ConversationSession session; + + if (isNewConversation) { + // 新建对话,不需要nodeId + log.info("新建对话 - 用户指纹: {}", fingerprint); + + // 创建新会话 + session = sessionManagementService.createNewSession(fingerprint); + sessionId = session.getSessionId(); + + log.info("已创建新会话 - 用户指纹: {}, 会话ID: {}", fingerprint, sessionId); + + } else { + // 继续现有对话,需要验证sessionId和nodeId + log.info("继续现有对话 - 用户指纹: {}, 会话ID: {}", fingerprint, sessionId); + + // 验证会话是否存在 + session = sessionManagementService.validateAndGetSession(fingerprint, sessionId); + if (session == null) { + log.warn("会话不存在或无效 - 用户指纹: {}, 会话ID: {}", fingerprint, sessionId); + throw new SessionException("会话处理失败: 会话不存在或无效"); + } + + // 验证nodeId + String nodeId = session.getCurrentNode(); + if (nodeId == null || nodeId.trim().isEmpty()) { + log.warn("继续会话必须提供nodeId - 会话ID: {}", sessionId); + throw new SessionException("会话处理失败: 继续会话必须提供nodeId"); + } + + // 验证nodeId是否属于该会话 + if (!sessionManagementService.validateNodeId(sessionId, nodeId)) { + log.warn("无效的节点ID - 会话: {}, 节点: {}", sessionId, nodeId); + throw new SessionException("会话处理失败: 无效的节点ID"); + } + + log.info("验证通过 - 会话: {}, 节点: {}", sessionId, nodeId); + } + + return new SessionProcessingResult(session, isNewConversation); + } +} \ No newline at end of file diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseNotificationService.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseNotificationService.java index c7ac83b..c6bf922 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseNotificationService.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseNotificationService.java @@ -43,12 +43,7 @@ public void registerSseConnection(String fingerprint, SseEmitter emitter) { // 如果用户已有连接,先移除旧连接 SseEmitter oldEmitter = sseEmitters.get(fingerprint); if (oldEmitter != null) { - try { - oldEmitter.complete(); - log.info("关闭旧SSE连接 - 用户指纹: {}", fingerprint); - } catch (Exception e) { - log.warn("关闭旧SSE连接失败 - 用户指纹: {}, 错误: {}", fingerprint, e.getMessage()); - } + return; } sseEmitters.put(fingerprint, emitter); @@ -64,133 +59,15 @@ public void removeSseConnection(String fingerprint) { sseEmitters.remove(fingerprint); log.info("SSE连接已移除 - 用户指纹: {}", fingerprint); } - + /** - * 发送SSE消息给客户端 - * - * @param sessionId 会话ID - * @param response 消息响应对象 + * 检查SSE连接是否存在 + * + * @param fingerprint 用户指纹 + * @return 连接是否存在 */ - public void sendSseMessage(String sessionId, QuestionGenerationOperation.QuestionGenerationResponse response) { - // 通过sessionId找到对应的用户指纹 - ConversationSession session = sessionManagementService.getSessionById(sessionId); - - // 验证输入参数 - if (session == null) { - log.warn("会话不存在 - 会话ID: {}", sessionId); - return; - } - - if (response == null) { - log.warn("AI响应对象为空 - 会话: {}", sessionId); - // 发送错误消息 - sendErrorMessage(session, "AI响应对象为空", "RESPONSE_NULL"); - return; - } - - String parentId = response.getParentId(); - if (parentId == null || parentId.trim().isEmpty()) { - log.warn("父节点ID为空 - 会话: {}", sessionId); - // 发送错误消息 - sendErrorMessage(session, "父节点ID为空", "PARENT_ID_EMPTY"); - return; - } - - // 验证QaTree是否存在 - QaTree qaTree = session.getQaTree(); - if (qaTree == null) { - log.warn("QaTree不存在 - 会话: {}, 无法验证父节点: {}", sessionId, parentId); - // 发送错误消息 - sendErrorMessage(session, "QaTree不存在", "QATREE_NOT_FOUND"); - return; - } - - // 验证父节点是否存在 - if (!qaTreeDomain.containsNode(qaTree, parentId)) { - log.warn("父节点不存在 - 会话: {}, parentId: {}, 当前QaTree节点数量: {}", - sessionId, parentId, qaTree.getNodeCount()); - - // 记录QaTree的所有节点ID以便调试 - if (qaTree.getNodeCount() > 0) { - log.info("当前QaTree中的所有节点ID: {}", qaTree.getNodeMap().keySet()); - } - - // 发送错误消息 - sendErrorMessage(session, "父节点不存在", "PARENT_NODE_NOT_FOUND"); - return; - } - - // 获取用户指纹和SSE连接 - String fingerprint = session.getUserId(); // userId就是指纹 - SseEmitter emitter = sseEmitters.get(fingerprint); - if (emitter == null) { - log.warn("SSE连接不存在 - 会话: {}, 用户指纹: {}, 当前连接数: {}", - sessionId, fingerprint, sseEmitters.size()); - // 可以考虑将消息缓存或记录到数据库,以便用户重连后可以收到 - log.info("考虑将消息缓存以便后续处理 - 问题: {}", - response.getQuestion() != null ? response.getQuestion().getQuestion() : "null"); - // 发送错误消息 - sendErrorMessage(session, "SSE连接不存在", "SSE_CONNECTION_NOT_FOUND"); - return; - } - - try { - String currentNodeId = null; - - log.info("开始处理AI问题 - 会话: {}, 父节点: {}, 问题类型: {}", - sessionId, parentId, response.getQuestion() != null ? response.getQuestion().getType() : "null"); - - // 1. 先将AI生成的新问题添加到QaTree(只填入question,answer留空) - if (response.getQuestion() != null) { - // 使用QaTreeDomain添加新节点,answer字段会自动为空 - qaTreeDomain.appendNode(qaTree, parentId, response.getQuestion(), session); - - // 获取刚刚创建的节点ID(当前计数器的值) - currentNodeId = String.valueOf(session.getNodeIdCounter().get()); - - log.info("AI问题已添加到QaTree - 会话: {}, 父节点: {}, 新节点ID: {}, 问题类型: {}", - sessionId, parentId, currentNodeId, response.getQuestion().getType()); - } else { - log.warn("AI响应中问题对象为空 - 会话: {}", sessionId); - } - - // 2. 创建修改后的响应对象,包含currentNodeId和parentNodeId - Map modifiedResponse = new HashMap<>(); - modifiedResponse.put("question", response.getQuestion()); - modifiedResponse.put("currentNodeId", currentNodeId != null ? currentNodeId : parentId); - modifiedResponse.put("parentNodeId", parentId); - - // 3. 发送SSE消息给前端 - emitter.send(SseEmitter.event() - .name("message") - .data(modifiedResponse)); - log.info("SSE消息发送成功 - 会话: {}, 当前节点ID: {}", sessionId, currentNodeId); - - } catch (IOException e) { - log.error("SSE消息发送失败 - 会话: {}, 用户指纹: {}, 错误: {}", sessionId, fingerprint, e.getMessage()); - sseEmitters.remove(fingerprint); - // 发送错误消息 - sendErrorMessage(session, "SSE消息发送失败: " + e.getMessage(), "SSE_SEND_FAILED"); - } catch (Exception e) { - log.error("添加问题到QaTree失败 - 会话: {}, 错误: {}", sessionId, e.getMessage()); - // 即使QaTree更新失败,仍然发送SSE消息给前端 - try { - Map fallbackResponse = new HashMap<>(); - fallbackResponse.put("question", response.getQuestion()); - fallbackResponse.put("currentNodeId", parentId); // 使用parentId作为fallback - fallbackResponse.put("parentNodeId", parentId); - - emitter.send(SseEmitter.event() - .name("message") - .data(fallbackResponse)); - log.info("SSE消息发送成功(QaTree更新失败但消息已发送) - 会话: {}", sessionId); - } catch (IOException ioException) { - log.error("SSE消息发送失败 - 会话: {}, 用户指纹: {}, 错误: {}", sessionId, fingerprint, ioException.getMessage()); - sseEmitters.remove(fingerprint); - // 发送错误消息 - sendErrorMessage(session, "SSE消息发送失败: " + ioException.getMessage(), "SSE_SEND_FAILED"); - } - } + public boolean isConnectionExists(String fingerprint) { + return sseEmitters.containsKey(fingerprint); } /** @@ -207,26 +84,28 @@ public Map getSseStatus() { } /** - * 发送欢迎消息 + * 通用SSE消息发送方法 * * @param fingerprint 用户指纹 - * @param message 欢迎消息内容 + * @param eventName 事件名称 + * @param data 消息数据 */ - public void sendWelcomeMessage(String fingerprint, String message) { + private void sendSseEvent(String fingerprint, String eventName, Object data) { SseEmitter emitter = sseEmitters.get(fingerprint); if (emitter != null) { try { emitter.send(SseEmitter.event() - .name("connected") - .data(message)); - log.info("欢迎消息发送成功 - 用户指纹: {}", fingerprint); + .name(eventName) + .data(data)); + log.info("SSE事件发送成功 - 用户指纹: {}, 事件: {}", fingerprint, eventName); } catch (IOException e) { - log.error("欢迎消息发送失败 - 用户指纹: {}, 错误: {}", fingerprint, e.getMessage()); + log.error("SSE事件发送失败 - 用户指纹: {}, 事件: {}, 错误: {}", fingerprint, eventName, e.getMessage()); sseEmitters.remove(fingerprint); } } } + /** * 发送连接数据 * @@ -234,36 +113,7 @@ public void sendWelcomeMessage(String fingerprint, String message) { * @param connectionData 连接数据 */ public void sendWelcomeMessage(String fingerprint, Map connectionData) { - SseEmitter emitter = sseEmitters.get(fingerprint); - if (emitter != null) { - try { - emitter.send(SseEmitter.event() - .name("connected") - .data(connectionData)); - log.info("连接数据发送成功 - 用户指纹: {}", fingerprint); - } catch (IOException e) { - log.error("连接数据发送失败 - 用户指纹: {}, 错误: {}", fingerprint, e.getMessage()); - sseEmitters.remove(fingerprint); - } - } - } - - /** - * 发送错误消息给客户端 - * 确保错误时能够立即响应,不会卡住 - * - * @param session 会话对象 - * @param errorMessage 错误消息 - * @param errorCode 错误代码 - */ - private void sendErrorMessage(ConversationSession session, String errorMessage, String errorCode) { - if (session == null) { - log.warn("无法发送错误消息,会话对象为空"); - return; - } - - String fingerprint = session.getUserId(); - sendErrorMessage(fingerprint, errorMessage, errorCode); + sendSseEvent(fingerprint, "connected", connectionData); } /** @@ -271,29 +121,24 @@ private void sendErrorMessage(ConversationSession session, String errorMessage, * 确保错误时能够立即响应,不会卡住 * * @param fingerprint 用户指纹 - * @param errorMessage 错误消息 - * @param errorCode 错误代码 */ - private void sendErrorMessage(String fingerprint, String errorMessage, String errorCode) { - SseEmitter emitter = sseEmitters.get(fingerprint); - if (emitter != null) { - try { - Map errorResponse = new HashMap<>(); - errorResponse.put("error", true); - errorResponse.put("errorCode", errorCode); - errorResponse.put("errorMessage", errorMessage); - errorResponse.put("timestamp", System.currentTimeMillis()); - - emitter.send(SseEmitter.event() - .name("error") - .data(errorResponse)); - - log.info("错误消息已发送 - 用户指纹: {}, 错误代码: {}, 错误消息: {}", fingerprint, errorCode, errorMessage); - } catch (IOException e) { - log.error("发送错误消息失败 - 用户指纹: {}, 错误: {}", fingerprint, e.getMessage()); - // 如果发送错误消息失败,移除连接 - sseEmitters.remove(fingerprint); - } - } + public void sendErrorMessage(String fingerprint, String msg) { + + sendMessage(fingerprint,msg,"500",false); + } + + private void sendMessage(String fingerprint, String msg,String code,boolean success) { + Map errorResponse = new HashMap<>(); + errorResponse.put("success", success); + errorResponse.put("code", code); + errorResponse.put("data", msg); + errorResponse.put("timestamp", System.currentTimeMillis()); + + sendSseEvent(fingerprint, success?"success":"error", errorResponse); + log.info("消息已发送 - 用户指纹: {}, 代码: {}, 消息: {}", fingerprint, code, msg); + } + + public void sendSuccessMessage(String fingerprint,String msg) { + sendMessage(fingerprint,msg,"200",true); } } \ No newline at end of file diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseValidationService.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseValidationService.java new file mode 100644 index 0000000..f4ee36c --- /dev/null +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/SseValidationService.java @@ -0,0 +1,138 @@ +package io.github.timemachinelab.core.session.application; + +import io.github.timemachinelab.core.fingerprint.FingerprintService; +import io.github.timemachinelab.core.fingerprint.UserFingerprint; +import io.github.timemachinelab.core.session.application.SessionException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import javax.servlet.http.HttpServletRequest; + +/** + * SSE验证服务 + * 封装指纹验证和SSE连接检查的通用逻辑 + * + * @author welsir + * @date 2025/1/20 + */ +@Slf4j +@Service +public class SseValidationService { + + @Resource + private FingerprintService fingerprintService; + + @Resource + private SseNotificationService sseNotificationService; + + /** + * SSE验证结果 + */ + public static class ValidationResult { + private final boolean valid; + private final String errorMessage; + private final String fingerprint; + + private ValidationResult(boolean valid, String errorMessage, String fingerprint) { + this.valid = valid; + this.errorMessage = errorMessage; + this.fingerprint = fingerprint; + } + + public static ValidationResult success(String fingerprint) { + return new ValidationResult(true, null, fingerprint); + } + + public static ValidationResult error(String errorMessage) { + return new ValidationResult(false, errorMessage, null); + } + + public boolean isValid() { + return valid; + } + + public String getErrorMessage() { + return errorMessage; + } + + public String getFingerprint() { + return fingerprint; + } + } + + /** + * 验证SSE连接并获取用户指纹 + * 如果验证失败,抛出业务异常 + * + * @param request HTTP请求对象 + * @return 用户指纹字符串 + * @throws SessionException 验证失败时抛出 + */ + public String validateAndGetFingerprint(HttpServletRequest request) throws SessionException { + try { + // 1. 获取已存在的用户指纹(不创建新指纹) + UserFingerprint userFingerprint = fingerprintService.getExistingUserFingerprint(request); + if (userFingerprint == null) { + log.warn("用户指纹不存在,请先建立SSE连接"); + throw new SessionException("SSE连接验证失败: 用户指纹不存在,请先建立SSE连接"); + } + + String fingerprint = userFingerprint.getFingerprint(); + log.debug("用户指纹: {}, 访问次数: {}", fingerprint, userFingerprint.getVisitCount()); + + // 2. 检查SSE连接是否存在 + if (!sseNotificationService.isConnectionExists(fingerprint)) { + log.warn("SSE连接不存在 - 指纹: {}", fingerprint); + throw new SessionException("SSE连接验证失败: SSE连接不存在,请先建立连接"); + } + + log.debug("SSE连接验证成功 - 指纹: {}", fingerprint); + return fingerprint; + + } catch (SessionException e) { + throw e; // 重新抛出业务异常 + } catch (Exception e) { + log.error("SSE连接验证异常: {}", e.getMessage(), e); + throw new SessionException("SSE连接验证失败: 系统异常,请重试"); + } + } + + /** + * 验证SSE连接和用户指纹 + * + * @param request HTTP请求对象 + * @return 验证结果 + * @deprecated 使用 validateAndGetFingerprint 方法替代 + */ + @Deprecated + public ValidationResult validateSseConnection(HttpServletRequest request) { + try { + String fingerprint = validateAndGetFingerprint(request); + return ValidationResult.success(fingerprint); + } catch (SessionException e) { + return ValidationResult.error(e.getMessage()); + } + } + + /** + * 发送SSE错误消息(如果连接存在) + * + * @param request HTTP请求对象 + * @param errorMessage 错误消息 + */ + public void sendSseErrorIfConnected(HttpServletRequest request, String errorMessage) { + try { + UserFingerprint userFingerprint = fingerprintService.getExistingUserFingerprint(request); + if (userFingerprint != null) { + String fingerprint = userFingerprint.getFingerprint(); + + if (sseNotificationService.isConnectionExists(fingerprint)) { + sseNotificationService.sendErrorMessage(fingerprint, errorMessage); + } + } + } catch (Exception ex) { + log.error("发送SSE错误消息失败: {}", ex.getMessage()); + } + } +} \ No newline at end of file diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/impl/DefaultMessageProcessingService.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/impl/DefaultMessageProcessingService.java index 1a2564f..d06b810 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/impl/DefaultMessageProcessingService.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/application/impl/DefaultMessageProcessingService.java @@ -1,14 +1,16 @@ package io.github.timemachinelab.core.session.application.impl; - -import com.alibaba.fastjson2.JSONObject; -import com.fasterxml.jackson.core.JsonProcessingException;; +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.gson.JsonObject; +import dev.langchain4j.internal.Json; import io.github.timemachinelab.core.constant.AllPrompt; +import io.github.timemachinelab.core.constant.RetryPrompt; +import io.github.timemachinelab.core.session.application.ConversationService; import io.github.timemachinelab.core.qatree.QaTree; import io.github.timemachinelab.core.qatree.QaTreeDomain; import io.github.timemachinelab.core.session.application.MessageProcessingService; import io.github.timemachinelab.core.session.application.SessionManagementService; -import io.github.timemachinelab.core.session.application.ConversationService; import io.github.timemachinelab.core.session.application.SseNotificationService; import io.github.timemachinelab.core.session.domain.entity.ConversationSession; import io.github.timemachinelab.core.session.infrastructure.web.dto.UnifiedAnswerRequest; @@ -36,34 +38,8 @@ public class DefaultMessageProcessingService implements MessageProcessingService @Resource ConversationService conversationService; @Resource - SseNotificationService sseNotificationService; + SseNotificationService sseNotificationService; - @Override - public String processAnswer(UnifiedAnswerRequest request) { - if (!validateAnswer(request)) { - log.warn("无效的答案请求: {}", request); - return "无效的回答格式"; - } - - // 获取会话并更新qaTree - ConversationSession session = sessionManagementService.validateAndGetSession(request.getUserId(), request.getSessionId()); - if (session != null) { - updateQaTreeWithAnswer(session, request); - } - - // 将答案转换为可读文本 - String readableText = request.toReadableText(); - - // 构建完整的回答消息 - StringBuilder message = new StringBuilder(); - message.append("用户回答了问题: "); - message.append(readableText); - - log.info("处理答案请求 - 问题类型: {}, 内容: {}", request.getQuestionType(), readableText); - - return message.toString(); - } - /** * 更新qaTree中的答案 */ @@ -75,7 +51,7 @@ private void updateQaTreeWithAnswer(ConversationSession session, UnifiedAnswerRe return; } - String nodeId = request.getNodeId(); + String nodeId = session.getCurrentNode(); // 根据问题类型准备答案数据 Object answerData = prepareAnswerData(request); @@ -116,7 +92,6 @@ public String preprocessMessage(String originalMessage, UnifiedAnswerRequest ans updateQaTreeWithAnswer(conversationSession, answerRequest); - // 构建输入JSON JSONObject object = new JSONObject(); object.put("prompt",AllPrompt.GLOBAL_PROMPT); object.put("tree", QaTreeSerializeUtil.serialize(conversationSession.getQaTree())); @@ -160,48 +135,91 @@ public boolean validateAnswer(UnifiedAnswerRequest request) { @Override public String processRetryMessage(String sessionId, String nodeId, String whyRetry, ConversationSession conversationSession) { try { - // 构建重试消息的JSON格式 - JSONObject retryInput = new JSONObject(); - retryInput.put("action", "retry"); - retryInput.put("nodeId", nodeId); - retryInput.put("whyRetry", whyRetry != null ? whyRetry : "用户要求重新生成问题"); - - // 获取节点的问题内容 + // 获取上一个问题内容 String preQuestion = sessionManagementService.getNodeQuestion(sessionId, nodeId); - if (preQuestion != null) { - retryInput.put("preQuestion", preQuestion); + if (preQuestion == null) { + log.warn("无法获取节点问题内容 - sessionId: {}, nodeId: {}", sessionId, nodeId); + preQuestion = "无法获取上一个问题内容"; } - JSONObject object = new JSONObject(); - object.put("prompt", AllPrompt.GLOBAL_PROMPT); - object.put("tree", QaTreeSerializeUtil.serialize(conversationSession.getQaTree())); - object.put("input", retryInput.toString()); + // 构建重试请求的JSON格式 + JSONObject retryObject = new JSONObject(); + retryObject.put("action", "retry"); + retryObject.put("preQuestion", preQuestion); + retryObject.put("whyRetry", whyRetry); - log.info("处理重试消息 - 会话: {}, 节点: {}, 原因: {}", sessionId, nodeId, whyRetry); - return object.toString(); + // 构建完整的消息对象 + JSONObject messageObject = new JSONObject(); + messageObject.put("prompt", RetryPrompt.RETRY_PROMPT); + messageObject.put("tree", QaTreeSerializeUtil.serialize(conversationSession.getQaTree())); + messageObject.put("input", retryObject.toString()); + + log.info("处理重试消息 - sessionId: {}, nodeId: {}, 原因: {}", sessionId, nodeId, whyRetry); + return messageObject.toString(); } catch (JsonProcessingException e) { - log.error("处理重试消息失败 - 会话: {}, 错误: {}", sessionId, e.getMessage(), e); - throw new RuntimeException("重试消息处理失败", e); - } - } - - @Override - public void processAndSendMessage(ConversationSession session, String processedMessage) { - try { - log.info("发送消息给AI服务 - 会话: {}, 用户: {}", session.getSessionId(), session.getUserId()); - - conversationService.processUserMessage( - session.getUserId(), - processedMessage, - response -> sseNotificationService.sendSseMessage(session.getSessionId(), response) - ); - - log.info("消息发送成功 - 会话: {}", session.getSessionId()); - } catch (Exception e) { - log.error("发送消息失败 - 会话: {}, 错误: {}", session.getSessionId(), e.getMessage(), e); - throw new RuntimeException("消息发送失败: " + e.getMessage(), e); - } - } - - } \ No newline at end of file + log.error("处理重试消息失败 - sessionId: {}, nodeId: {}, 错误: {}", sessionId, nodeId, e.getMessage(), e); + throw new RuntimeException("重试消息处理失败", e); + } + } + + @Override + public void processAndSendMessage(ConversationSession session, String processedMessage) { + try { + log.info("发送消息给AI服务 - sessionId: {}", session.getSessionId()); + + // 调用ConversationService处理消息 + conversationService.processUserMessage( + session.getUserId(), + processedMessage, + response -> { + log.info("AI服务响应成功 - sessionId: {}", session.getSessionId()); + + try { + // 通过SSE发送AI响应给前端 + // 注意:这里需要获取用户指纹来发送SSE消息 + // 由于当前方法没有直接访问指纹的方式,需要通过session的userId来查找 + // 暂时使用userId作为指纹标识(需要根据实际业务逻辑调整) + String fingerprint = session.getUserId(); + + // 构建要发送的响应数据,只包含用户指定的字段 + JSONObject responseData = new JSONObject(); + responseData.put("parentNodeId", response.getParentId()); + responseData.put("currentNodeId", session.getCurrentNode()); + responseData.put("sessionId", session.getSessionId()); + responseData.put("updateTime", session.getUpdateTime()); + responseData.put("question", response.getQuestion()); + + sseNotificationService + .sendSuccessMessage(fingerprint,responseData.toJSONString()); + // 这里需要注入SseNotificationService + log.info("通过SSE发送AI响应 - 指纹: {}, 响应: {}", fingerprint, responseData); + + } catch (Exception sseError) { + log.error("SSE消息发送失败 - sessionId: {}, 错误: {}", session.getSessionId(), sseError.getMessage(), sseError); + } + } + ); + + } catch (Exception e) { + log.error("发送消息给AI服务失败 - sessionId: {}, 错误: {}", session.getSessionId(), e.getMessage(), e); + + // 通过SSE发送错误消息给前端 + try { + String fingerprint = session.getUserId(); + JSONObject errorData = new JSONObject(); + errorData.put("error", true); + errorData.put("message", "AI服务调用失败: " + e.getMessage()); + errorData.put("sessionId", session.getSessionId()); + errorData.put("currentNodeId", session.getCurrentNode()); + + sseNotificationService.sendErrorMessage(fingerprint, errorData.toJSONString()); + log.info("通过SSE发送错误消息 - 指纹: {}, 错误: {}", fingerprint, errorData); + } catch (Exception sseError) { + log.error("SSE错误消息发送失败 - sessionId: {}, 错误: {}", session.getSessionId(), sseError.getMessage(), sseError); + } + + } + } + +} \ No newline at end of file diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/domain/entity/ConversationSession.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/domain/entity/ConversationSession.java index fd21e5a..370db2a 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/domain/entity/ConversationSession.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/domain/entity/ConversationSession.java @@ -24,12 +24,16 @@ public class ConversationSession { private QaTree qaTree; // 移除final,允许后续设置 private final LocalDateTime createTime; private LocalDateTime updateTime; - - private String user; //用户画像 - private String userTarget; // 用户目标 - private String aiModel; // AI模型 - - private String genPrompt; // 生成提示词 + + // 当前节点ID,用于表示当前question和answer处于哪个节点 + private String currentNode; + + // 用户信息 + private String user; + // 用户目标 + private String userTarget; + // AI模型 + private String aiModel; // 节点ID自增计数器,从1开始 private final AtomicInteger nodeIdCounter = new AtomicInteger(0); @@ -40,6 +44,10 @@ public ConversationSession(String userId, String sessionId, QaTree qaTree) { this.userId = userId; this.createTime = LocalDateTime.now(); this.updateTime = LocalDateTime.now(); + // 设置默认值 + this.user = userId; + this.userTarget = "通用对话"; + this.aiModel = "gpt-4-turbo"; } /** @@ -49,12 +57,4 @@ public ConversationSession(String userId, String sessionId, QaTree qaTree) { public String getNextNodeId() { return String.valueOf(nodeIdCounter.incrementAndGet()); } - - /** - * 获取节点ID计数器 - * @return 节点ID计数器 - */ - public AtomicInteger getNodeIdCounter() { - return nodeIdCounter; - } } \ No newline at end of file diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/GenPromptRequest.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/GenPromptRequest.java index 8c728ce..7033995 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/GenPromptRequest.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/GenPromptRequest.java @@ -14,10 +14,6 @@ public class GenPromptRequest { private String sessionId; - - private String nodeId; - - /** * 原始答案数据 * - 单选/多选:List diff --git a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/UnifiedAnswerRequest.java b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/UnifiedAnswerRequest.java index c3ddc38..0bd2051 100644 --- a/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/UnifiedAnswerRequest.java +++ b/prompto-lab-app/src/main/java/io/github/timemachinelab/core/session/infrastructure/web/dto/UnifiedAnswerRequest.java @@ -27,11 +27,6 @@ public class UnifiedAnswerRequest { */ private String sessionId; - /** - * 节点ID - */ - private String nodeId; - /** * 问题类型:single, multi, input, form */ @@ -55,11 +50,6 @@ public class UnifiedAnswerRequest { private String userPortrait; private String aiModel; - /** - * 用户ID - */ - @NotBlank(message = "用户ID不能为空") - private String userId; /** * 表单答案项 */ diff --git a/prompto-lab-ui/src/components/Chat/AIChatPage.vue b/prompto-lab-ui/src/components/Chat/AIChatPage.vue index bd0b5b4..d26f0da 100644 --- a/prompto-lab-ui/src/components/Chat/AIChatPage.vue +++ b/prompto-lab-ui/src/components/Chat/AIChatPage.vue @@ -11,24 +11,46 @@ -