Skip to content

Commit 4debb57

Browse files
committed
feat(hooks): add pulse-monitor for token stall detection and auto-recovery
- Detect token stalls via message.part.updated heartbeat monitoring - Support thinking/reasoning block detection with extended timeout - Auto-recover: abort + 'continue' prompt on 5min stall - Pause monitoring during tool execution
1 parent a763db6 commit 4debb57

File tree

3 files changed

+148
-1
lines changed

3 files changed

+148
-1
lines changed

src/hooks/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ export { createSessionNotification } from "./session-notification"
44
export { createSessionRecoveryHook } from "./session-recovery"
55
export { createCommentCheckerHooks } from "./comment-checker"
66
export { createGrepOutputTruncatorHook } from "./grep-output-truncator"
7+
export { createPulseMonitorHook } from "./pulse-monitor"

src/hooks/pulse-monitor.ts

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import type { PluginInput } from "@opencode-ai/plugin"
2+
3+
export function createPulseMonitorHook(ctx: PluginInput) {
4+
const STANDARD_TIMEOUT = 5 * 60 * 1000 // 5 minutes
5+
const THINKING_TIMEOUT = 5 * 60 * 1000 // 5 minutes
6+
const CHECK_INTERVAL = 5 * 1000 // 5 seconds
7+
8+
let lastHeartbeat = Date.now()
9+
let isMonitoring = false
10+
let currentSessionID: string | null = null
11+
let monitorTimer: ReturnType<typeof setInterval> | null = null
12+
let isThinking = false
13+
14+
const startMonitoring = (sessionID: string) => {
15+
if (currentSessionID !== sessionID) {
16+
currentSessionID = sessionID
17+
// Reset thinking state when switching sessions or starting new
18+
isThinking = false
19+
}
20+
21+
lastHeartbeat = Date.now()
22+
23+
if (!isMonitoring) {
24+
isMonitoring = true
25+
if (monitorTimer) clearInterval(monitorTimer)
26+
27+
monitorTimer = setInterval(async () => {
28+
if (!isMonitoring || !currentSessionID) return
29+
30+
const timeSinceLastHeartbeat = Date.now() - lastHeartbeat
31+
const currentTimeout = isThinking ? THINKING_TIMEOUT : STANDARD_TIMEOUT
32+
33+
if (timeSinceLastHeartbeat > currentTimeout) {
34+
await recoverStalledSession(currentSessionID, timeSinceLastHeartbeat, isThinking)
35+
}
36+
}, CHECK_INTERVAL)
37+
}
38+
}
39+
40+
const stopMonitoring = () => {
41+
isMonitoring = false
42+
if (monitorTimer) {
43+
clearInterval(monitorTimer)
44+
monitorTimer = null
45+
}
46+
}
47+
48+
const updateHeartbeat = (isThinkingUpdate?: boolean) => {
49+
if (isMonitoring) {
50+
lastHeartbeat = Date.now()
51+
if (isThinkingUpdate !== undefined) {
52+
isThinking = isThinkingUpdate
53+
}
54+
}
55+
}
56+
57+
const recoverStalledSession = async (sessionID: string, stalledDuration: number, wasThinking: boolean) => {
58+
stopMonitoring()
59+
60+
try {
61+
const durationSec = Math.round(stalledDuration/1000)
62+
const typeStr = wasThinking ? "Thinking" : "Standard"
63+
64+
// 1. Notify User
65+
await ctx.client.tui.showToast({
66+
body: {
67+
title: "Pulse Monitor: Cardiac Arrest",
68+
message: `Session stalled (${typeStr}) for ${durationSec}s. Defibrillating...`,
69+
variant: "error",
70+
duration: 5000
71+
}
72+
}).catch(() => {})
73+
74+
// 2. Abort current generation (Defibrillation shock)
75+
await ctx.client.session.abort({ path: { id: sessionID } }).catch(() => {})
76+
77+
// 3. Wait a bit for state to settle
78+
await new Promise(resolve => setTimeout(resolve, 1500))
79+
80+
// 4. Prompt "continue" to kickstart (CPR)
81+
await ctx.client.session.prompt({
82+
path: { id: sessionID },
83+
body: { parts: [{ type: "text", text: "The connection was unstable and stalled. Please continue from where you left off." }] },
84+
query: { directory: ctx.directory }
85+
})
86+
87+
// Resume monitoring
88+
startMonitoring(sessionID)
89+
90+
} catch (err) {
91+
console.error("[PulseMonitor] Recovery failed:", err)
92+
// If recovery fails, we stop monitoring to avoid loops
93+
stopMonitoring()
94+
}
95+
}
96+
97+
return {
98+
event: async (input: { event: any }) => {
99+
const { event } = input
100+
const props = event.properties as Record<string, any> | undefined
101+
102+
// Monitor both session updates and part updates to capture token flow
103+
if (event.type === "session.updated" || event.type === "message.part.updated") {
104+
// Try to get sessionID from various common locations
105+
const sessionID = props?.info?.id || props?.sessionID
106+
107+
if (sessionID) {
108+
if (!isMonitoring) startMonitoring(sessionID)
109+
110+
// Check for thinking indicators in the payload
111+
let thinkingUpdate: boolean | undefined = undefined
112+
113+
if (event.type === "message.part.updated") {
114+
const part = props?.part
115+
if (part) {
116+
const THINKING_TYPES = ["thinking", "redacted_thinking", "reasoning"]
117+
if (THINKING_TYPES.includes(part.type)) {
118+
thinkingUpdate = true
119+
} else if (part.type === "text" || part.type === "tool_use") {
120+
thinkingUpdate = false
121+
}
122+
}
123+
}
124+
125+
updateHeartbeat(thinkingUpdate)
126+
}
127+
} else if (event.type === "session.idle" || event.type === "session.error" || event.type === "session.stopped") {
128+
stopMonitoring()
129+
}
130+
},
131+
"tool.execute.before": async () => {
132+
// Pause monitoring while tool runs locally (tools can take time)
133+
stopMonitoring()
134+
},
135+
"tool.execute.after": async (input: { sessionID: string }) => {
136+
// Resume monitoring after tool finishes
137+
if (input.sessionID) {
138+
startMonitoring(input.sessionID)
139+
}
140+
}
141+
}
142+
}

src/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Plugin } from "@opencode-ai/plugin"
22
import { createBuiltinAgents } from "./agents"
3-
import { createTodoContinuationEnforcer, createContextWindowMonitorHook, createSessionRecoveryHook, createCommentCheckerHooks, createGrepOutputTruncatorHook } from "./hooks"
3+
import { createTodoContinuationEnforcer, createContextWindowMonitorHook, createSessionRecoveryHook, createCommentCheckerHooks, createGrepOutputTruncatorHook, createPulseMonitorHook } from "./hooks"
44
import { updateTerminalTitle } from "./features/terminal"
55
import { builtinTools } from "./tools"
66
import { createBuiltinMcps } from "./mcp"
@@ -43,6 +43,7 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => {
4343
const todoContinuationEnforcer = createTodoContinuationEnforcer(ctx)
4444
const contextWindowMonitor = createContextWindowMonitorHook(ctx)
4545
const sessionRecovery = createSessionRecoveryHook(ctx)
46+
const pulseMonitor = createPulseMonitorHook(ctx)
4647
const commentChecker = createCommentCheckerHooks()
4748
const grepOutputTruncator = createGrepOutputTruncatorHook(ctx)
4849

@@ -80,6 +81,7 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => {
8081
event: async (input) => {
8182
await todoContinuationEnforcer(input)
8283
await contextWindowMonitor.event(input)
84+
await pulseMonitor.event(input)
8385

8486
const { event } = input
8587
const props = event.properties as Record<string, unknown> | undefined
@@ -172,6 +174,7 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => {
172174
},
173175

174176
"tool.execute.before": async (input, output) => {
177+
await pulseMonitor["tool.execute.before"]()
175178
await commentChecker["tool.execute.before"](input, output)
176179

177180
if (input.sessionID === mainSessionID) {
@@ -186,6 +189,7 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => {
186189
},
187190

188191
"tool.execute.after": async (input, output) => {
192+
await pulseMonitor["tool.execute.after"](input)
189193
await grepOutputTruncator["tool.execute.after"](input, output)
190194
await contextWindowMonitor["tool.execute.after"](input, output)
191195
await commentChecker["tool.execute.after"](input, output)

0 commit comments

Comments
 (0)