diff --git a/extension/src/__tests__/command-scheduler.test.ts b/extension/src/__tests__/command-scheduler.test.ts new file mode 100644 index 0000000..ecde678 --- /dev/null +++ b/extension/src/__tests__/command-scheduler.test.ts @@ -0,0 +1,154 @@ +import { describe, expect, test } from 'bun:test'; + +import { CommandScheduler } from '../background/command-scheduler'; + +function createDeferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +async function flushMicrotasks(rounds: number = 3): Promise { + for (let round = 0; round < rounds; round += 1) { + await Promise.resolve(); + } +} + +describe('CommandScheduler', () => { + test('runs different conversations concurrently up to the global limit', async () => { + const started: string[] = []; + const gates = new Map>>(); + + const scheduler = new CommandScheduler< + { id: string; conversation_id: string }, + string + >({ + maxConcurrentCommands: 2, + maxConcurrentHeavyCommands: 2, + processCommand: async (data) => { + started.push(data.id); + const gate = createDeferred(); + gates.set(data.id, gate); + await gate.promise; + return data.id; + }, + }); + + const first = scheduler.enqueue({ id: 'a1', conversation_id: 'conv-a' }); + const second = scheduler.enqueue({ id: 'b1', conversation_id: 'conv-b' }); + const third = scheduler.enqueue({ id: 'c1', conversation_id: 'conv-c' }); + + await flushMicrotasks(); + expect(started).toEqual(['a1', 'b1']); + + gates.get('a1')?.resolve(); + await flushMicrotasks(); + expect(started).toEqual(['a1', 'b1', 'c1']); + + gates.get('b1')?.resolve(); + gates.get('c1')?.resolve(); + + await expect(first).resolves.toBe('a1'); + await expect(second).resolves.toBe('b1'); + await expect(third).resolves.toBe('c1'); + }); + + test('keeps commands from the same conversation serialized', async () => { + const started: string[] = []; + const gates = new Map>>(); + + const scheduler = new CommandScheduler< + { id: string; conversation_id: string }, + string + >({ + maxConcurrentCommands: 3, + maxConcurrentHeavyCommands: 2, + processCommand: async (data) => { + started.push(data.id); + const gate = createDeferred(); + gates.set(data.id, gate); + await gate.promise; + return data.id; + }, + }); + + const first = scheduler.enqueue({ id: 'a1', conversation_id: 'conv-a' }); + const second = scheduler.enqueue({ id: 'a2', conversation_id: 'conv-a' }); + const third = scheduler.enqueue({ id: 'b1', conversation_id: 'conv-b' }); + + await flushMicrotasks(); + expect(started).toEqual(['a1', 'b1']); + + gates.get('b1')?.resolve(); + await flushMicrotasks(); + expect(started).toEqual(['a1', 'b1']); + + gates.get('a1')?.resolve(); + await flushMicrotasks(); + expect(started).toEqual(['a1', 'b1', 'a2']); + + gates.get('a2')?.resolve(); + + await expect(first).resolves.toBe('a1'); + await expect(second).resolves.toBe('a2'); + await expect(third).resolves.toBe('b1'); + }); + + test('limits heavy commands separately from overall concurrency', async () => { + const started: string[] = []; + const gates = new Map>>(); + + const scheduler = new CommandScheduler< + { id: string; conversation_id: string; heavy?: boolean }, + string + >({ + maxConcurrentCommands: 3, + maxConcurrentHeavyCommands: 1, + isHeavyCommand: (data) => data.heavy === true, + processCommand: async (data) => { + started.push(data.id); + const gate = createDeferred(); + gates.set(data.id, gate); + await gate.promise; + return data.id; + }, + }); + + const heavyA = scheduler.enqueue({ + id: 'heavy-a', + conversation_id: 'conv-a', + heavy: true, + }); + const heavyB = scheduler.enqueue({ + id: 'heavy-b', + conversation_id: 'conv-b', + heavy: true, + }); + const lightC = scheduler.enqueue({ + id: 'light-c', + conversation_id: 'conv-c', + heavy: false, + }); + + await flushMicrotasks(); + expect(started).toEqual(['heavy-a', 'light-c']); + + gates.get('light-c')?.resolve(); + await flushMicrotasks(); + expect(started).toEqual(['heavy-a', 'light-c']); + + gates.get('heavy-a')?.resolve(); + await flushMicrotasks(); + expect(started).toEqual(['heavy-a', 'light-c', 'heavy-b']); + + gates.get('heavy-b')?.resolve(); + + await expect(heavyA).resolves.toBe('heavy-a'); + await expect(heavyB).resolves.toBe('heavy-b'); + await expect(lightC).resolves.toBe('light-c'); + }); +}); diff --git a/extension/src/background/command-scheduler.ts b/extension/src/background/command-scheduler.ts new file mode 100644 index 0000000..60fc68d --- /dev/null +++ b/extension/src/background/command-scheduler.ts @@ -0,0 +1,287 @@ +export interface SchedulerCommand { + data: T; + resolve: (value: unknown) => void; + reject: (error: Error) => void; + addedAt: number; + conversationKey: string; + isHeavy: boolean; +} + +export interface CommandSchedulerOptions { + processCommand: (data: T) => Promise; + getConversationKey?: (data: T) => string; + isHeavyCommand?: (data: T) => boolean; + maxConcurrentCommands?: number; + maxConcurrentHeavyCommands?: number; + warnQueueLength?: number; + warnWaitMs?: number; + now?: () => number; +} + +export interface CommandSchedulerStatus { + queueLength: number; + activeCount: number; + activeHeavyCount: number; + activeConversations: string[]; +} + +const DEFAULT_MAX_CONCURRENT_COMMANDS = 3; +const DEFAULT_MAX_CONCURRENT_HEAVY_COMMANDS = 2; +const DEFAULT_WARN_QUEUE_LENGTH = 3; +const DEFAULT_WARN_WAIT_MS = 5000; + +export class CommandScheduler { + private readonly processCommand: (data: T) => Promise; + private readonly getConversationKey: (data: T) => string; + private readonly isHeavyCommand: (data: T) => boolean; + private readonly maxConcurrentCommands: number; + private readonly maxConcurrentHeavyCommands: number; + private readonly warnQueueLength: number; + private readonly warnWaitMs: number; + private readonly now: () => number; + + private readonly queues = new Map[]>(); + private readonly conversationOrder: string[] = []; + private readonly activeConversations = new Set(); + + private activeCount = 0; + private activeHeavyCount = 0; + private drainScheduled = false; + private roundRobinCursor = 0; + + constructor(options: CommandSchedulerOptions) { + this.processCommand = options.processCommand; + this.getConversationKey = + options.getConversationKey ?? + ((data: T) => { + const maybeConversationId = (data as { conversation_id?: unknown }) + ?.conversation_id; + const maybeTabId = (data as { tab_id?: unknown })?.tab_id; + if (typeof maybeConversationId === 'string' && maybeConversationId) { + return maybeConversationId; + } + if (typeof maybeTabId === 'number') { + return `tab:${maybeTabId}`; + } + return '__global__'; + }); + this.isHeavyCommand = options.isHeavyCommand ?? (() => false); + this.maxConcurrentCommands = Math.max( + 1, + options.maxConcurrentCommands ?? DEFAULT_MAX_CONCURRENT_COMMANDS, + ); + this.maxConcurrentHeavyCommands = Math.max( + 1, + options.maxConcurrentHeavyCommands ?? + DEFAULT_MAX_CONCURRENT_HEAVY_COMMANDS, + ); + this.warnQueueLength = Math.max( + 1, + options.warnQueueLength ?? DEFAULT_WARN_QUEUE_LENGTH, + ); + this.warnWaitMs = Math.max(0, options.warnWaitMs ?? DEFAULT_WARN_WAIT_MS); + this.now = options.now ?? (() => Date.now()); + } + + async enqueue(data: T): Promise { + return new Promise((resolve, reject) => { + const conversationKey = this.getConversationKey(data); + const queue = this.ensureQueue(conversationKey); + queue.push({ + data, + resolve: (value) => resolve(value as R), + reject, + addedAt: this.now(), + conversationKey, + isHeavy: this.isHeavyCommand(data), + }); + + if (this.getQueueLength() > this.warnQueueLength) { + console.warn( + `⚠️ Command scheduler backlog: ${this.getQueueLength()} commands pending`, + ); + } + + this.scheduleDrain(); + }); + } + + clearQueue(reason: string = 'Command queue cleared'): void { + for (const queue of this.queues.values()) { + for (const command of queue) { + command.reject(new Error(reason)); + } + } + + this.queues.clear(); + this.conversationOrder.length = 0; + this.roundRobinCursor = 0; + } + + getStatus(): CommandSchedulerStatus { + return { + queueLength: this.getQueueLength(), + activeCount: this.activeCount, + activeHeavyCount: this.activeHeavyCount, + activeConversations: Array.from(this.activeConversations), + }; + } + + private ensureQueue(conversationKey: string): SchedulerCommand[] { + let queue = this.queues.get(conversationKey); + if (!queue) { + queue = []; + this.queues.set(conversationKey, queue); + this.conversationOrder.push(conversationKey); + } + return queue; + } + + private getQueueLength(): number { + let total = 0; + for (const queue of this.queues.values()) { + total += queue.length; + } + return total; + } + + private scheduleDrain(): void { + if (this.drainScheduled) { + return; + } + + this.drainScheduled = true; + queueMicrotask(() => { + this.drainScheduled = false; + void this.drain(); + }); + } + + private async drain(): Promise { + while (this.activeCount < this.maxConcurrentCommands) { + const next = this.dequeueNextCommand(); + if (!next) { + break; + } + + this.launch(next); + } + } + + private dequeueNextCommand(): { + conversationKey: string; + command: SchedulerCommand; + } | null { + this.pruneConversationOrder(); + + if (this.conversationOrder.length === 0) { + return null; + } + + const orderSnapshot = [...this.conversationOrder]; + const startIndex = this.roundRobinCursor % orderSnapshot.length; + + for (let offset = 0; offset < orderSnapshot.length; offset += 1) { + const snapshotIndex = (startIndex + offset) % orderSnapshot.length; + const conversationKey = orderSnapshot[snapshotIndex]; + + if (this.activeConversations.has(conversationKey)) { + continue; + } + + const queue = this.queues.get(conversationKey); + if (!queue || queue.length === 0) { + continue; + } + + const command = queue[0]; + if ( + command.isHeavy && + this.activeHeavyCount >= this.maxConcurrentHeavyCommands + ) { + continue; + } + + queue.shift(); + const queueStillHasItems = queue.length > 0; + if (queue.length === 0) { + this.queues.delete(conversationKey); + } + + const liveIndex = this.conversationOrder.indexOf(conversationKey); + if (this.conversationOrder.length === 0) { + this.roundRobinCursor = 0; + } else if (liveIndex === -1) { + this.roundRobinCursor = 0; + } else if (queueStillHasItems) { + this.roundRobinCursor = (liveIndex + 1) % this.conversationOrder.length; + } else { + this.roundRobinCursor = liveIndex % this.conversationOrder.length; + } + + this.pruneConversationOrder(); + + return { conversationKey, command }; + } + + return null; + } + + private launch(next: { + conversationKey: string; + command: SchedulerCommand; + }): void { + const { conversationKey, command } = next; + this.activeConversations.add(conversationKey); + this.activeCount += 1; + if (command.isHeavy) { + this.activeHeavyCount += 1; + } + + const waitTime = this.now() - command.addedAt; + if (waitTime > this.warnWaitMs) { + console.warn(`⌛ Command waited ${waitTime}ms in scheduler queue`); + } + + void this.executeCommand(conversationKey, command); + } + + private async executeCommand( + conversationKey: string, + command: SchedulerCommand, + ): Promise { + try { + const result = await this.processCommand(command.data); + command.resolve(result); + } catch (error) { + command.reject(error as Error); + } finally { + this.activeConversations.delete(conversationKey); + this.activeCount = Math.max(0, this.activeCount - 1); + if (command.isHeavy) { + this.activeHeavyCount = Math.max(0, this.activeHeavyCount - 1); + } + + this.scheduleDrain(); + } + } + + private pruneConversationOrder(): void { + for ( + let index = this.conversationOrder.length - 1; + index >= 0; + index -= 1 + ) { + const conversationKey = this.conversationOrder[index]; + const queue = this.queues.get(conversationKey); + if (queue && queue.length > 0) { + continue; + } + + this.conversationOrder.splice(index, 1); + if (this.roundRobinCursor >= this.conversationOrder.length) { + this.roundRobinCursor = 0; + } + } + } +} diff --git a/extension/src/background/index.ts b/extension/src/background/index.ts index ca73359..ed538b3 100644 --- a/extension/src/background/index.ts +++ b/extension/src/background/index.ts @@ -6,6 +6,7 @@ */ import { wsClient } from '../websocket/client'; +import { CommandScheduler } from './command-scheduler'; import { captureScreenshot, compressIfNeeded, @@ -645,256 +646,161 @@ function cleanupTabState(conversationId: string, tabId: number): void { // Command Queue Management System // ============================================================================ -/** - * Command queue item interface - */ -interface QueuedCommand { - data: any; - resolve: (value: any) => void; - reject: (error: Error) => void; - addedAt: number; -} - -/** - * Command Queue Manager - * Prevents command stacking and ensures proper flow control - */ -class CommandQueueManager { - private queue: QueuedCommand[] = []; - private isProcessing = false; - private commandCooldown = 1000; // 1 second cooldown between commands - private lastCommandEndTime = 0; - private performanceHistory: Array<{ - type: string; - duration: number; - timestamp: number; - }> = []; - private readonly maxHistory = 20; - - /** - * Add command to queue - */ - async enqueue(data: any): Promise { - return new Promise((resolve, reject) => { - this.queue.push({ - data, - resolve, - reject, - addedAt: Date.now(), - }); +const commandPerformanceHistory: Array<{ + type: string; + duration: number; + timestamp: number; +}> = []; +const MAX_COMMAND_PERFORMANCE_HISTORY = 20; - // Start processing if not already processing - if (!this.isProcessing) { - this.processQueue(); - } - - // Log queue status - if (this.queue.length > 3) { - console.warn( - `⚠️ Command queue growing: ${this.queue.length} commands pending`, - ); - } - }); +function getCommandSchedulingKey(data: any): string { + if (typeof data?.conversation_id === 'string' && data.conversation_id) { + return data.conversation_id; } + if (typeof data?.tab_id === 'number') { + return `tab:${data.tab_id}`; + } + if (typeof data?.command_id === 'string' && data.command_id) { + return `command:${data.command_id}`; + } + return '__global__'; +} - /** - * Process the command queue - */ - private async processQueue(): Promise { - if (this.isProcessing || this.queue.length === 0) { - return; - } - - this.isProcessing = true; - - while (this.queue.length > 0) { - const queuedCommand = this.queue.shift()!; - const waitTime = Date.now() - queuedCommand.addedAt; +function isHeavyBrowserCommand(data: any): boolean { + switch (data?.type) { + case 'highlight_elements': + case 'highlight_single_element': + case 'screenshot': + case 'javascript_execute': + case 'click_element': + case 'hover_element': + case 'scroll_element': + case 'swipe_element': + case 'keyboard_input': + case 'select_element': + case 'handle_dialog': + return true; + case 'tab': + return ( + data?.action === 'init' || + data?.action === 'open' || + data?.action === 'switch' || + data?.action === 'refresh' || + data?.action === 'view' || + data?.action === 'back' || + data?.action === 'forward' + ); + default: + return false; + } +} - // Warn about long wait times - if (waitTime > 5000) { - console.warn( - `⌛ Command waited ${waitTime}ms in queue before processing`, - ); - } +function recordCommandPerformance(type: string, duration: number): void { + commandPerformanceHistory.push({ + type, + duration, + timestamp: Date.now(), + }); - try { - // Apply cooldown between commands if needed - const timeSinceLastCommand = Date.now() - this.lastCommandEndTime; - if (timeSinceLastCommand < this.commandCooldown) { - const cooldownDelay = this.commandCooldown - timeSinceLastCommand; - console.log(`⏸️ Command cooldown: waiting ${cooldownDelay}ms`); - await new Promise((resolve) => setTimeout(resolve, cooldownDelay)); - } + if (commandPerformanceHistory.length > MAX_COMMAND_PERFORMANCE_HISTORY) { + commandPerformanceHistory.shift(); + } - // Process the command - const result = await this.processCommand(queuedCommand.data); - queuedCommand.resolve(result); + if (commandPerformanceHistory.length < 5) { + return; + } - // Update last command end time - this.lastCommandEndTime = Date.now(); - } catch (error) { - queuedCommand.reject(error as Error); - this.lastCommandEndTime = Date.now(); - } - } + const recent = commandPerformanceHistory.slice(-5); + const avgDuration = + recent.reduce((sum, cmd) => sum + cmd.duration, 0) / recent.length; - this.isProcessing = false; + if (avgDuration > 5000) { + console.warn( + `📉 Performance degradation detected: avg command time ${avgDuration.toFixed(0)}ms`, + ); } +} - /** - * Process individual command (original command handling logic) - * Public method so watchdog can wrap it - */ - public async processCommand(data: any): Promise { - // This is the original command handling logic from wsClient.onMessage - const commandId = data.command_id || `unknown_${Date.now()}`; - const commandType = data.type || 'unknown'; - const commandStartTime = Date.now(); - - // Track command execution - wsClient.trackCommandStart(commandId, commandType, { - conversation_id: data.conversation_id, - action: data.action, - tab_id: data.tab_id, - url: data.url, - }); - - try { - const response = await handleCommand(data as Command); - const commandDuration = Date.now() - commandStartTime; +async function processQueuedCommand(data: any): Promise { + watchdog.tick(); - // Record performance - this.recordPerformance(commandType, commandDuration); + const commandId = data.command_id || `unknown_${Date.now()}`; + const commandType = data.type || 'unknown'; + const commandStartTime = Date.now(); - // Warn about long-running commands - if (commandDuration > 10000) { - console.warn( - `⚠️ Long command execution: ${commandType} took ${commandDuration}ms`, - ); - } + wsClient.trackCommandStart(commandId, commandType, { + conversation_id: data.conversation_id, + action: data.action, + tab_id: data.tab_id, + url: data.url, + }); - // Send response back to server - if (wsClient.isConnected()) { - const responseWithId = { - ...response, - command_id: data.command_id, - timestamp: Date.now(), - }; + try { + const response = await handleCommand(data as Command); + const commandDuration = Date.now() - commandStartTime; - wsClient - .sendMessage({ - type: 'command_response', - ...responseWithId, - }) - .catch((error) => { - console.error('Failed to send response:', error); - }); - } + recordCommandPerformance(commandType, commandDuration); - return response; - } catch (error) { - console.error('Error handling command:', error); - const commandDuration = Date.now() - commandStartTime; + if (commandDuration > 10000) { + console.warn( + `⚠️ Long command execution: ${commandType} took ${commandDuration}ms`, + ); + } - // Send error response - const errorResponse: CommandResponse = { - success: false, + if (wsClient.isConnected()) { + const responseWithId = { + ...response, command_id: data.command_id, - error: error instanceof Error ? error.message : 'Unknown error', timestamp: Date.now(), }; - if (wsClient.isConnected()) { - wsClient - .sendMessage({ type: 'command_response', ...errorResponse }) - .catch(console.error); - } - - if (commandDuration > 10000) { - console.warn( - `⚠️ Long failed command: ${commandType} failed after ${commandDuration}ms`, - ); - } - - throw error; - } finally { - // End command tracking - wsClient.trackCommandEnd(commandId); - } - } - - /** - * Record command performance for monitoring - */ - private recordPerformance(type: string, duration: number): void { - this.performanceHistory.push({ - type, - duration, - timestamp: Date.now(), - }); - - if (this.performanceHistory.length > this.maxHistory) { - this.performanceHistory.shift(); + wsClient + .sendMessage({ + type: 'command_response', + ...responseWithId, + }) + .catch((error) => { + console.error('Failed to send response:', error); + }); } - // Detect performance degradation - if (this.performanceHistory.length >= 5) { - const recent = this.performanceHistory.slice(-5); - const avgDuration = - recent.reduce((sum, cmd) => sum + cmd.duration, 0) / recent.length; - - if (avgDuration > 5000) { - console.warn( - `📉 Performance degradation detected: avg command time ${avgDuration.toFixed(0)}ms`, - ); - - // Adaptive cooldown adjustment - if (avgDuration > 10000) { - this.commandCooldown = 2000; // Increase to 2 seconds - console.log( - `⚙️ Increased command cooldown to ${this.commandCooldown}ms`, - ); - } - } else if (avgDuration < 1000 && this.commandCooldown > 1000) { - // Reset cooldown if performance improves - this.commandCooldown = 1000; - console.log(`⚙️ Reset command cooldown to ${this.commandCooldown}ms`); - } - } - } + return response; + } catch (error) { + console.error('Error handling command:', error); + const commandDuration = Date.now() - commandStartTime; - /** - * Get queue status - */ - getStatus() { - return { - queueLength: this.queue.length, - isProcessing: this.isProcessing, - lastCommandEndTime: this.lastCommandEndTime, - performanceHistory: [...this.performanceHistory], + const errorResponse: CommandResponse = { + success: false, + command_id: data.command_id, + error: error instanceof Error ? error.message : 'Unknown error', + timestamp: Date.now(), }; - } - /** - * Clear queue (emergency cleanup) - */ - clearQueue(): void { - console.warn( - `🧹 Clearing command queue with ${this.queue.length} pending commands`, - ); + if (wsClient.isConnected()) { + wsClient + .sendMessage({ type: 'command_response', ...errorResponse }) + .catch(console.error); + } - for (const queuedCommand of this.queue) { - queuedCommand.reject(new Error('Command queue cleared')); + if (commandDuration > 10000) { + console.warn( + `⚠️ Long failed command: ${commandType} failed after ${commandDuration}ms`, + ); } - this.queue = []; - this.isProcessing = false; + throw error; + } finally { + wsClient.trackCommandEnd(commandId); } } -// Initialize command queue manager -const commandQueue = new CommandQueueManager(); +const commandQueue = new CommandScheduler({ + processCommand: processQueuedCommand, + getConversationKey: getCommandSchedulingKey, + isHeavyCommand: isHeavyBrowserCommand, + maxConcurrentCommands: 3, + maxConcurrentHeavyCommands: 2, +}); // ============================================================================ // Watchdog Timer for Main Thread Freeze Detection @@ -979,13 +885,6 @@ class WatchdogTimer { const watchdog = new WatchdogTimer(); watchdog.start(); -// Update watchdog on each command processing - wrap the processCommand method -const originalProcessCommand = commandQueue.processCommand.bind(commandQueue); -commandQueue.processCommand = async function (data: any) { - watchdog.tick(); - return originalProcessCommand(data); -}; - // ============================================================================ // Initialize tab manager diff --git a/extension/src/commands/screenshot.ts b/extension/src/commands/screenshot.ts index fa3005e..112337c 100644 --- a/extension/src/commands/screenshot.ts +++ b/extension/src/commands/screenshot.ts @@ -1462,10 +1462,6 @@ export async function captureScreenshot( `📸 [Screenshot] Parameters: quality=${quality}, resizeToPreset=${resizeToPreset} (已忽略), waitForRender=${waitForRender}`, ); - // ⏱️ Always wait 500ms before capturing to ensure page is fully rendered - console.log(`⏳ [Screenshot] Waiting 500ms before capture...`); - await new Promise((resolve) => setTimeout(resolve, 500)); - // Validate parameters if (quality < 1 || quality > 100) { throw new Error(