From 3d281849ef117f79b3b20a7047c99c4094820f2d Mon Sep 17 00:00:00 2001 From: ConnorN Date: Mon, 10 Nov 2025 10:08:22 -0500 Subject: [PATCH 1/3] refactor: modularize the log manager --- src/LogManager.ts | 155 ++++++++++++++++++++++++++++++++++++++++++++++ src/server.ts | 117 ++++------------------------------ 2 files changed, 167 insertions(+), 105 deletions(-) create mode 100644 src/LogManager.ts diff --git a/src/LogManager.ts b/src/LogManager.ts new file mode 100644 index 0000000..e4d5f48 --- /dev/null +++ b/src/LogManager.ts @@ -0,0 +1,155 @@ +import { WebSocketServer, WebSocket } from 'ws'; +import type { IncomingMessage } from 'http'; +import Docker from 'dockerode'; +import { Readable } from 'stream'; +import url from 'url'; + +type LogClients = { + stream: Readable; + clients: Set; + buffer: string[]; + maxBuffer: number; +}; + +export type LogManagerOptions = { + /** WebSocket endpoint path (default: "/logs") */ + path?: string; + /** Number of lines to request from Docker initially AND keep in memory (default: 100) */ + bufferLines?: number; +}; + +export class LogManager { + private docker: Docker; + private path: string; + private bufferLines: number; + private logStreams = new Map(); + + constructor(docker: Docker, opts: LogManagerOptions = {}) { + this.docker = docker; + this.path = opts.path ?? '/logs'; + this.bufferLines = Math.max(0, opts.bufferLines ?? 100); + } + + attach(wss: WebSocketServer) { + wss.on('connection', (ws: WebSocket, req: IncomingMessage) => { + const parsed = url.parse(req.url || '', true); + if (!parsed.pathname || parsed.pathname !== this.path) { + ws.close(1008, 'Invalid WebSocket path'); + return; + } + const containerId = parsed.query.id as string | undefined; + if (!containerId) { + ws.send('Missing container ID'); + ws.close(); + return; + } + this.handleConnection(ws, containerId).catch(err => { + try { ws.send(`Error: ${err?.message ?? String(err)}`); } finally { ws.close(); } + }); + }); + } + + buildLogsUrl(protocol: 'ws' | 'wss', host: string, containerId: string) { + return `${protocol}://${host}${this.path}?id=${encodeURIComponent(containerId)}`; + } + + // ----------------- Internals ----------------- + + private async handleConnection(ws: WebSocket, containerId: string) { + let entry = this.logStreams.get(containerId); + + if (!entry) { + const container = this.docker.getContainer(containerId); + const info = await container.inspect().catch(() => null); + if (!info) { + ws.send('Container not found'); + ws.close(); + return; + } + if (!info.State?.Running) { + ws.send(`Container ${containerId} is not running. Logs may be incomplete.`); + } + + container.logs( + { + follow: true, + stdout: true, + stderr: true, + tail: this.bufferLines, + }, + (err, stream) => { + if (err || !stream) { + try { ws.send(`Error retrieving logs: ${err?.message || 'Unknown error'}`); } finally { ws.close(); } + return; + } + + const nodeStream = stream as Readable; + const clientsSet = new Set([ws]); + + const created: LogClients = { + stream: nodeStream, + clients: clientsSet, + buffer: [], + maxBuffer: this.bufferLines, + }; + this.logStreams.set(containerId, created); + + nodeStream.on('data', (chunk: Buffer) => { + const text = chunk.toString(); + for (const client of created.clients) { + if (client.readyState === client.OPEN) client.send(text); + } + this.pushToBuffer(created, text); + }); + + nodeStream.on('error', (e: Error) => { + console.error('Log stream error:', e); + for (const client of created.clients) { + if (client.readyState === client.OPEN) { + client.send(`Log stream error: ${e.message}`); + client.close(); + } + } + this.logStreams.delete(containerId); + }); + + nodeStream.on('end', () => { + for (const client of created.clients) { + if (client.readyState === client.OPEN) client.close(); + } + this.logStreams.delete(containerId); + }); + + ws.on('close', () => this.detachClient(containerId, ws)); + } + ); + } else { + // replay buffer then join live + for (const line of entry.buffer) { + if (ws.readyState !== ws.OPEN) break; + ws.send(line + '\n'); + } + entry.clients.add(ws); + ws.on('close', () => this.detachClient(containerId, ws)); + } + } + + private detachClient(containerId: string, ws: WebSocket) { + const entry = this.logStreams.get(containerId); + if (!entry) return; + entry.clients.delete(ws); + if (entry.clients.size === 0) { + entry.stream.destroy(); + this.logStreams.delete(containerId); + } + } + + private pushToBuffer(entry: LogClients, chunk: string | Buffer) { + const text = typeof chunk === 'string' ? chunk : chunk.toString(); + for (const line of text.split(/\r?\n/)) { + if (!line) continue; + entry.buffer.push(line); + if (entry.buffer.length > entry.maxBuffer) entry.buffer.shift(); + } + } +} diff --git a/src/server.ts b/src/server.ts index cc22901..84dd8b5 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,18 +1,12 @@ import express, { Request, Response } from 'express'; import http from 'http'; -import { WebSocketServer, WebSocket } from 'ws'; +import { WebSocketServer } from 'ws'; import Docker, { Container } from 'dockerode'; -import url from 'url'; -import { Readable } from 'stream'; -import type { IncomingMessage } from 'http'; -import { launchOptions, OptionConfig } from './config/launchOptions'; import cors from 'cors'; +import { launchOptions } from './config/launchOptions'; +import { LogManager } from './LogManager'; -type LogClients = { - stream: Readable, - clients: Set -}; -const logStreams = new Map(); +const DOCKER_LOG_BUFFER_SIZE = 100; const sharedConfig: Partial = { Tty: true, @@ -41,17 +35,17 @@ app.use(express.json()); const server = http.createServer(app); const wss = new WebSocketServer({ server }); -const clients = new Map(); +const logs = new LogManager(docker, { path: '/logs', bufferLines: DOCKER_LOG_BUFFER_SIZE }); +logs.attach(wss); + const sseClients: Response[] = []; +// ---- helpers ---- async function getContainerByName(name: string): Promise { try { const containers = await docker.listContainers({ all: true }); - const containerInfo = containers.find(c => c.Names.includes(`/${name}`)); - if (containerInfo) { - return docker.getContainer(containerInfo.Id); - } - return null; + const info = containers.find(c => c.Names.includes(`/${name}`)); + return info ? docker.getContainer(info.Id) : null; } catch (err) { console.error('Error retrieving container by name:', err); return null; @@ -110,92 +104,6 @@ app.get('/events', (req: Request, res: Response) => { }); }); -wss.on('connection', async (ws: WebSocket, req: IncomingMessage) => { - const parsed = url.parse(req.url || '', true); - if (!parsed.pathname || parsed.pathname !== '/logs') { - ws.close(1008, 'Invalid WebSocket path'); - return; - } - - const containerId = parsed.query.id as string | undefined; - if (!containerId) { - ws.send('Missing container ID'); - ws.close(); - return; - } - - let logClients = logStreams.get(containerId); - if (!logClients) { - - try { - const container = docker.getContainer(containerId); - const info = await container.inspect(); - if (!info.State.Running) { - ws.send(`Container ${containerId} is not running. Logs may be incomplete.`); - } - - container.logs({ follow: true, stdout: true, stderr: true }, (err, stream) => { - if (err || !stream) { - ws.send(`Error retrieving logs: ${err?.message || 'Unknown error'}`); - ws.close(); - return; - } - - const nodeStream = stream as Readable; - const clientsSet = new Set(); - clientsSet.add(ws); - - logStreams.set(containerId, { stream: nodeStream, clients: clientsSet }); - - nodeStream.on('data', chunk => { - for (const client of clientsSet) { - if (client.readyState === client.OPEN) { - client.send(chunk.toString()); - } - } - }); - - nodeStream.on('error', err => { - console.error('Log stream error:', err); - for (const client of clientsSet) { - if (client.readyState === client.OPEN) { - client.send(`Log stream error: ${err.message}`); - client.close(); - } - } - logStreams.delete(containerId); - }); - - nodeStream.on('end', () => { - for (const client of clientsSet) { - if (client.readyState === client.OPEN) { - client.close(); - } - } - logStreams.delete(containerId); - }); - }); - - } catch (err) { - ws.send(`Container not found`); - ws.close(); - return; - } - } else { - logClients.clients.add(ws); - } - - ws.on('close', () => { - const logClients = logStreams.get(containerId); - if (!logClients) return; - logClients.clients.delete(ws); - if (logClients.clients.size === 0) { - logClients.stream.destroy(); - logStreams.delete(containerId); - } - }); -}); - app.post('/start/:option', async (req, res) => { const option = req.params.option; const optionConfig = launchOptions[option]; @@ -319,6 +227,5 @@ app.post('/stop/:option', async (req: Request, res: Response) => { })(); const PORT = process.env.PORT || 8080; -server.listen(PORT, () => { - console.log(`Server running on port ${PORT}`); -}); +const shared = server.listen(PORT, () => console.log(`Server running on port ${PORT}`)); +export default shared; From 215c76bc65bfc9a44188747c29f0dc03f86bb1ce Mon Sep 17 00:00:00 2001 From: ConnorN Date: Fri, 14 Nov 2025 07:20:32 -0500 Subject: [PATCH 2/3] fix: check websockets before send --- src/LogManager.ts | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/LogManager.ts b/src/LogManager.ts index e4d5f48..872cb75 100644 --- a/src/LogManager.ts +++ b/src/LogManager.ts @@ -14,7 +14,7 @@ type LogClients = { export type LogManagerOptions = { /** WebSocket endpoint path (default: "/logs") */ path?: string; - /** Number of lines to request from Docker initially AND keep in memory (default: 100) */ + /** Number of recent log lines to retrieve and buffer (used for Docker tail parameter and in-memory buffer size; default: 100) */ bufferLines?: number; }; @@ -39,7 +39,9 @@ export class LogManager { } const containerId = parsed.query.id as string | undefined; if (!containerId) { - ws.send('Missing container ID'); + if (ws.readyState === ws.OPEN) { + ws.send('Missing container ID'); + } ws.close(); return; } @@ -62,12 +64,14 @@ export class LogManager { const container = this.docker.getContainer(containerId); const info = await container.inspect().catch(() => null); if (!info) { - ws.send('Container not found'); + if (ws.readyState === ws.OPEN) ws.send('Container not found'); ws.close(); return; } if (!info.State?.Running) { - ws.send(`Container ${containerId} is not running. Logs may be incomplete.`); + if (ws.readyState === ws.OPEN) { + ws.send(`Container ${containerId} is not running. Logs may be incomplete.`); + } } container.logs( @@ -110,6 +114,7 @@ export class LogManager { client.close(); } } + created.stream.destroy(); this.logStreams.delete(containerId); }); @@ -127,7 +132,7 @@ export class LogManager { // replay buffer then join live for (const line of entry.buffer) { if (ws.readyState !== ws.OPEN) break; - ws.send(line + '\n'); + ws.send(line); } entry.clients.add(ws); ws.on('close', () => this.detachClient(containerId, ws)); From 6097a1a1a28f85eb54bf6c3c6ca72b124148a048 Mon Sep 17 00:00:00 2001 From: ConnorN Date: Fri, 14 Nov 2025 10:37:46 -0500 Subject: [PATCH 3/3] style: break long lines --- src/LogManager.ts | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/LogManager.ts b/src/LogManager.ts index 872cb75..96f492f 100644 --- a/src/LogManager.ts +++ b/src/LogManager.ts @@ -14,7 +14,7 @@ type LogClients = { export type LogManagerOptions = { /** WebSocket endpoint path (default: "/logs") */ path?: string; - /** Number of recent log lines to retrieve and buffer (used for Docker tail parameter and in-memory buffer size; default: 100) */ + /** Number of recent log lines to retrieve and buffer (default: 100) */ bufferLines?: number; }; @@ -46,7 +46,13 @@ export class LogManager { return; } this.handleConnection(ws, containerId).catch(err => { - try { ws.send(`Error: ${err?.message ?? String(err)}`); } finally { ws.close(); } + try { + if (ws.readyState === ws.OPEN) { + ws.send(`Error: ${err?.message ?? String(err)}`); + } + } finally { + ws.close(); + } }); }); } @@ -68,10 +74,8 @@ export class LogManager { ws.close(); return; } - if (!info.State?.Running) { - if (ws.readyState === ws.OPEN) { - ws.send(`Container ${containerId} is not running. Logs may be incomplete.`); - } + if (!info.State?.Running && ws.readyState === ws.OPEN) { + ws.send(`Container ${containerId} is not running. Logs may be incomplete.`); } container.logs( @@ -83,7 +87,13 @@ export class LogManager { }, (err, stream) => { if (err || !stream) { - try { ws.send(`Error retrieving logs: ${err?.message || 'Unknown error'}`); } finally { ws.close(); } + try { + if (ws.readyState === ws.OPEN) { + ws.send(`Error retrieving logs: ${err?.message || 'Unknown error'}`); + } + } finally { + ws.close(); + } return; }