Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 55 additions & 21 deletions src/daemon-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
getConfigHash,
getSocketDir,
getSocketPath,
getTimeoutMs,
} from './config.js';
import {
type DaemonRequest,
Expand Down Expand Up @@ -49,45 +50,74 @@ function generateRequestId(): string {
}

/**
* Send a request to the daemon and wait for response
* Default timeout for quick IPC requests (ping, listTools, etc.)
*/
const DEFAULT_IPC_TIMEOUT_MS = 5000;

/**
* Send a request to the daemon and wait for response.
*
* The daemon closes the connection after writing the full response,
* so we buffer all incoming data and parse on connection close.
*
* @param socketPath - Unix socket path for daemon IPC
* @param request - The daemon request to send
* @param timeoutMs - Timeout in milliseconds (default: 5000 for quick IPC,
* use getTimeoutMs() for tool execution)
*/
async function sendRequest(
socketPath: string,
request: DaemonRequest,
timeoutMs: number = DEFAULT_IPC_TIMEOUT_MS,
): Promise<DaemonResponse> {
return new Promise((resolve, reject) => {
let buffer = '';
let settled = false;

const socket = Bun.connect({
unix: socketPath,
socket: {
open(socket) {
socket.write(JSON.stringify(request));
},
data(socket, data) {
try {
const response = JSON.parse(data.toString().trim());
socket.end();
resolve(response);
} catch (err) {
socket.end();
reject(new Error('Invalid response from daemon'));
}
if (settled) return;
buffer += data.toString();
},
error(socket, error) {
reject(error);
if (!settled) {
settled = true;
reject(error);
}
},
close() {
// Connection closed
// Daemon closes the connection after writing the full response
if (!settled && buffer.length > 0) {
try {
const response = JSON.parse(buffer.trim());
settled = true;
resolve(response);
} catch {
settled = true;
reject(new Error('Invalid response from daemon'));
}
}
},
connectError(socket, error) {
reject(error);
if (!settled) {
settled = true;
reject(error);
}
},
},
});

// Timeout after 5 seconds (fast fallback to direct connection)
setTimeout(() => {
reject(new Error('Daemon request timeout'));
}, 5000);
if (!settled) {
settled = true;
reject(new Error('Daemon request timeout'));
}
}, timeoutMs);
});
}

Expand Down Expand Up @@ -286,12 +316,16 @@ export async function getDaemonConnection(
toolName: string,
args: Record<string, unknown>,
): Promise<unknown> {
const response = await sendRequest(socketPath, {
id: generateRequestId(),
type: 'callTool',
toolName,
args,
});
const response = await sendRequest(
socketPath,
{
id: generateRequestId(),
type: 'callTool',
toolName,
args,
},
getTimeoutMs(),
);

if (!response.success) {
throw new Error(response.error?.message ?? 'callTool failed');
Expand Down
14 changes: 13 additions & 1 deletion src/daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,19 @@ export async function runDaemon(
},
async data(socket, data) {
const response = await handleRequest(data);
socket.write(`${JSON.stringify(response)}\n`);
const payload = `${JSON.stringify(response)}\n`;
// Write in chunks to handle large payloads that exceed socket buffer
let offset = 0;
while (offset < payload.length) {
const written = socket.write(payload.slice(offset));
if (written === 0) {
await new Promise((r) => setTimeout(r, 1));
continue;
}
offset += written;
}
socket.flush();
socket.end();
},
close(socket) {
activeConnections.delete(socket);
Expand Down