From 4e454affcc96b407f1ef40dc798288fd266b38ea Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 16:24:40 +0800 Subject: [PATCH 01/14] =?UTF-8?q?fix(session):=20=E7=BB=88=E6=AD=A2?= =?UTF-8?q?=E4=BC=9A=E8=AF=9D=E5=86=99=E5=85=A5=E7=BB=88=E6=AD=A2=E6=A0=87?= =?UTF-8?q?=E8=AE=B0=E5=B9=B6=E5=BD=BB=E5=BA=95=E6=B8=85=E7=90=86=20Redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/lib/session-manager.ts | 162 ++++++++++++++---- .../session-manager-terminate-session.test.ts | 33 +++- .../session-manager-terminated-remap.test.ts | 130 ++++++++++++++ 3 files changed, 292 insertions(+), 33 deletions(-) create mode 100644 tests/unit/lib/session-manager-terminated-remap.test.ts diff --git a/src/lib/session-manager.ts b/src/lib/session-manager.ts index e29256f6c..98d5937a5 100644 --- a/src/lib/session-manager.ts +++ b/src/lib/session-manager.ts @@ -105,6 +105,71 @@ export class SessionManager { ); // 短上下文阈值 private static readonly ENABLE_SHORT_CONTEXT_DETECTION = process.env.ENABLE_SHORT_CONTEXT_DETECTION !== "false"; // 默认启用 + private static readonly TERMINATED_SESSION_TTL = (() => { + const raw = + process.env.SESSION_TERMINATION_TTL_SECONDS ?? + process.env.SESSION_TERMINATION_TTL ?? + process.env.TERMINATED_SESSION_TTL ?? + ""; + const parsed = Number.parseInt(raw, 10); + if (Number.isFinite(parsed) && parsed > 0) { + return parsed; + } + return 24 * 60 * 60; // 1 天 + })(); + + private static getTerminationMarkerKey(sessionId: string): string { + return `session:${sessionId}:terminated`; + } + + private static getTerminationReplacementKey(sessionId: string): string { + return `session:${sessionId}:terminated_replacement`; + } + + private static async resolveSessionIdIfTerminated(sessionId: string): Promise<{ + sessionId: string; + remapped: boolean; + terminatedSessionId: string; + }> { + const redis = getRedisClient(); + if (!redis || redis.status !== "ready") { + return { sessionId, remapped: false, terminatedSessionId: sessionId }; + } + + const terminatedKey = SessionManager.getTerminationMarkerKey(sessionId); + const replacementKey = SessionManager.getTerminationReplacementKey(sessionId); + + try { + const values = await redis.mget(terminatedKey, replacementKey); + const terminatedAt = values?.[0] ?? null; + const replacement = values?.[1] ?? null; + + if (!terminatedAt) { + return { sessionId, remapped: false, terminatedSessionId: sessionId }; + } + + if (replacement && typeof replacement === "string" && replacement.length > 0) { + return { sessionId: replacement, remapped: true, terminatedSessionId: sessionId }; + } + + const newSessionId = SessionManager.generateSessionId(); + const ttlSeconds = SessionManager.TERMINATED_SESSION_TTL; + + await redis + .pipeline() + .setex(replacementKey, ttlSeconds, newSessionId) + .expire(terminatedKey, ttlSeconds) + .exec(); + + return { sessionId: newSessionId, remapped: true, terminatedSessionId: sessionId }; + } catch (error) { + logger.error("SessionManager: Failed to resolve terminated session", { + error, + sessionId, + }); + return { sessionId, remapped: false, terminatedSessionId: sessionId }; + } + } /** * 获取 STORE_SESSION_MESSAGES 配置 @@ -358,19 +423,29 @@ export class SessionManager { // 1. 优先使用客户端传递的 session_id (来自 metadata.user_id 或 metadata.session_id) if (clientSessionId) { + const resolved = await SessionManager.resolveSessionIdIfTerminated(clientSessionId); + const effectiveSessionId = resolved.sessionId; + + if (resolved.remapped) { + logger.info("SessionManager: Remapped terminated session", { + terminatedSessionId: resolved.terminatedSessionId, + sessionId: effectiveSessionId, + }); + } + // 2. 短上下文并发检测(方案E) if ( SessionManager.ENABLE_SHORT_CONTEXT_DETECTION && messagesLength <= SessionManager.SHORT_CONTEXT_THRESHOLD ) { // 检查该 session 是否有其他请求正在运行 - const concurrentCount = await SessionTracker.getConcurrentCount(clientSessionId); + const concurrentCount = await SessionTracker.getConcurrentCount(effectiveSessionId); if (concurrentCount > 0) { // 场景B:有并发请求 → 这是并发短任务 → 强制新建 session const newId = SessionManager.generateSessionId(); logger.info("SessionManager: 检测到并发短任务,强制新建 session", { - originalSessionId: clientSessionId, + originalSessionId: effectiveSessionId, newSessionId: newId, messagesLength, existingConcurrentCount: concurrentCount, @@ -380,22 +455,22 @@ export class SessionManager { // 场景A:无并发 → 这可能是长对话的开始 → 允许复用 logger.debug("SessionManager: 短上下文但 session 空闲,允许复用(长对话开始)", { - sessionId: clientSessionId, + sessionId: effectiveSessionId, messagesLength, }); } // 3. 长上下文 or 无并发 → 正常复用 logger.debug("SessionManager: Using client-provided session", { - sessionId: clientSessionId, + sessionId: effectiveSessionId, }); // 刷新 TTL(滑动窗口) if (redis && redis.status === "ready") { - await SessionManager.refreshSessionTTL(clientSessionId).catch((err) => { + await SessionManager.refreshSessionTTL(effectiveSessionId).catch((err) => { logger.error("SessionManager: Failed to refresh TTL", { error: err }); }); } - return clientSessionId; + return effectiveSessionId; } // 2. 降级方案:计算 messages 内容哈希(TC-047 警告:不可靠) @@ -420,9 +495,14 @@ export class SessionManager { if (redis && redis.status === "ready") { try { const hashKey = `hash:${contentHash}:session`; - const existingSessionId = await redis.get(hashKey); + let existingSessionId = await redis.get(hashKey); if (existingSessionId) { + const resolved = await SessionManager.resolveSessionIdIfTerminated(existingSessionId); + if (resolved.remapped) { + existingSessionId = resolved.sessionId; + await redis.setex(hashKey, SessionManager.SESSION_TTL, existingSessionId); + } // 找到已有 session,刷新 TTL await SessionManager.refreshSessionTTL(existingSessionId); logger.trace("SessionManager: Reusing session via hash", { @@ -1941,6 +2021,18 @@ export class SessionManager { } try { + const terminatedKey = SessionManager.getTerminationMarkerKey(sessionId); + const terminatedAt = Date.now().toString(); + const ttlSeconds = SessionManager.TERMINATED_SESSION_TTL; + + // 0. 标记终止(优先写入,避免并发请求在清理窗口内复活) + const markerResult = await redis.set(terminatedKey, terminatedAt, "EX", ttlSeconds); + const markerOk = markerResult === "OK"; + + if (!markerOk) { + logger.warn("SessionManager: Failed to set termination marker", { sessionId }); + } + // 1. 先查询绑定信息(用于从 ZSET 中移除) let providerId: number | null = null; let keyId: number | null = null; @@ -1971,21 +2063,8 @@ export class SessionManager { ); } - // 2. 删除所有 Session 相关的 key + // 2. 从 ZSET 中移除(始终尝试,即使查询失败) const pipeline = redis.pipeline(); - - // 基础绑定信息 - pipeline.del(`session:${sessionId}:provider`); - pipeline.del(`session:${sessionId}:key`); - pipeline.del(`session:${sessionId}:info`); - pipeline.del(`session:${sessionId}:last_seen`); - pipeline.del(`session:${sessionId}:concurrent_count`); - - // 可选:messages 和 response(如果启用了存储) - pipeline.del(`session:${sessionId}:messages`); - pipeline.del(`session:${sessionId}:response`); - - // 3. 从 ZSET 中移除(始终尝试,即使查询失败) pipeline.zrem(getGlobalActiveSessionsKey(), sessionId); if (providerId) { @@ -2000,30 +2079,51 @@ export class SessionManager { pipeline.zrem(getUserActiveSessionsKey(userId), sessionId); } - // 4. 删除 hash 映射(如果存在) - // 注意:无法直接反查 hash,只能清理已知的 session key - // hash 会在 TTL 后自动过期,不影响功能 - - const results = await pipeline.exec(); + await pipeline.exec(); - // 5. 检查结果 + // 3. 删除 session:* 相关 key(包含 req:* 新格式,避免终止后仍能查看/复活) let deletedKeys = 0; - if (results) { - for (const [err, result] of results) { + let cursor = "0"; + const matchPattern = `session:${sessionId}:*`; + + do { + const scanResult = (await redis.scan(cursor, "MATCH", matchPattern, "COUNT", 200)) as [ + string, + string[], + ]; + const nextCursor = scanResult[0]; + const keys = scanResult[1] ?? []; + cursor = nextCursor; + + if (keys.length === 0) continue; + + const deletePipeline = redis.pipeline(); + for (const key of keys) { + if (key === terminatedKey) continue; + deletePipeline.del(key); + } + + const deleteResults = await deletePipeline.exec(); + if (!deleteResults) continue; + + for (const [err, result] of deleteResults) { if (!err && typeof result === "number" && result > 0) { deletedKeys += result; } } - } + } while (cursor !== "0"); logger.info("SessionManager: Terminated session", { sessionId, providerId, keyId, + userId, deletedKeys, + terminatedAt, + markerOk, }); - return deletedKeys > 0; + return markerOk || deletedKeys > 0; } catch (error) { logger.error("SessionManager: Failed to terminate session", { error, diff --git a/tests/unit/lib/session-manager-terminate-session.test.ts b/tests/unit/lib/session-manager-terminate-session.test.ts index f4de279ac..af4a0af08 100644 --- a/tests/unit/lib/session-manager-terminate-session.test.ts +++ b/tests/unit/lib/session-manager-terminate-session.test.ts @@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; let redisClientRef: any; let pipelineRef: any; +let deletePipelineRef: any; vi.mock("server-only", () => ({})); @@ -25,21 +26,32 @@ describe("SessionManager.terminateSession", () => { vi.resetModules(); pipelineRef = { - del: vi.fn(() => pipelineRef), zrem: vi.fn(() => pipelineRef), + exec: vi.fn(async () => []), + }; + + deletePipelineRef = { + del: vi.fn(() => deletePipelineRef), exec: vi.fn(async () => [[null, 1]]), }; redisClientRef = { status: "ready", + set: vi.fn(async () => "OK"), get: vi.fn(async () => null), hget: vi.fn(async () => null), - pipeline: vi.fn(() => pipelineRef), + scan: vi.fn(async () => ["0", []]), + mget: vi.fn(async () => [null, null]), + pipeline: vi + .fn() + .mockImplementationOnce(() => pipelineRef) + .mockImplementationOnce(() => deletePipelineRef), }; }); it("应同时从 global/key/user 的 active_sessions ZSET 中移除 sessionId(若可解析到 userId)", async () => { const sessionId = "sess_test"; + const terminatedKey = `session:${sessionId}:terminated`; redisClientRef.get.mockImplementation(async (key: string) => { if (key === `session:${sessionId}:provider`) return "42"; if (key === `session:${sessionId}:key`) return "7"; @@ -49,6 +61,15 @@ describe("SessionManager.terminateSession", () => { if (key === `session:${sessionId}:info` && field === "userId") return "123"; return null; }); + redisClientRef.scan.mockResolvedValueOnce([ + "0", + [ + terminatedKey, + `session:${sessionId}:provider`, + `session:${sessionId}:req:1:messages`, + `session:${sessionId}:req:1:response`, + ], + ]); const { getGlobalActiveSessionsKey, getKeyActiveSessionsKey, getUserActiveSessionsKey } = await import("@/lib/redis/active-session-keys"); @@ -57,22 +78,30 @@ describe("SessionManager.terminateSession", () => { const ok = await SessionManager.terminateSession(sessionId); expect(ok).toBe(true); + expect(redisClientRef.set).toHaveBeenCalledWith(terminatedKey, expect.any(String), "EX", 86400); expect(redisClientRef.hget).toHaveBeenCalledWith(`session:${sessionId}:info`, "userId"); expect(pipelineRef.zrem).toHaveBeenCalledWith(getGlobalActiveSessionsKey(), sessionId); expect(pipelineRef.zrem).toHaveBeenCalledWith("provider:42:active_sessions", sessionId); expect(pipelineRef.zrem).toHaveBeenCalledWith(getKeyActiveSessionsKey(7), sessionId); expect(pipelineRef.zrem).toHaveBeenCalledWith(getUserActiveSessionsKey(123), sessionId); + + expect(deletePipelineRef.del).toHaveBeenCalledWith(`session:${sessionId}:provider`); + expect(deletePipelineRef.del).toHaveBeenCalledWith(`session:${sessionId}:req:1:messages`); + expect(deletePipelineRef.del).toHaveBeenCalledWith(`session:${sessionId}:req:1:response`); + expect(deletePipelineRef.del).not.toHaveBeenCalledWith(terminatedKey); }); it("当 userId 不可用时,不应尝试 zrem user active_sessions key", async () => { const sessionId = "sess_test"; + const terminatedKey = `session:${sessionId}:terminated`; redisClientRef.get.mockImplementation(async (key: string) => { if (key === `session:${sessionId}:provider`) return "42"; if (key === `session:${sessionId}:key`) return "7"; return null; }); redisClientRef.hget.mockResolvedValue(null); + redisClientRef.scan.mockResolvedValueOnce(["0", [terminatedKey]]); const { getUserActiveSessionsKey } = await import("@/lib/redis/active-session-keys"); const { SessionManager } = await import("@/lib/session-manager"); diff --git a/tests/unit/lib/session-manager-terminated-remap.test.ts b/tests/unit/lib/session-manager-terminated-remap.test.ts new file mode 100644 index 000000000..465b7e1cb --- /dev/null +++ b/tests/unit/lib/session-manager-terminated-remap.test.ts @@ -0,0 +1,130 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +let redisClientRef: any; +const pipelineCalls: Array = []; + +vi.mock("server-only", () => ({})); + +vi.mock("@/lib/logger", () => ({ + logger: { + warn: vi.fn(), + info: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + trace: vi.fn(), + }, +})); + +vi.mock("@/app/v1/_lib/proxy/errors", () => ({ + sanitizeHeaders: vi.fn(() => "(empty)"), + sanitizeUrl: vi.fn((url: unknown) => String(url)), +})); + +vi.mock("@/lib/session-tracker", () => ({ + SessionTracker: { + getConcurrentCount: vi.fn(async () => 0), + }, +})); + +vi.mock("@/lib/redis", () => ({ + getRedisClient: () => redisClientRef, +})); + +function makePipeline() { + const pipeline = { + setex: vi.fn((...args: unknown[]) => { + pipelineCalls.push(["setex", ...args]); + return pipeline; + }), + expire: vi.fn((...args: unknown[]) => { + pipelineCalls.push(["expire", ...args]); + return pipeline; + }), + exec: vi.fn(async () => { + pipelineCalls.push(["exec"]); + return []; + }), + }; + return pipeline; +} + +describe("SessionManager.getOrCreateSessionId - terminated remap", () => { + beforeEach(() => { + vi.resetAllMocks(); + vi.resetModules(); + pipelineCalls.length = 0; + + redisClientRef = { + status: "ready", + mget: vi.fn(async () => [null, null]), + pipeline: vi.fn(() => makePipeline()), + }; + }); + + it("未终止时应保持原 sessionId", async () => { + const { SessionManager } = await import("@/lib/session-manager"); + + const keyId = 1; + const oldSessionId = "sess_old"; + const messages = [{ role: "user", content: "hi" }, { role: "assistant", content: "ok" }, {}]; + + const sessionId = await SessionManager.getOrCreateSessionId(keyId, messages, oldSessionId); + + expect(sessionId).toBe(oldSessionId); + expect(redisClientRef.mget).toHaveBeenCalledWith( + `session:${oldSessionId}:terminated`, + `session:${oldSessionId}:terminated_replacement` + ); + expect( + pipelineCalls.some( + (c) => c[0] === "setex" && c[1] === `session:${oldSessionId}:terminated_replacement` + ) + ).toBe(false); + }); + + it("已终止且存在 replacement 时应返回 replacement sessionId", async () => { + const keyId = 1; + const oldSessionId = "sess_old"; + const replacementSessionId = "sess_new"; + redisClientRef.mget.mockResolvedValueOnce(["1", replacementSessionId]); + + const { SessionManager } = await import("@/lib/session-manager"); + + const sessionId = await SessionManager.getOrCreateSessionId(keyId, [], oldSessionId); + + expect(sessionId).toBe(replacementSessionId); + expect( + pipelineCalls.some( + (c) => c[0] === "setex" && c[1] === `session:${oldSessionId}:terminated_replacement` + ) + ).toBe(false); + }); + + it("已终止但无 replacement 时应生成并持久化 replacement", async () => { + const keyId = 1; + const oldSessionId = "sess_old"; + redisClientRef.mget.mockResolvedValueOnce(["1", null]); + + const { SessionManager } = await import("@/lib/session-manager"); + + const sessionId = await SessionManager.getOrCreateSessionId(keyId, [], oldSessionId); + + expect(sessionId).not.toBe(oldSessionId); + expect(sessionId).toMatch(/^sess_/); + + expect( + pipelineCalls.some( + (c) => + c[0] === "setex" && + c[1] === `session:${oldSessionId}:terminated_replacement` && + c[2] === 86400 && + c[3] === sessionId + ) + ).toBe(true); + expect( + pipelineCalls.some( + (c) => c[0] === "expire" && c[1] === `session:${oldSessionId}:terminated` && c[2] === 86400 + ) + ).toBe(true); + }); +}); From ce476390c32f507a46317175ee0e83e618defa6c Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 16:24:53 +0800 Subject: [PATCH 02/14] =?UTF-8?q?test:=20=E4=BF=AE=E5=A4=8D=20priorities?= =?UTF-8?q?=20batch=20=E4=B8=8E=E5=AE=A2=E6=88=B7=E7=AB=AF=E9=99=90?= =?UTF-8?q?=E5=88=B6=E7=BC=96=E8=BE=91=E5=99=A8=E5=8D=95=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../form/client-restrictions-editor.test.tsx | 105 ++++++------------ tests/unit/repository/provider.test.ts | 4 +- 2 files changed, 39 insertions(+), 70 deletions(-) diff --git a/tests/unit/components/form/client-restrictions-editor.test.tsx b/tests/unit/components/form/client-restrictions-editor.test.tsx index f9df775a4..87d283830 100644 --- a/tests/unit/components/form/client-restrictions-editor.test.tsx +++ b/tests/unit/components/form/client-restrictions-editor.test.tsx @@ -7,9 +7,13 @@ import { act } from "react"; import { createRoot } from "react-dom/client"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -vi.mock("@/lib/client-restrictions/client-presets", () => ({ - CLIENT_RESTRICTION_PRESET_OPTIONS: [], -})); +vi.mock("@/lib/client-restrictions/client-presets", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + CLIENT_RESTRICTION_PRESET_OPTIONS: [], + }; +}); vi.mock("@/components/ui/tag-input", () => ({ TagInput: vi.fn(() => null), @@ -40,10 +44,21 @@ function getTagInputOnChange(callIndex: number): (values: string[]) => void { return (call[0] as TagInputProps).onChange; } -describe("ClientRestrictionsEditor", () => { +describe("ClientRestrictionsEditor - custom clients", () => { const onAllowedChange = vi.fn(); const onBlockedChange = vi.fn(); + const translations = { + allowAction: "Allow", + blockAction: "Block", + customAllowedLabel: "Custom Allowed", + customAllowedPlaceholder: "", + customBlockedLabel: "Custom Blocked", + customBlockedPlaceholder: "", + customHelp: "", + presetClients: {}, + }; + beforeEach(() => { vi.clearAllMocks(); }); @@ -61,75 +76,29 @@ describe("ClientRestrictionsEditor", () => { blocked={blocked} onAllowedChange={onAllowedChange} onBlockedChange={onBlockedChange} - allowedLabel="Allowed" - blockedLabel="Blocked" - getPresetLabel={(v) => v} + translations={translations} /> ); } - describe("uniqueOrdered normalization", () => { - it("deduplicates values preserving first occurrence order", () => { - const unmount = renderEditor([], []); - act(() => getTagInputOnChange(0)(["a", "b", "a", "c"])); - expect(onAllowedChange).toHaveBeenCalledWith(["a", "b", "c"]); - unmount(); - }); - - it("trims whitespace from values", () => { - const unmount = renderEditor([], []); - act(() => getTagInputOnChange(0)([" a ", " b", "c "])); - expect(onAllowedChange).toHaveBeenCalledWith(["a", "b", "c"]); - unmount(); - }); - - it("filters out empty and whitespace-only entries", () => { - const unmount = renderEditor([], []); - act(() => getTagInputOnChange(0)(["a", "", " ", "b"])); - expect(onAllowedChange).toHaveBeenCalledWith(["a", "b"]); - unmount(); - }); + it("custom allowed: should deduplicate values preserving order", () => { + const unmount = renderEditor([], []); + + act(() => getTagInputOnChange(0)(["a", "b", "a", "c"])); + + expect(onAllowedChange).toHaveBeenCalledWith(["a", "b", "c"]); + expect(onBlockedChange).not.toHaveBeenCalled(); + unmount(); }); - describe("allow/block mutual exclusion", () => { - it("removes overlapping items from blocked when added to allowed", () => { - const unmount = renderEditor([], ["b", "c"]); - act(() => getTagInputOnChange(0)(["a", "b"])); - expect(onAllowedChange).toHaveBeenCalledWith(["a", "b"]); - expect(onBlockedChange).toHaveBeenCalledWith(["c"]); - unmount(); - }); - - it("does not call onBlockedChange when allowed has no overlap with blocked", () => { - const unmount = renderEditor([], ["c", "d"]); - act(() => getTagInputOnChange(0)(["a", "b"])); - expect(onAllowedChange).toHaveBeenCalledWith(["a", "b"]); - expect(onBlockedChange).not.toHaveBeenCalled(); - unmount(); - }); - - it("removes overlapping items from allowed when added to blocked", () => { - const unmount = renderEditor(["a", "b"], []); - act(() => getTagInputOnChange(1)(["b", "c"])); - expect(onBlockedChange).toHaveBeenCalledWith(["b", "c"]); - expect(onAllowedChange).toHaveBeenCalledWith(["a"]); - unmount(); - }); - - it("does not call onAllowedChange when blocked has no overlap with allowed", () => { - const unmount = renderEditor(["a", "b"], []); - act(() => getTagInputOnChange(1)(["c", "d"])); - expect(onBlockedChange).toHaveBeenCalledWith(["c", "d"]); - expect(onAllowedChange).not.toHaveBeenCalled(); - unmount(); - }); - - it("clears all blocked when all items are moved to allowed", () => { - const unmount = renderEditor([], ["x", "y"]); - act(() => getTagInputOnChange(0)(["x", "y", "z"])); - expect(onAllowedChange).toHaveBeenCalledWith(["x", "y", "z"]); - expect(onBlockedChange).toHaveBeenCalledWith([]); - unmount(); - }); + it("custom blocked: should deduplicate values preserving order", () => { + const unmount = renderEditor([], []); + + act(() => getTagInputOnChange(1)(["x", "x", "y"])); + + expect(onBlockedChange).toHaveBeenCalledWith(["x", "y"]); + expect(onAllowedChange).not.toHaveBeenCalled(); + unmount(); }); }); + diff --git a/tests/unit/repository/provider.test.ts b/tests/unit/repository/provider.test.ts index 694c29e98..ef9e6b7f5 100644 --- a/tests/unit/repository/provider.test.ts +++ b/tests/unit/repository/provider.test.ts @@ -69,7 +69,7 @@ describe("provider repository - updateProviderPrioritiesBatch", () => { test("generates CASE batch update SQL and returns affected rows", async () => { vi.resetModules(); - const executeMock = vi.fn(async () => ({ rowCount: 2 })); + const executeMock = vi.fn(async () => ({ count: 2 })); vi.doMock("@/drizzle/db", () => ({ db: { @@ -101,7 +101,7 @@ describe("provider repository - updateProviderPrioritiesBatch", () => { test("deduplicates provider ids (last update wins)", async () => { vi.resetModules(); - const executeMock = vi.fn(async () => ({ rowCount: 1 })); + const executeMock = vi.fn(async () => ({ count: 1 })); vi.doMock("@/drizzle/db", () => ({ db: { From 2f2f6c52cca68b10a7b98c5e47a41356705c8bf6 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 25 Feb 2026 08:25:40 +0000 Subject: [PATCH 03/14] chore: format code (fix-session-terminate-ce47639) --- tests/unit/components/form/client-restrictions-editor.test.tsx | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/components/form/client-restrictions-editor.test.tsx b/tests/unit/components/form/client-restrictions-editor.test.tsx index 87d283830..3241235b2 100644 --- a/tests/unit/components/form/client-restrictions-editor.test.tsx +++ b/tests/unit/components/form/client-restrictions-editor.test.tsx @@ -101,4 +101,3 @@ describe("ClientRestrictionsEditor - custom clients", () => { unmount(); }); }); - From 3ea25ec1dd8408c38f13f0b32092d144e9f8a798 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 16:52:11 +0800 Subject: [PATCH 04/14] =?UTF-8?q?fix(session):=20=E7=BB=88=E6=AD=A2?= =?UTF-8?q?=E5=90=8E=E9=98=BB=E6=96=AD=E6=97=A7=20sessionId=20=E5=B9=B6?= =?UTF-8?q?=E8=BF=94=E5=9B=9E=20410?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/session-guard.ts | 11 +- src/lib/session-manager.ts | 116 +++++++-------- .../session-manager-terminated-remap.test.ts | 93 +++++------- .../session-guard-terminated-session.test.ts | 140 ++++++++++++++++++ .../session-guard-warmup-intercept.test.ts | 13 ++ 5 files changed, 252 insertions(+), 121 deletions(-) create mode 100644 tests/unit/proxy/session-guard-terminated-session.test.ts diff --git a/src/app/v1/_lib/proxy/session-guard.ts b/src/app/v1/_lib/proxy/session-guard.ts index b2685555a..de07aaa16 100644 --- a/src/app/v1/_lib/proxy/session-guard.ts +++ b/src/app/v1/_lib/proxy/session-guard.ts @@ -1,9 +1,10 @@ import { getCachedSystemSettings } from "@/lib/config"; import { logger } from "@/lib/logger"; import { resolveKeyUserConcurrentSessionLimits } from "@/lib/rate-limit/concurrent-session-limit"; -import { SessionManager } from "@/lib/session-manager"; +import { SessionManager, TerminatedSessionError } from "@/lib/session-manager"; import { SessionTracker } from "@/lib/session-tracker"; import { completeCodexSessionIdentifiers } from "../codex/session-completer"; +import { ProxyError } from "./errors"; import type { ProxySession } from "./session"; /** @@ -178,6 +179,14 @@ export class ProxySessionGuard { `[ProxySessionGuard] Session assigned: ${sessionId}:${requestSequence} (key=${keyId}, messagesLength=${session.getMessagesLength()}, clientProvided=${!!clientSessionId})` ); } catch (error) { + if (error instanceof TerminatedSessionError) { + logger.info("[ProxySessionGuard] Client session was terminated, blocking request", { + sessionId: error.sessionId, + terminatedAt: error.terminatedAt, + }); + throw new ProxyError("Session 已被终止,请创建新的会话后重试", 410); + } + logger.error("[ProxySessionGuard] Failed to assign session:", error); // 降级:生成新 session(不阻塞请求) const fallbackSessionId = SessionManager.generateSessionId(); diff --git a/src/lib/session-manager.ts b/src/lib/session-manager.ts index 98d5937a5..993a08d42 100644 --- a/src/lib/session-manager.ts +++ b/src/lib/session-manager.ts @@ -78,6 +78,16 @@ function parseHeaderRecord(value: string): Record | null { } } +export class TerminatedSessionError extends Error { + constructor( + public readonly sessionId: string, + public readonly terminatedAt: string | null = null + ) { + super("Session has been terminated"); + this.name = "TerminatedSessionError"; + } +} + type SessionRequestMeta = { url: string; method: string; @@ -122,52 +132,23 @@ export class SessionManager { return `session:${sessionId}:terminated`; } - private static getTerminationReplacementKey(sessionId: string): string { - return `session:${sessionId}:terminated_replacement`; - } - - private static async resolveSessionIdIfTerminated(sessionId: string): Promise<{ - sessionId: string; - remapped: boolean; - terminatedSessionId: string; - }> { - const redis = getRedisClient(); - if (!redis || redis.status !== "ready") { - return { sessionId, remapped: false, terminatedSessionId: sessionId }; - } - + private static async readTerminationMarker( + redis: any, + sessionId: string + ): Promise { const terminatedKey = SessionManager.getTerminationMarkerKey(sessionId); - const replacementKey = SessionManager.getTerminationReplacementKey(sessionId); - try { - const values = await redis.mget(terminatedKey, replacementKey); - const terminatedAt = values?.[0] ?? null; - const replacement = values?.[1] ?? null; - - if (!terminatedAt) { - return { sessionId, remapped: false, terminatedSessionId: sessionId }; - } - - if (replacement && typeof replacement === "string" && replacement.length > 0) { - return { sessionId: replacement, remapped: true, terminatedSessionId: sessionId }; + const value = await redis.get(terminatedKey); + if (typeof value !== "string" || value.length === 0) { + return null; } - - const newSessionId = SessionManager.generateSessionId(); - const ttlSeconds = SessionManager.TERMINATED_SESSION_TTL; - - await redis - .pipeline() - .setex(replacementKey, ttlSeconds, newSessionId) - .expire(terminatedKey, ttlSeconds) - .exec(); - - return { sessionId: newSessionId, remapped: true, terminatedSessionId: sessionId }; + return value; } catch (error) { - logger.error("SessionManager: Failed to resolve terminated session", { + logger.error("SessionManager: Failed to read termination marker", { error, sessionId, }); - return { sessionId, remapped: false, terminatedSessionId: sessionId }; + return null; } } @@ -423,16 +404,21 @@ export class SessionManager { // 1. 优先使用客户端传递的 session_id (来自 metadata.user_id 或 metadata.session_id) if (clientSessionId) { - const resolved = await SessionManager.resolveSessionIdIfTerminated(clientSessionId); - const effectiveSessionId = resolved.sessionId; - - if (resolved.remapped) { - logger.info("SessionManager: Remapped terminated session", { - terminatedSessionId: resolved.terminatedSessionId, - sessionId: effectiveSessionId, - }); + if (redis && redis.status === "ready") { + const terminatedAt = await SessionManager.readTerminationMarker(redis, clientSessionId); + if (terminatedAt) { + logger.info("SessionManager: Rejected terminated client session", { + keyId, + sessionId: clientSessionId, + terminatedAt, + messagesLength, + }); + throw new TerminatedSessionError(clientSessionId, terminatedAt); + } } + const effectiveSessionId = clientSessionId; + // 2. 短上下文并发检测(方案E) if ( SessionManager.ENABLE_SHORT_CONTEXT_DETECTION && @@ -498,18 +484,28 @@ export class SessionManager { let existingSessionId = await redis.get(hashKey); if (existingSessionId) { - const resolved = await SessionManager.resolveSessionIdIfTerminated(existingSessionId); - if (resolved.remapped) { - existingSessionId = resolved.sessionId; - await redis.setex(hashKey, SessionManager.SESSION_TTL, existingSessionId); + const terminatedAt = await SessionManager.readTerminationMarker(redis, existingSessionId); + if (terminatedAt) { + logger.info( + "SessionManager: Hash hit but session was terminated, creating new session", + { + existingSessionId, + terminatedAt, + hash: contentHash, + } + ); + existingSessionId = null; + } + + if (existingSessionId) { + // 找到已有 session,刷新 TTL + await SessionManager.refreshSessionTTL(existingSessionId); + logger.trace("SessionManager: Reusing session via hash", { + sessionId: existingSessionId, + hash: contentHash, + }); + return existingSessionId; } - // 找到已有 session,刷新 TTL - await SessionManager.refreshSessionTTL(existingSessionId); - logger.trace("SessionManager: Reusing session via hash", { - sessionId: existingSessionId, - hash: contentHash, - }); - return existingSessionId; } // 未找到:创建新 session @@ -2007,8 +2003,8 @@ export class SessionManager { /** * 终止 Session(主动打断) * - * 功能:删除 Session 在 Redis 中的所有绑定关系,强制下次请求重新选择供应商 - * 用途:管理员主动打断长时间占用同一供应商的 Session + * 功能:写入“终止标记”并清理 Redis 中所有 session:{id}:* 相关 key + * 影响:客户端后续继续携带同一 sessionId 时,将被阻断(getOrCreateSessionId 抛出 TerminatedSessionError) * * @param sessionId - Session ID * @returns 是否成功删除 diff --git a/tests/unit/lib/session-manager-terminated-remap.test.ts b/tests/unit/lib/session-manager-terminated-remap.test.ts index 465b7e1cb..169663e99 100644 --- a/tests/unit/lib/session-manager-terminated-remap.test.ts +++ b/tests/unit/lib/session-manager-terminated-remap.test.ts @@ -1,7 +1,6 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; let redisClientRef: any; -const pipelineCalls: Array = []; vi.mock("server-only", () => ({})); @@ -32,31 +31,20 @@ vi.mock("@/lib/redis", () => ({ function makePipeline() { const pipeline = { - setex: vi.fn((...args: unknown[]) => { - pipelineCalls.push(["setex", ...args]); - return pipeline; - }), - expire: vi.fn((...args: unknown[]) => { - pipelineCalls.push(["expire", ...args]); - return pipeline; - }), - exec: vi.fn(async () => { - pipelineCalls.push(["exec"]); - return []; - }), + setex: vi.fn(() => pipeline), + exec: vi.fn(async () => []), }; return pipeline; } -describe("SessionManager.getOrCreateSessionId - terminated remap", () => { +describe("SessionManager.getOrCreateSessionId - terminated blocking", () => { beforeEach(() => { vi.resetAllMocks(); vi.resetModules(); - pipelineCalls.length = 0; redisClientRef = { status: "ready", - mget: vi.fn(async () => [null, null]), + get: vi.fn(async () => null), pipeline: vi.fn(() => makePipeline()), }; }); @@ -66,65 +54,50 @@ describe("SessionManager.getOrCreateSessionId - terminated remap", () => { const keyId = 1; const oldSessionId = "sess_old"; - const messages = [{ role: "user", content: "hi" }, { role: "assistant", content: "ok" }, {}]; + const messages = [{ role: "user", content: "hi" }]; const sessionId = await SessionManager.getOrCreateSessionId(keyId, messages, oldSessionId); expect(sessionId).toBe(oldSessionId); - expect(redisClientRef.mget).toHaveBeenCalledWith( - `session:${oldSessionId}:terminated`, - `session:${oldSessionId}:terminated_replacement` - ); - expect( - pipelineCalls.some( - (c) => c[0] === "setex" && c[1] === `session:${oldSessionId}:terminated_replacement` - ) - ).toBe(false); + expect(redisClientRef.get).toHaveBeenCalledWith(`session:${oldSessionId}:terminated`); }); - it("已终止且存在 replacement 时应返回 replacement sessionId", async () => { + it("已终止时应拒绝复用并抛出 TerminatedSessionError", async () => { const keyId = 1; const oldSessionId = "sess_old"; - const replacementSessionId = "sess_new"; - redisClientRef.mget.mockResolvedValueOnce(["1", replacementSessionId]); + redisClientRef.get.mockImplementation(async (key: string) => { + if (key === `session:${oldSessionId}:terminated`) return "1"; + return null; + }); - const { SessionManager } = await import("@/lib/session-manager"); - - const sessionId = await SessionManager.getOrCreateSessionId(keyId, [], oldSessionId); + const { SessionManager, TerminatedSessionError } = await import("@/lib/session-manager"); - expect(sessionId).toBe(replacementSessionId); - expect( - pipelineCalls.some( - (c) => c[0] === "setex" && c[1] === `session:${oldSessionId}:terminated_replacement` - ) - ).toBe(false); + await expect( + SessionManager.getOrCreateSessionId(keyId, [], oldSessionId) + ).rejects.toBeInstanceOf(TerminatedSessionError); }); - it("已终止但无 replacement 时应生成并持久化 replacement", async () => { + it("hash 命中已终止 session 时应创建新 session", async () => { const keyId = 1; - const oldSessionId = "sess_old"; - redisClientRef.mget.mockResolvedValueOnce(["1", null]); + const terminatedSessionId = "sess_terminated"; const { SessionManager } = await import("@/lib/session-manager"); + vi.spyOn(SessionManager, "generateSessionId").mockReturnValue("sess_new"); + + redisClientRef.get.mockImplementation(async (key: string) => { + if (key.startsWith("hash:") && key.endsWith(":session")) return terminatedSessionId; + if (key === `session:${terminatedSessionId}:terminated`) return "1"; + return null; + }); + + const messages = [ + { role: "user", content: "hi" }, + { role: "assistant", content: "ok" }, + ]; + + const sessionId = await SessionManager.getOrCreateSessionId(keyId, messages, null); - const sessionId = await SessionManager.getOrCreateSessionId(keyId, [], oldSessionId); - - expect(sessionId).not.toBe(oldSessionId); - expect(sessionId).toMatch(/^sess_/); - - expect( - pipelineCalls.some( - (c) => - c[0] === "setex" && - c[1] === `session:${oldSessionId}:terminated_replacement` && - c[2] === 86400 && - c[3] === sessionId - ) - ).toBe(true); - expect( - pipelineCalls.some( - (c) => c[0] === "expire" && c[1] === `session:${oldSessionId}:terminated` && c[2] === 86400 - ) - ).toBe(true); + expect(sessionId).toBe("sess_new"); + expect(redisClientRef.get).toHaveBeenCalledWith(`session:${terminatedSessionId}:terminated`); }); }); diff --git a/tests/unit/proxy/session-guard-terminated-session.test.ts b/tests/unit/proxy/session-guard-terminated-session.test.ts new file mode 100644 index 000000000..9360ef987 --- /dev/null +++ b/tests/unit/proxy/session-guard-terminated-session.test.ts @@ -0,0 +1,140 @@ +import { beforeEach, describe, expect, test, vi } from "vitest"; +import type { ProxySession } from "@/app/v1/_lib/proxy/session"; + +const getCachedSystemSettingsMock = vi.fn(); + +const extractClientSessionIdMock = vi.fn(); +const getOrCreateSessionIdMock = vi.fn(); +const getNextRequestSequenceMock = vi.fn(); +const storeSessionRequestBodyMock = vi.fn(async () => undefined); +const storeSessionClientRequestMetaMock = vi.fn(async () => undefined); +const storeSessionMessagesMock = vi.fn(async () => undefined); +const storeSessionInfoMock = vi.fn(async () => undefined); +const generateSessionIdMock = vi.fn(); + +const trackSessionMock = vi.fn(async () => undefined); + +class TerminatedSessionError extends Error { + sessionId: string; + terminatedAt: string | null; + + constructor(sessionId: string, terminatedAt: string | null = null) { + super("Session has been terminated"); + this.name = "TerminatedSessionError"; + this.sessionId = sessionId; + this.terminatedAt = terminatedAt; + } +} + +vi.mock("@/lib/config", () => ({ + getCachedSystemSettings: () => getCachedSystemSettingsMock(), +})); + +vi.mock("@/lib/session-manager", () => ({ + SessionManager: { + extractClientSessionId: extractClientSessionIdMock, + getOrCreateSessionId: getOrCreateSessionIdMock, + getNextRequestSequence: getNextRequestSequenceMock, + storeSessionRequestBody: storeSessionRequestBodyMock, + storeSessionClientRequestMeta: storeSessionClientRequestMetaMock, + storeSessionMessages: storeSessionMessagesMock, + storeSessionInfo: storeSessionInfoMock, + generateSessionId: generateSessionIdMock, + }, + TerminatedSessionError, +})); + +vi.mock("@/lib/session-tracker", () => ({ + SessionTracker: { + trackSession: trackSessionMock, + }, +})); + +vi.mock("@/lib/logger", () => ({ + logger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + trace: vi.fn(), + }, +})); + +async function loadGuard() { + const mod = await import("@/app/v1/_lib/proxy/session-guard"); + return mod.ProxySessionGuard; +} + +function createMockSession(overrides: Partial = {}): ProxySession { + const session: any = { + authState: { + success: true, + user: { id: 1, name: "u" }, + key: { id: 1, name: "k" }, + apiKey: "api-key", + }, + request: { + message: {}, + model: "claude-sonnet-4-5-20250929", + }, + headers: new Headers(), + userAgent: "claude_cli/1.0", + requestUrl: "http://localhost/v1/messages", + method: "POST", + originalFormat: "claude", + + sessionId: null, + setSessionId(id: string) { + this.sessionId = id; + }, + setRequestSequence(seq: number) { + this.requestSequence = seq; + }, + getRequestSequence() { + return this.requestSequence ?? 1; + }, + getMessages() { + return []; + }, + getMessagesLength() { + return 1; + }, + isWarmupRequest() { + return false; + }, + } satisfies Partial; + + return { ...session, ...overrides } as ProxySession; +} + +beforeEach(() => { + vi.clearAllMocks(); + getCachedSystemSettingsMock.mockResolvedValue({ + interceptAnthropicWarmupRequests: false, + enableCodexSessionIdCompletion: false, + }); + extractClientSessionIdMock.mockReturnValue("sess_terminated"); + getNextRequestSequenceMock.mockResolvedValue(1); +}); + +describe("ProxySessionGuard - terminated session", () => { + test("当 clientSessionId 已终止时应阻断请求并抛出 ProxyError(410)", async () => { + const ProxySessionGuard = await loadGuard(); + const session = createMockSession(); + + getOrCreateSessionIdMock.mockRejectedValueOnce( + new TerminatedSessionError("sess_terminated", "1") + ); + + await expect(ProxySessionGuard.ensure(session)).rejects.toMatchObject({ + name: "ProxyError", + statusCode: 410, + message: "Session 已被终止,请创建新的会话后重试", + }); + + expect(generateSessionIdMock).not.toHaveBeenCalled(); + expect(trackSessionMock).not.toHaveBeenCalled(); + expect(session.sessionId).toBeNull(); + }); +}); diff --git a/tests/unit/proxy/session-guard-warmup-intercept.test.ts b/tests/unit/proxy/session-guard-warmup-intercept.test.ts index f7443b936..f2c69a5df 100644 --- a/tests/unit/proxy/session-guard-warmup-intercept.test.ts +++ b/tests/unit/proxy/session-guard-warmup-intercept.test.ts @@ -14,6 +14,18 @@ const generateSessionIdMock = vi.fn(); const trackSessionMock = vi.fn(async () => undefined); +class TerminatedSessionError extends Error { + sessionId: string; + terminatedAt: string | null; + + constructor(sessionId: string, terminatedAt: string | null = null) { + super("Session has been terminated"); + this.name = "TerminatedSessionError"; + this.sessionId = sessionId; + this.terminatedAt = terminatedAt; + } +} + vi.mock("@/lib/config", () => ({ getCachedSystemSettings: () => getCachedSystemSettingsMock(), })); @@ -29,6 +41,7 @@ vi.mock("@/lib/session-manager", () => ({ storeSessionInfo: storeSessionInfoMock, generateSessionId: generateSessionIdMock, }, + TerminatedSessionError, })); vi.mock("@/lib/session-tracker", () => ({ From c22a53098d8f4774d1fef20265adf7e992e29cac Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 17:08:34 +0800 Subject: [PATCH 05/14] =?UTF-8?q?fix(session):=20=E8=BD=AC=E4=B9=89=20Redi?= =?UTF-8?q?s=20SCAN=20MATCH=20=E9=81=BF=E5=85=8D=E8=AF=AF=E5=88=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/lib/session-manager.ts | 20 ++++++- ...ession-manager-scan-pattern-escape.test.ts | 60 +++++++++++++++++++ .../session-manager-terminate-session.test.ts | 11 +++- 3 files changed, 88 insertions(+), 3 deletions(-) create mode 100644 tests/unit/lib/session-manager-scan-pattern-escape.test.ts diff --git a/src/lib/session-manager.ts b/src/lib/session-manager.ts index 993a08d42..ed0ac7b21 100644 --- a/src/lib/session-manager.ts +++ b/src/lib/session-manager.ts @@ -152,6 +152,20 @@ export class SessionManager { } } + /** + * 将用户可控的字符串安全地嵌入 Redis `SCAN MATCH` glob pattern 中(按字面量匹配)。 + * + * Redis glob 语法中 `* ? [] \\` 都具有特殊含义,因此需要转义以避免误匹配/误删。 + */ + private static escapeRedisMatchPatternLiteral(value: string): string { + return value + .replaceAll("\\", "\\\\") + .replaceAll("*", "\\*") + .replaceAll("?", "\\?") + .replaceAll("[", "\\[") + .replaceAll("]", "\\]"); + } + /** * 获取 STORE_SESSION_MESSAGES 配置 * - true:原样存储 message 内容 @@ -1408,12 +1422,13 @@ export class SessionManager { } // 2. 检查新格式:使用 SCAN 搜索 session:{sessionId}:req:*:messages + const escapedSessionId = SessionManager.escapeRedisMatchPatternLiteral(sessionId); let cursor = "0"; do { const [nextCursor, keys] = (await redis.scan( cursor, "MATCH", - `session:${sessionId}:req:*:messages`, + `session:${escapedSessionId}:req:*:messages`, "COUNT", 100 )) as [string, string[]]; @@ -2080,7 +2095,8 @@ export class SessionManager { // 3. 删除 session:* 相关 key(包含 req:* 新格式,避免终止后仍能查看/复活) let deletedKeys = 0; let cursor = "0"; - const matchPattern = `session:${sessionId}:*`; + const escapedSessionId = SessionManager.escapeRedisMatchPatternLiteral(sessionId); + const matchPattern = `session:${escapedSessionId}:*`; do { const scanResult = (await redis.scan(cursor, "MATCH", matchPattern, "COUNT", 200)) as [ diff --git a/tests/unit/lib/session-manager-scan-pattern-escape.test.ts b/tests/unit/lib/session-manager-scan-pattern-escape.test.ts new file mode 100644 index 000000000..221c28433 --- /dev/null +++ b/tests/unit/lib/session-manager-scan-pattern-escape.test.ts @@ -0,0 +1,60 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +let redisClientRef: any; + +vi.mock("server-only", () => ({})); + +vi.mock("@/lib/logger", () => ({ + logger: { + warn: vi.fn(), + info: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + trace: vi.fn(), + }, +})); + +vi.mock("@/app/v1/_lib/proxy/errors", () => ({ + sanitizeHeaders: vi.fn(() => "(empty)"), + sanitizeUrl: vi.fn((url: unknown) => String(url)), +})); + +vi.mock("@/lib/session-tracker", () => ({ + SessionTracker: { + getConcurrentCount: vi.fn(async () => 0), + }, +})); + +vi.mock("@/lib/redis", () => ({ + getRedisClient: () => redisClientRef, +})); + +describe("SessionManager.hasAnySessionMessages - scan pattern escaping", () => { + beforeEach(() => { + vi.resetAllMocks(); + vi.resetModules(); + + redisClientRef = { + status: "ready", + exists: vi.fn(async () => 0), + scan: vi.fn(async () => ["0", []]), + }; + }); + + it("应对 sessionId 中的 glob 特殊字符进行转义(避免误匹配/误删)", async () => { + const { SessionManager } = await import("@/lib/session-manager"); + + const sessionId = "sess_te*st?[x]"; + const ok = await SessionManager.hasAnySessionMessages(sessionId); + + expect(ok).toBe(false); + expect(redisClientRef.exists).toHaveBeenCalledWith(`session:${sessionId}:messages`); + expect(redisClientRef.scan).toHaveBeenCalledWith( + "0", + "MATCH", + "session:sess_te\\*st\\?\\[x\\]:req:*:messages", + "COUNT", + 100 + ); + }); +}); diff --git a/tests/unit/lib/session-manager-terminate-session.test.ts b/tests/unit/lib/session-manager-terminate-session.test.ts index af4a0af08..a544a85eb 100644 --- a/tests/unit/lib/session-manager-terminate-session.test.ts +++ b/tests/unit/lib/session-manager-terminate-session.test.ts @@ -50,7 +50,7 @@ describe("SessionManager.terminateSession", () => { }); it("应同时从 global/key/user 的 active_sessions ZSET 中移除 sessionId(若可解析到 userId)", async () => { - const sessionId = "sess_test"; + const sessionId = "sess_te*st?[x]"; const terminatedKey = `session:${sessionId}:terminated`; redisClientRef.get.mockImplementation(async (key: string) => { if (key === `session:${sessionId}:provider`) return "42"; @@ -90,6 +90,15 @@ describe("SessionManager.terminateSession", () => { expect(deletePipelineRef.del).toHaveBeenCalledWith(`session:${sessionId}:req:1:messages`); expect(deletePipelineRef.del).toHaveBeenCalledWith(`session:${sessionId}:req:1:response`); expect(deletePipelineRef.del).not.toHaveBeenCalledWith(terminatedKey); + + // 安全性:SCAN MATCH pattern 必须按字面量匹配 sessionId,避免 glob 注入误删其它 key + expect(redisClientRef.scan).toHaveBeenCalledWith( + "0", + "MATCH", + "session:sess_te\\*st\\?\\[x\\]:*", + "COUNT", + 200 + ); }); it("当 userId 不可用时,不应尝试 zrem user active_sessions key", async () => { From 9b33aee5774fed52d0e9afafd6b6592968095d69 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 17:58:44 +0800 Subject: [PATCH 06/14] =?UTF-8?q?fix(session):=20=E6=A0=B9=E6=8D=AE?= =?UTF-8?q?=E5=AE=A1=E6=A0=B8=E5=BB=BA=E8=AE=AE=E5=AE=8C=E5=96=84=E7=BB=88?= =?UTF-8?q?=E6=AD=A2=E6=A0=87=E8=AE=B0=E4=B8=8E=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/lib/session-manager.ts | 43 ++++++++++++------- ...ession-manager-scan-pattern-escape.test.ts | 5 ++- .../session-manager-terminate-session.test.ts | 4 ++ 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/src/lib/session-manager.ts b/src/lib/session-manager.ts index ed0ac7b21..514000ee8 100644 --- a/src/lib/session-manager.ts +++ b/src/lib/session-manager.ts @@ -1,6 +1,7 @@ import "server-only"; import crypto from "node:crypto"; +import type Redis from "ioredis"; import { extractCodexSessionId } from "@/app/v1/_lib/codex/session-extractor"; import { sanitizeHeaders, sanitizeUrl } from "@/app/v1/_lib/proxy/errors"; import { getEnvConfig } from "@/lib/config/env.schema"; @@ -115,12 +116,23 @@ export class SessionManager { ); // 短上下文阈值 private static readonly ENABLE_SHORT_CONTEXT_DETECTION = process.env.ENABLE_SHORT_CONTEXT_DETECTION !== "false"; // 默认启用 + // 会话终止标记 TTL(单位:秒) + // 规范环境变量:SESSION_TERMINATION_TTL_SECONDS + // 兼容旧名(计划弃用):SESSION_TERMINATION_TTL / TERMINATED_SESSION_TTL private static readonly TERMINATED_SESSION_TTL = (() => { - const raw = - process.env.SESSION_TERMINATION_TTL_SECONDS ?? - process.env.SESSION_TERMINATION_TTL ?? - process.env.TERMINATED_SESSION_TTL ?? - ""; + const rawPrimary = process.env.SESSION_TERMINATION_TTL_SECONDS; + const rawLegacyA = process.env.SESSION_TERMINATION_TTL; + const rawLegacyB = process.env.TERMINATED_SESSION_TTL; + + if (!rawPrimary && (rawLegacyA || rawLegacyB)) { + logger.warn("SessionManager: Deprecated termination TTL env var detected", { + SESSION_TERMINATION_TTL: rawLegacyA ? "set" : "unset", + TERMINATED_SESSION_TTL: rawLegacyB ? "set" : "unset", + preferred: "SESSION_TERMINATION_TTL_SECONDS", + }); + } + + const raw = rawPrimary ?? rawLegacyA ?? rawLegacyB ?? ""; const parsed = Number.parseInt(raw, 10); if (Number.isFinite(parsed) && parsed > 0) { return parsed; @@ -133,7 +145,7 @@ export class SessionManager { } private static async readTerminationMarker( - redis: any, + redis: Redis, sessionId: string ): Promise { const terminatedKey = SessionManager.getTerminationMarkerKey(sessionId); @@ -418,6 +430,8 @@ export class SessionManager { // 1. 优先使用客户端传递的 session_id (来自 metadata.user_id 或 metadata.session_id) if (clientSessionId) { + // Fail-open:Redis 不可用时,不阻断请求(避免因 Redis 故障导致全站请求失败)。 + // 代价:Redis 故障窗口内无法强制执行 terminated marker,因此 TerminatedSessionError 不会抛出。 if (redis && redis.status === "ready") { const terminatedAt = await SessionManager.readTerminationMarker(redis, clientSessionId); if (terminatedAt) { @@ -431,21 +445,19 @@ export class SessionManager { } } - const effectiveSessionId = clientSessionId; - // 2. 短上下文并发检测(方案E) if ( SessionManager.ENABLE_SHORT_CONTEXT_DETECTION && messagesLength <= SessionManager.SHORT_CONTEXT_THRESHOLD ) { // 检查该 session 是否有其他请求正在运行 - const concurrentCount = await SessionTracker.getConcurrentCount(effectiveSessionId); + const concurrentCount = await SessionTracker.getConcurrentCount(clientSessionId); if (concurrentCount > 0) { // 场景B:有并发请求 → 这是并发短任务 → 强制新建 session const newId = SessionManager.generateSessionId(); logger.info("SessionManager: 检测到并发短任务,强制新建 session", { - originalSessionId: effectiveSessionId, + originalSessionId: clientSessionId, newSessionId: newId, messagesLength, existingConcurrentCount: concurrentCount, @@ -455,22 +467,22 @@ export class SessionManager { // 场景A:无并发 → 这可能是长对话的开始 → 允许复用 logger.debug("SessionManager: 短上下文但 session 空闲,允许复用(长对话开始)", { - sessionId: effectiveSessionId, + sessionId: clientSessionId, messagesLength, }); } // 3. 长上下文 or 无并发 → 正常复用 logger.debug("SessionManager: Using client-provided session", { - sessionId: effectiveSessionId, + sessionId: clientSessionId, }); // 刷新 TTL(滑动窗口) if (redis && redis.status === "ready") { - await SessionManager.refreshSessionTTL(effectiveSessionId).catch((err) => { + await SessionManager.refreshSessionTTL(clientSessionId).catch((err) => { logger.error("SessionManager: Failed to refresh TTL", { error: err }); }); } - return effectiveSessionId; + return clientSessionId; } // 2. 降级方案:计算 messages 内容哈希(TC-047 警告:不可靠) @@ -2037,6 +2049,7 @@ export class SessionManager { const ttlSeconds = SessionManager.TERMINATED_SESSION_TTL; // 0. 标记终止(优先写入,避免并发请求在清理窗口内复活) + // 说明:这里允许覆盖旧值,用于刷新 TTL(多次终止时延长阻断窗口)。 const markerResult = await redis.set(terminatedKey, terminatedAt, "EX", ttlSeconds); const markerOk = markerResult === "OK"; @@ -2092,7 +2105,7 @@ export class SessionManager { await pipeline.exec(); - // 3. 删除 session:* 相关 key(包含 req:* 新格式,避免终止后仍能查看/复活) + // 3. 删除 session:* 相关 key(包含 req:* 新格式;保留 terminated 标记) let deletedKeys = 0; let cursor = "0"; const escapedSessionId = SessionManager.escapeRedisMatchPatternLiteral(sessionId); diff --git a/tests/unit/lib/session-manager-scan-pattern-escape.test.ts b/tests/unit/lib/session-manager-scan-pattern-escape.test.ts index 221c28433..575bd110a 100644 --- a/tests/unit/lib/session-manager-scan-pattern-escape.test.ts +++ b/tests/unit/lib/session-manager-scan-pattern-escape.test.ts @@ -1,6 +1,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; let redisClientRef: any; +const getRedisClientMock = vi.fn(); vi.mock("server-only", () => ({})); @@ -26,7 +27,7 @@ vi.mock("@/lib/session-tracker", () => ({ })); vi.mock("@/lib/redis", () => ({ - getRedisClient: () => redisClientRef, + getRedisClient: getRedisClientMock, })); describe("SessionManager.hasAnySessionMessages - scan pattern escaping", () => { @@ -39,6 +40,8 @@ describe("SessionManager.hasAnySessionMessages - scan pattern escaping", () => { exists: vi.fn(async () => 0), scan: vi.fn(async () => ["0", []]), }; + + getRedisClientMock.mockReturnValue(redisClientRef); }); it("应对 sessionId 中的 glob 特殊字符进行转义(避免误匹配/误删)", async () => { diff --git a/tests/unit/lib/session-manager-terminate-session.test.ts b/tests/unit/lib/session-manager-terminate-session.test.ts index a544a85eb..f3da4cd7f 100644 --- a/tests/unit/lib/session-manager-terminate-session.test.ts +++ b/tests/unit/lib/session-manager-terminate-session.test.ts @@ -44,7 +44,9 @@ describe("SessionManager.terminateSession", () => { mget: vi.fn(async () => [null, null]), pipeline: vi .fn() + // 第一次 pipeline:用于 ZSET 清理(global/key/provider/user) .mockImplementationOnce(() => pipelineRef) + // 第二次 pipeline:用于批量删除 session:{id}:* key .mockImplementationOnce(() => deletePipelineRef), }; }); @@ -111,6 +113,8 @@ describe("SessionManager.terminateSession", () => { }); redisClientRef.hget.mockResolvedValue(null); redisClientRef.scan.mockResolvedValueOnce(["0", [terminatedKey]]); + // SCAN 仅返回 terminatedKey 时,不会发出任何 DEL 命令,因此 exec 结果应为空(避免误计 deletedKeys)。 + deletePipelineRef.exec.mockResolvedValueOnce([]); const { getUserActiveSessionsKey } = await import("@/lib/redis/active-session-keys"); const { SessionManager } = await import("@/lib/session-manager"); From 25193e338ab1fe6b74ebb11b33a7a9c213d6d43f Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 18:10:57 +0800 Subject: [PATCH 07/14] =?UTF-8?q?fix(session):=20=E7=BB=88=E6=AD=A2?= =?UTF-8?q?=E8=BF=94=E5=9B=9E=E5=80=BC=E5=8C=BA=E5=88=86=20markerOk=20?= =?UTF-8?q?=E4=B8=8E=E6=B8=85=E7=90=86=E7=BB=93=E6=9E=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/actions/active-sessions.ts | 4 +-- src/lib/session-manager.ts | 31 +++++++++++-------- .../session-manager-terminate-session.test.ts | 8 ++--- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/actions/active-sessions.ts b/src/actions/active-sessions.ts index e03cb7731..65fc63deb 100644 --- a/src/actions/active-sessions.ts +++ b/src/actions/active-sessions.ts @@ -854,9 +854,9 @@ export async function terminateActiveSession(sessionId: string): Promise { + static async terminateSession(sessionId: string): Promise { const redis = getRedisClient(); if (!redis || redis.status !== "ready") { logger.warn("SessionManager: Redis not ready, cannot terminate session"); - return false; + return { markerOk: false, deletedKeys: 0 }; } try { @@ -2148,13 +2153,13 @@ export class SessionManager { markerOk, }); - return markerOk || deletedKeys > 0; + return { markerOk, deletedKeys }; } catch (error) { logger.error("SessionManager: Failed to terminate session", { error, sessionId, }); - return false; + return { markerOk: false, deletedKeys: 0 }; } } @@ -2182,14 +2187,14 @@ export class SessionManager { const CHUNK_SIZE = 20; let successCount = 0; - for (let i = 0; i < sessionIds.length; i += CHUNK_SIZE) { - const chunk = sessionIds.slice(i, i + CHUNK_SIZE); - const results = await Promise.all( - chunk.map(async (sessionId) => { - const success = await SessionManager.terminateSession(sessionId); - return success ? 1 : 0; - }) - ); + for (let i = 0; i < sessionIds.length; i += CHUNK_SIZE) { + const chunk = sessionIds.slice(i, i + CHUNK_SIZE); + const results = await Promise.all( + chunk.map(async (sessionId) => { + const result = await SessionManager.terminateSession(sessionId); + return result.markerOk ? 1 : 0; + }) + ); successCount += results.reduce((sum, value) => sum + value, 0); } diff --git a/tests/unit/lib/session-manager-terminate-session.test.ts b/tests/unit/lib/session-manager-terminate-session.test.ts index f3da4cd7f..3d462b722 100644 --- a/tests/unit/lib/session-manager-terminate-session.test.ts +++ b/tests/unit/lib/session-manager-terminate-session.test.ts @@ -77,8 +77,8 @@ describe("SessionManager.terminateSession", () => { await import("@/lib/redis/active-session-keys"); const { SessionManager } = await import("@/lib/session-manager"); - const ok = await SessionManager.terminateSession(sessionId); - expect(ok).toBe(true); + const result = await SessionManager.terminateSession(sessionId); + expect(result.markerOk).toBe(true); expect(redisClientRef.set).toHaveBeenCalledWith(terminatedKey, expect.any(String), "EX", 86400); expect(redisClientRef.hget).toHaveBeenCalledWith(`session:${sessionId}:info`, "userId"); @@ -119,8 +119,8 @@ describe("SessionManager.terminateSession", () => { const { getUserActiveSessionsKey } = await import("@/lib/redis/active-session-keys"); const { SessionManager } = await import("@/lib/session-manager"); - const ok = await SessionManager.terminateSession(sessionId); - expect(ok).toBe(true); + const result = await SessionManager.terminateSession(sessionId); + expect(result.markerOk).toBe(true); expect(pipelineRef.zrem).not.toHaveBeenCalledWith(getUserActiveSessionsKey(123), sessionId); }); From cf3bd34903e614a0c6c1d780773186cf9e743621 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 25 Feb 2026 10:11:32 +0000 Subject: [PATCH 08/14] chore: format code (fix-session-terminate-25193e3) --- src/lib/session-manager.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/lib/session-manager.ts b/src/lib/session-manager.ts index 2c1f6af50..aab1dd1bc 100644 --- a/src/lib/session-manager.ts +++ b/src/lib/session-manager.ts @@ -2187,14 +2187,14 @@ export class SessionManager { const CHUNK_SIZE = 20; let successCount = 0; - for (let i = 0; i < sessionIds.length; i += CHUNK_SIZE) { - const chunk = sessionIds.slice(i, i + CHUNK_SIZE); - const results = await Promise.all( - chunk.map(async (sessionId) => { - const result = await SessionManager.terminateSession(sessionId); - return result.markerOk ? 1 : 0; - }) - ); + for (let i = 0; i < sessionIds.length; i += CHUNK_SIZE) { + const chunk = sessionIds.slice(i, i + CHUNK_SIZE); + const results = await Promise.all( + chunk.map(async (sessionId) => { + const result = await SessionManager.terminateSession(sessionId); + return result.markerOk ? 1 : 0; + }) + ); successCount += results.reduce((sum, value) => sum + value, 0); } From 59a30432abd839b2088526e661831ba17df1a813 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 18:20:52 +0800 Subject: [PATCH 09/14] =?UTF-8?q?chore:=20=E8=A7=A6=E5=8F=91=20CI=EF=BC=88?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96=E5=90=8E=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit From 7f50805c1d439c00da8be11e5addfb492070b7d6 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 18:29:17 +0800 Subject: [PATCH 10/14] =?UTF-8?q?test(session):=20=E8=A1=A5=E9=BD=90=20mar?= =?UTF-8?q?kerOk=3Dfalse=20=E5=88=86=E6=94=AF=E8=A6=86=E7=9B=96=E5=B9=B6?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/lib/session-manager.ts | 17 +++++++++++------ .../session-manager-terminate-session.test.ts | 13 ++++++++++++- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/lib/session-manager.ts b/src/lib/session-manager.ts index aab1dd1bc..94a7d0e73 100644 --- a/src/lib/session-manager.ts +++ b/src/lib/session-manager.ts @@ -512,7 +512,7 @@ export class SessionManager { if (redis && redis.status === "ready") { try { const hashKey = `hash:${contentHash}:session`; - let existingSessionId = await redis.get(hashKey); + const existingSessionId = await redis.get(hashKey); if (existingSessionId) { const terminatedAt = await SessionManager.readTerminationMarker(redis, existingSessionId); @@ -525,10 +525,7 @@ export class SessionManager { hash: contentHash, } ); - existingSessionId = null; - } - - if (existingSessionId) { + } else { // 找到已有 session,刷新 TTL await SessionManager.refreshSessionTTL(existingSessionId); logger.trace("SessionManager: Reusing session via hash", { @@ -2059,7 +2056,15 @@ export class SessionManager { const markerOk = markerResult === "OK"; if (!markerOk) { - logger.warn("SessionManager: Failed to set termination marker", { sessionId }); + logger.warn( + "SessionManager: Failed to set termination marker; cleanup will still proceed (session may be reusable)", + { + sessionId, + terminatedKey, + terminatedAt, + ttlSeconds, + } + ); } // 1. 先查询绑定信息(用于从 ZSET 中移除) diff --git a/tests/unit/lib/session-manager-terminate-session.test.ts b/tests/unit/lib/session-manager-terminate-session.test.ts index 3d462b722..1b047d3e0 100644 --- a/tests/unit/lib/session-manager-terminate-session.test.ts +++ b/tests/unit/lib/session-manager-terminate-session.test.ts @@ -41,7 +41,6 @@ describe("SessionManager.terminateSession", () => { get: vi.fn(async () => null), hget: vi.fn(async () => null), scan: vi.fn(async () => ["0", []]), - mget: vi.fn(async () => [null, null]), pipeline: vi .fn() // 第一次 pipeline:用于 ZSET 清理(global/key/provider/user) @@ -124,4 +123,16 @@ describe("SessionManager.terminateSession", () => { expect(pipelineRef.zrem).not.toHaveBeenCalledWith(getUserActiveSessionsKey(123), sessionId); }); + + it("当终止标记写入失败时,markerOk 应为 false(但清理仍会执行)", async () => { + const sessionId = "sess_marker_fail"; + redisClientRef.set.mockResolvedValueOnce(null); + redisClientRef.scan.mockResolvedValueOnce(["0", [`session:${sessionId}:provider`]]); + + const { SessionManager } = await import("@/lib/session-manager"); + const result = await SessionManager.terminateSession(sessionId); + + expect(result.markerOk).toBe(false); + expect(deletePipelineRef.del).toHaveBeenCalledWith(`session:${sessionId}:provider`); + }); }); From d876014dc2a66075c2fcefcde63d514b2865a4af Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 19:05:48 +0800 Subject: [PATCH 11/14] =?UTF-8?q?fix(session):=20=E6=94=B9=E8=BF=9B?= =?UTF-8?q?=E7=BB=88=E6=AD=A2=20TTL=20=E8=A7=A3=E6=9E=90=E4=B8=8E=E6=B8=85?= =?UTF-8?q?=E7=90=86=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/lib/session-manager.ts | 77 +++++++++++++++---- .../session-manager-terminate-session.test.ts | 4 +- 2 files changed, 65 insertions(+), 16 deletions(-) diff --git a/src/lib/session-manager.ts b/src/lib/session-manager.ts index 94a7d0e73..ee88ecbb7 100644 --- a/src/lib/session-manager.ts +++ b/src/lib/session-manager.ts @@ -84,7 +84,8 @@ export class TerminatedSessionError extends Error { public readonly sessionId: string, public readonly terminatedAt: string | null = null ) { - super("Session has been terminated"); + // 注意:此错误的 message 不应作为用户可见文案;用户提示由 HTTP 层/ProxySessionGuard 统一映射。 + super("ERR_TERMINATED_SESSION"); this.name = "TerminatedSessionError"; } } @@ -125,9 +126,15 @@ export class SessionManager { // 规范环境变量:SESSION_TERMINATION_TTL_SECONDS // 兼容旧名(计划弃用):SESSION_TERMINATION_TTL / TERMINATED_SESSION_TTL private static readonly TERMINATED_SESSION_TTL = (() => { - const rawPrimary = process.env.SESSION_TERMINATION_TTL_SECONDS; - const rawLegacyA = process.env.SESSION_TERMINATION_TTL; - const rawLegacyB = process.env.TERMINATED_SESSION_TTL; + const normalize = (value: string | undefined): string | undefined => { + if (typeof value !== "string") return undefined; + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; + }; + + const rawPrimary = normalize(process.env.SESSION_TERMINATION_TTL_SECONDS); + const rawLegacyA = normalize(process.env.SESSION_TERMINATION_TTL); + const rawLegacyB = normalize(process.env.TERMINATED_SESSION_TTL); if (!rawPrimary && (rawLegacyA || rawLegacyB)) { logger.warn("SessionManager: Deprecated termination TTL env var detected", { @@ -137,8 +144,8 @@ export class SessionManager { }); } - const raw = rawPrimary ?? rawLegacyA ?? rawLegacyB ?? ""; - const parsed = Number.parseInt(raw, 10); + const raw = rawPrimary ?? rawLegacyA ?? rawLegacyB; + const parsed = raw ? Number.parseInt(raw, 10) : Number.NaN; if (Number.isFinite(parsed) && parsed > 0) { return parsed; } @@ -2083,6 +2090,14 @@ export class SessionManager { keyId = keyIdStr ? parseInt(keyIdStr, 10) : null; userId = userIdStr ? parseInt(userIdStr, 10) : null; + if (!Number.isFinite(providerId)) { + providerId = null; + } + + if (!Number.isFinite(keyId)) { + keyId = null; + } + if (!Number.isFinite(userId)) { userId = null; } @@ -2099,21 +2114,55 @@ export class SessionManager { // 2. 从 ZSET 中移除(始终尝试,即使查询失败) const pipeline = redis.pipeline(); - pipeline.zrem(getGlobalActiveSessionsKey(), sessionId); + const zremKeys: string[] = []; - if (providerId) { - pipeline.zrem(`provider:${providerId}:active_sessions`, sessionId); + const globalKey = getGlobalActiveSessionsKey(); + pipeline.zrem(globalKey, sessionId); + zremKeys.push(globalKey); + + if (providerId !== null) { + const key = `provider:${providerId}:active_sessions`; + pipeline.zrem(key, sessionId); + zremKeys.push(key); } - if (keyId) { - pipeline.zrem(getKeyActiveSessionsKey(keyId), sessionId); + if (keyId !== null) { + const key = getKeyActiveSessionsKey(keyId); + pipeline.zrem(key, sessionId); + zremKeys.push(key); } - if (userId) { - pipeline.zrem(getUserActiveSessionsKey(userId), sessionId); + if (userId !== null) { + const key = getUserActiveSessionsKey(userId); + pipeline.zrem(key, sessionId); + zremKeys.push(key); } - await pipeline.exec(); + try { + const results = await pipeline.exec(); + if (results) { + for (let i = 0; i < results.length; i++) { + const [err] = results[i]; + if (!err) continue; + logger.warn("SessionManager: Failed to remove session from active_sessions ZSET", { + sessionId, + zsetKey: zremKeys[i], + providerId, + keyId, + userId, + error: err, + }); + } + } + } catch (zremError) { + logger.warn("SessionManager: Failed to cleanup active_sessions ZSET, continuing", { + sessionId, + providerId, + keyId, + userId, + error: zremError, + }); + } // 3. 删除 session:* 相关 key(包含 req:* 新格式;保留 terminated 标记) let deletedKeys = 0; diff --git a/tests/unit/lib/session-manager-terminate-session.test.ts b/tests/unit/lib/session-manager-terminate-session.test.ts index 1b047d3e0..0b5b068fc 100644 --- a/tests/unit/lib/session-manager-terminate-session.test.ts +++ b/tests/unit/lib/session-manager-terminate-session.test.ts @@ -45,8 +45,8 @@ describe("SessionManager.terminateSession", () => { .fn() // 第一次 pipeline:用于 ZSET 清理(global/key/provider/user) .mockImplementationOnce(() => pipelineRef) - // 第二次 pipeline:用于批量删除 session:{id}:* key - .mockImplementationOnce(() => deletePipelineRef), + // 后续 pipeline:用于批量删除 session:{id}:* key(可能多页 SCAN) + .mockImplementation(() => deletePipelineRef), }; }); From f1a9ceb34bdb05662db2d7c296f6db8bd6842ab7 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 20:28:50 +0800 Subject: [PATCH 12/14] =?UTF-8?q?fix(session):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E7=BB=88=E6=AD=A2=E6=B8=85=E7=90=86=E8=AF=AD=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/lib/session-manager.ts | 68 ++++++++++++------- .../session-manager-terminate-session.test.ts | 16 ++++- 2 files changed, 57 insertions(+), 27 deletions(-) diff --git a/src/lib/session-manager.ts b/src/lib/session-manager.ts index ee88ecbb7..c59a50a6c 100644 --- a/src/lib/session-manager.ts +++ b/src/lib/session-manager.ts @@ -2052,6 +2052,9 @@ export class SessionManager { return { markerOk: false, deletedKeys: 0 }; } + let markerOk = false; + let deletedKeys = 0; + try { const terminatedKey = SessionManager.getTerminationMarkerKey(sessionId); const terminatedAt = Date.now().toString(); @@ -2060,7 +2063,7 @@ export class SessionManager { // 0. 标记终止(优先写入,避免并发请求在清理窗口内复活) // 说明:这里允许覆盖旧值,用于刷新 TTL(多次终止时延长阻断窗口)。 const markerResult = await redis.set(terminatedKey, terminatedAt, "EX", ttlSeconds); - const markerOk = markerResult === "OK"; + markerOk = markerResult === "OK"; if (!markerOk) { logger.warn( @@ -2165,37 +2168,52 @@ export class SessionManager { } // 3. 删除 session:* 相关 key(包含 req:* 新格式;保留 terminated 标记) - let deletedKeys = 0; - let cursor = "0"; const escapedSessionId = SessionManager.escapeRedisMatchPatternLiteral(sessionId); const matchPattern = `session:${escapedSessionId}:*`; - do { - const scanResult = (await redis.scan(cursor, "MATCH", matchPattern, "COUNT", 200)) as [ - string, - string[], - ]; - const nextCursor = scanResult[0]; - const keys = scanResult[1] ?? []; - cursor = nextCursor; + // 说明:Redis SCAN 不提供快照语义;为了减少并发窗口下的遗漏,这里最多执行两轮全量扫描清理。 + const MAX_SCAN_ROUNDS = 2; + for (let round = 0; round < MAX_SCAN_ROUNDS; round++) { + let cursor = "0"; + let deletedInRound = 0; + + do { + const scanResult = (await redis.scan(cursor, "MATCH", matchPattern, "COUNT", 200)) as [ + string, + string[], + ]; + const nextCursor = scanResult[0]; + const keys = scanResult[1] ?? []; + cursor = nextCursor; + + if (keys.length === 0) continue; + + const deletePipeline = redis.pipeline(); + let hasDeletes = false; + for (const key of keys) { + if (key === terminatedKey) continue; + deletePipeline.del(key); + hasDeletes = true; + } - if (keys.length === 0) continue; + // 如果这一页 SCAN 只返回了 terminatedKey,则无需发起空 pipeline.exec()。 + if (!hasDeletes) continue; - const deletePipeline = redis.pipeline(); - for (const key of keys) { - if (key === terminatedKey) continue; - deletePipeline.del(key); - } + const deleteResults = await deletePipeline.exec(); + if (!deleteResults) continue; - const deleteResults = await deletePipeline.exec(); - if (!deleteResults) continue; - - for (const [err, result] of deleteResults) { - if (!err && typeof result === "number" && result > 0) { - deletedKeys += result; + for (const [err, result] of deleteResults) { + if (!err && typeof result === "number" && result > 0) { + deletedInRound += result; + } } + } while (cursor !== "0"); + + deletedKeys += deletedInRound; + if (deletedInRound === 0) { + break; } - } while (cursor !== "0"); + } logger.info("SessionManager: Terminated session", { sessionId, @@ -2213,7 +2231,7 @@ export class SessionManager { error, sessionId, }); - return { markerOk: false, deletedKeys: 0 }; + return { markerOk, deletedKeys }; } } diff --git a/tests/unit/lib/session-manager-terminate-session.test.ts b/tests/unit/lib/session-manager-terminate-session.test.ts index 0b5b068fc..7629153e3 100644 --- a/tests/unit/lib/session-manager-terminate-session.test.ts +++ b/tests/unit/lib/session-manager-terminate-session.test.ts @@ -112,8 +112,6 @@ describe("SessionManager.terminateSession", () => { }); redisClientRef.hget.mockResolvedValue(null); redisClientRef.scan.mockResolvedValueOnce(["0", [terminatedKey]]); - // SCAN 仅返回 terminatedKey 时,不会发出任何 DEL 命令,因此 exec 结果应为空(避免误计 deletedKeys)。 - deletePipelineRef.exec.mockResolvedValueOnce([]); const { getUserActiveSessionsKey } = await import("@/lib/redis/active-session-keys"); const { SessionManager } = await import("@/lib/session-manager"); @@ -121,6 +119,8 @@ describe("SessionManager.terminateSession", () => { const result = await SessionManager.terminateSession(sessionId); expect(result.markerOk).toBe(true); + // SCAN 仅返回 terminatedKey 时,不会发出任何 DEL 命令,因此不应执行 delete pipeline(避免不必要的网络开销)。 + expect(deletePipelineRef.exec).not.toHaveBeenCalled(); expect(pipelineRef.zrem).not.toHaveBeenCalledWith(getUserActiveSessionsKey(123), sessionId); }); @@ -135,4 +135,16 @@ describe("SessionManager.terminateSession", () => { expect(result.markerOk).toBe(false); expect(deletePipelineRef.del).toHaveBeenCalledWith(`session:${sessionId}:provider`); }); + + it("当清理过程抛错时,应尽量保留 markerOk=true(如果终止标记已写入)", async () => { + const sessionId = "sess_cleanup_fail"; + const terminatedKey = `session:${sessionId}:terminated`; + redisClientRef.scan.mockRejectedValueOnce(new Error("scan failed")); + + const { SessionManager } = await import("@/lib/session-manager"); + const result = await SessionManager.terminateSession(sessionId); + + expect(redisClientRef.set).toHaveBeenCalledWith(terminatedKey, expect.any(String), "EX", 86400); + expect(result.markerOk).toBe(true); + }); }); From b490a1eb59b8461d9a00933c272e2bb797f686f4 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 20:36:03 +0800 Subject: [PATCH 13/14] =?UTF-8?q?test(session):=20=E8=A1=A5=E9=BD=90=20pip?= =?UTF-8?q?eline=20expire=20mock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/lib/session-manager-terminated-remap.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/lib/session-manager-terminated-remap.test.ts b/tests/unit/lib/session-manager-terminated-remap.test.ts index 169663e99..61847e2e9 100644 --- a/tests/unit/lib/session-manager-terminated-remap.test.ts +++ b/tests/unit/lib/session-manager-terminated-remap.test.ts @@ -32,6 +32,7 @@ vi.mock("@/lib/redis", () => ({ function makePipeline() { const pipeline = { setex: vi.fn(() => pipeline), + expire: vi.fn(() => pipeline), exec: vi.fn(async () => []), }; return pipeline; From df6d67503235084a4a76af63334eb78939923466 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 25 Feb 2026 21:22:57 +0800 Subject: [PATCH 14/14] =?UTF-8?q?chore(session):=20=E5=B8=B8=E9=87=8F?= =?UTF-8?q?=E5=8C=96=20SCAN=20COUNT=20=E5=B9=B6=E8=A1=A5=E5=BC=BA=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E6=96=AD=E8=A8=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/lib/session-manager.ts | 20 ++++++++++++------- .../session-manager-terminated-remap.test.ts | 9 ++++++--- tests/unit/repository/provider.test.ts | 2 +- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/lib/session-manager.ts b/src/lib/session-manager.ts index c59a50a6c..906afc5d1 100644 --- a/src/lib/session-manager.ts +++ b/src/lib/session-manager.ts @@ -152,6 +152,9 @@ export class SessionManager { return 24 * 60 * 60; // 1 天 })(); + private static readonly SCAN_COUNT = 100; + private static readonly TERMINATE_SCAN_COUNT = 200; + private static getTerminationMarkerKey(sessionId: string): string { return `session:${sessionId}:terminated`; } @@ -1265,7 +1268,7 @@ export class SessionManager { "MATCH", "session:*:info", "COUNT", - 100 + SessionManager.SCAN_COUNT )) as [string, string[]]; cursor = nextCursor; @@ -1360,7 +1363,7 @@ export class SessionManager { "MATCH", "session:*:info", "COUNT", - 100 + SessionManager.SCAN_COUNT )) as [string, string[]]; cursor = nextCursor; @@ -1451,7 +1454,7 @@ export class SessionManager { "MATCH", `session:${escapedSessionId}:req:*:messages`, "COUNT", - 100 + SessionManager.SCAN_COUNT )) as [string, string[]]; cursor = nextCursor; @@ -2178,10 +2181,13 @@ export class SessionManager { let deletedInRound = 0; do { - const scanResult = (await redis.scan(cursor, "MATCH", matchPattern, "COUNT", 200)) as [ - string, - string[], - ]; + const scanResult = (await redis.scan( + cursor, + "MATCH", + matchPattern, + "COUNT", + SessionManager.TERMINATE_SCAN_COUNT + )) as [string, string[]]; const nextCursor = scanResult[0]; const keys = scanResult[1] ?? []; cursor = nextCursor; diff --git a/tests/unit/lib/session-manager-terminated-remap.test.ts b/tests/unit/lib/session-manager-terminated-remap.test.ts index 61847e2e9..a6132e88c 100644 --- a/tests/unit/lib/session-manager-terminated-remap.test.ts +++ b/tests/unit/lib/session-manager-terminated-remap.test.ts @@ -73,9 +73,12 @@ describe("SessionManager.getOrCreateSessionId - terminated blocking", () => { const { SessionManager, TerminatedSessionError } = await import("@/lib/session-manager"); - await expect( - SessionManager.getOrCreateSessionId(keyId, [], oldSessionId) - ).rejects.toBeInstanceOf(TerminatedSessionError); + const error = await SessionManager.getOrCreateSessionId(keyId, [], oldSessionId).catch( + (e) => e as any + ); + expect(error).toBeInstanceOf(TerminatedSessionError); + expect(error.sessionId).toBe(oldSessionId); + expect(error.terminatedAt).toBe("1"); }); it("hash 命中已终止 session 时应创建新 session", async () => { diff --git a/tests/unit/repository/provider.test.ts b/tests/unit/repository/provider.test.ts index ef9e6b7f5..fff4538ef 100644 --- a/tests/unit/repository/provider.test.ts +++ b/tests/unit/repository/provider.test.ts @@ -51,7 +51,7 @@ describe("provider repository - updateProviderPrioritiesBatch", () => { test("returns 0 and does not execute SQL when updates is empty", async () => { vi.resetModules(); - const executeMock = vi.fn(async () => ({ rowCount: 0 })); + const executeMock = vi.fn(async () => ({ count: 0 })); vi.doMock("@/drizzle/db", () => ({ db: {