From f61e357406eb2bd33c958d6b51ba5b83ddd0e2be Mon Sep 17 00:00:00 2001 From: qin-ctx Date: Thu, 19 Mar 2026 20:26:20 +0800 Subject: [PATCH] fix(plugin): restore bug fixes from #681 and #688 lost during #662 merge PR #662 renamed the plugin directory and rewrote index.ts from a stale branch base, silently dropping two merged bug fixes: - #681: share pending clientPromise across dual-context registrations to prevent before_agent_start hook from hanging forever - #688: wrap auto-recall search in withTimeout(5s) to prevent indefinite agent hang when OpenViking search API is slow or unresponsive --- examples/openclaw-plugin/index.ts | 161 +++++++++++++++++------------- 1 file changed, 93 insertions(+), 68 deletions(-) diff --git a/examples/openclaw-plugin/index.ts b/examples/openclaw-plugin/index.ts index 9138ce34..4e11dba9 100644 --- a/examples/openclaw-plugin/index.ts +++ b/examples/openclaw-plugin/index.ts @@ -4,8 +4,8 @@ import { tmpdir } from "node:os"; import { Type } from "@sinclair/typebox"; import { memoryOpenVikingConfigSchema } from "./config.js"; -import { OpenVikingClient, localClientCache, isMemoryUri } from "./client.js"; -import type { FindResultItem } from "./client.js"; +import { OpenVikingClient, localClientCache, localClientPendingPromises, isMemoryUri } from "./client.js"; +import type { FindResultItem, PendingClientEntry } from "./client.js"; import { isTranscriptLikeIngest, extractLatestUserText, @@ -69,6 +69,7 @@ type OpenClawPluginApi = { const MAX_OPENVIKING_STDERR_LINES = 200; const MAX_OPENVIKING_STDERR_CHARS = 256_000; +const AUTO_RECALL_TIMEOUT_MS = 5_000; const contextEnginePlugin = { id: "openviking", @@ -108,10 +109,18 @@ const contextEnginePlugin = { localProcess = cached.process; clientPromise = Promise.resolve(cached.client); } else { - clientPromise = new Promise((resolve, reject) => { - resolveLocalClient = resolve; - rejectLocalClient = reject; - }); + const existingPending = localClientPendingPromises.get(localCacheKey); + if (existingPending) { + clientPromise = existingPending.promise; + } else { + const entry = {} as PendingClientEntry; + entry.promise = new Promise((resolve, reject) => { + entry.resolve = resolve; + entry.reject = reject; + }); + clientPromise = entry.promise; + localClientPendingPromises.set(localCacheKey, entry); + } } } else { clientPromise = Promise.resolve(new OpenVikingClient(cfg.baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs)); @@ -449,67 +458,73 @@ const contextEnginePlugin = { ); } else { try { - const candidateLimit = Math.max(cfg.recallLimit * 4, 20); - const [userSettled, agentSettled] = await Promise.allSettled([ - client.find(queryText, { - targetUri: "viking://user/memories", - limit: candidateLimit, - scoreThreshold: 0, - }), - client.find(queryText, { - targetUri: "viking://agent/memories", - limit: candidateLimit, - scoreThreshold: 0, - }), - ]); - - const userResult = userSettled.status === "fulfilled" ? userSettled.value : { memories: [] }; - const agentResult = agentSettled.status === "fulfilled" ? agentSettled.value : { memories: [] }; - if (userSettled.status === "rejected") { - api.logger.warn(`openviking: user memories search failed: ${String(userSettled.reason)}`); - } - if (agentSettled.status === "rejected") { - api.logger.warn(`openviking: agent memories search failed: ${String(agentSettled.reason)}`); - } - - const allMemories = [...(userResult.memories ?? []), ...(agentResult.memories ?? [])]; - const uniqueMemories = allMemories.filter((memory, index, self) => - index === self.findIndex((m) => m.uri === memory.uri) - ); - const leafOnly = uniqueMemories.filter((m) => m.level === 2); - const processed = postProcessMemories(leafOnly, { - limit: candidateLimit, - scoreThreshold: cfg.recallScoreThreshold, - }); - const memories = pickMemoriesForInjection(processed, cfg.recallLimit, queryText); - - if (memories.length > 0) { - const memoryLines = await Promise.all( - memories.map(async (item: FindResultItem) => { - if (item.level === 2) { - try { - const content = await client.read(item.uri); - if (content && typeof content === "string" && content.trim()) { - return `- [${item.category ?? "memory"}] ${content.trim()}`; + await withTimeout( + (async () => { + const candidateLimit = Math.max(cfg.recallLimit * 4, 20); + const [userSettled, agentSettled] = await Promise.allSettled([ + client.find(queryText, { + targetUri: "viking://user/memories", + limit: candidateLimit, + scoreThreshold: 0, + }), + client.find(queryText, { + targetUri: "viking://agent/memories", + limit: candidateLimit, + scoreThreshold: 0, + }), + ]); + + const userResult = userSettled.status === "fulfilled" ? userSettled.value : { memories: [] }; + const agentResult = agentSettled.status === "fulfilled" ? agentSettled.value : { memories: [] }; + if (userSettled.status === "rejected") { + api.logger.warn(`openviking: user memories search failed: ${String(userSettled.reason)}`); + } + if (agentSettled.status === "rejected") { + api.logger.warn(`openviking: agent memories search failed: ${String(agentSettled.reason)}`); + } + + const allMemories = [...(userResult.memories ?? []), ...(agentResult.memories ?? [])]; + const uniqueMemories = allMemories.filter((memory, index, self) => + index === self.findIndex((m) => m.uri === memory.uri) + ); + const leafOnly = uniqueMemories.filter((m) => m.level === 2); + const processed = postProcessMemories(leafOnly, { + limit: candidateLimit, + scoreThreshold: cfg.recallScoreThreshold, + }); + const memories = pickMemoriesForInjection(processed, cfg.recallLimit, queryText); + + if (memories.length > 0) { + const memoryLines = await Promise.all( + memories.map(async (item: FindResultItem) => { + if (item.level === 2) { + try { + const content = await client.read(item.uri); + if (content && typeof content === "string" && content.trim()) { + return `- [${item.category ?? "memory"}] ${content.trim()}`; + } + } catch { + // fallback to abstract + } } - } catch { - // fallback to abstract - } - } - return `- [${item.category ?? "memory"}] ${item.abstract ?? item.uri}`; - }), - ); - const memoryContext = memoryLines.join("\n"); - api.logger.info(`openviking: injecting ${memories.length} memories into context`); - api.logger.info( - `openviking: inject-detail ${toJsonLog({ count: memories.length, memories: summarizeInjectionMemories(memories) })}`, - ); - prependContextParts.push( - "\nThe following OpenViking memories may be relevant:\n" + - `${memoryContext}\n` + - "", - ); - } + return `- [${item.category ?? "memory"}] ${item.abstract ?? item.uri}`; + }), + ); + const memoryContext = memoryLines.join("\n"); + api.logger.info(`openviking: injecting ${memories.length} memories into context`); + api.logger.info( + `openviking: inject-detail ${toJsonLog({ count: memories.length, memories: summarizeInjectionMemories(memories) })}`, + ); + prependContextParts.push( + "\nThe following OpenViking memories may be relevant:\n" + + `${memoryContext}\n` + + "", + ); + } + })(), + AUTO_RECALL_TIMEOUT_MS, + "openviking: auto-recall search timeout", + ); } catch (err) { api.logger.warn(`openviking: auto-recall failed: ${String(err)}`); } @@ -576,7 +591,16 @@ const contextEnginePlugin = { api.registerService({ id: "openviking", start: async () => { - if (cfg.mode === "local" && resolveLocalClient) { + // Claim the pending entry — only the first start() call to claim it spawns the process. + // Subsequent start() calls (from other registrations sharing the same promise) fall through. + const pendingEntry = localClientPendingPromises.get(localCacheKey); + const isSpawner = cfg.mode === "local" && !!pendingEntry; + if (isSpawner) { + localClientPendingPromises.delete(localCacheKey); + resolveLocalClient = pendingEntry!.resolve; + rejectLocalClient = pendingEntry!.reject; + } + if (isSpawner) { const timeoutMs = 60_000; const intervalMs = 500; @@ -654,7 +678,7 @@ const contextEnginePlugin = { await waitForHealth(baseUrl, timeoutMs, intervalMs); const client = new OpenVikingClient(baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs); localClientCache.set(localCacheKey, { client, process: child }); - resolveLocalClient(client); + resolveLocalClient!(client); rejectLocalClient = null; api.logger.info( `openviking: local server started (${baseUrl}, config: ${cfg.configPath})`, @@ -681,6 +705,7 @@ const contextEnginePlugin = { if (localProcess) { localProcess.kill("SIGTERM"); localClientCache.delete(localCacheKey); + localClientPendingPromises.delete(localCacheKey); localProcess = null; api.logger.info("openviking: local server stopped"); } else {