-
Notifications
You must be signed in to change notification settings - Fork 0
refactor: modularize the log manager #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,170 @@ | ||
| import { WebSocketServer, WebSocket } from 'ws'; | ||
| import type { IncomingMessage } from 'http'; | ||
| import Docker from 'dockerode'; | ||
| import { Readable } from 'stream'; | ||
| import url from 'url'; | ||
|
|
||
| type LogClients = { | ||
| stream: Readable; | ||
| clients: Set<WebSocket>; | ||
| buffer: string[]; | ||
| maxBuffer: number; | ||
| }; | ||
|
|
||
| export type LogManagerOptions = { | ||
| /** WebSocket endpoint path (default: "/logs") */ | ||
| path?: string; | ||
| /** Number of recent log lines to retrieve and buffer (default: 100) */ | ||
| bufferLines?: number; | ||
| }; | ||
|
|
||
| export class LogManager { | ||
| private docker: Docker; | ||
| private path: string; | ||
| private bufferLines: number; | ||
| private logStreams = new Map<string, LogClients>(); | ||
|
|
||
| constructor(docker: Docker, opts: LogManagerOptions = {}) { | ||
| this.docker = docker; | ||
| this.path = opts.path ?? '/logs'; | ||
| this.bufferLines = Math.max(0, opts.bufferLines ?? 100); | ||
| } | ||
|
|
||
| attach(wss: WebSocketServer) { | ||
| wss.on('connection', (ws: WebSocket, req: IncomingMessage) => { | ||
| const parsed = url.parse(req.url || '', true); | ||
| if (!parsed.pathname || parsed.pathname !== this.path) { | ||
| ws.close(1008, 'Invalid WebSocket path'); | ||
| return; | ||
| } | ||
| const containerId = parsed.query.id as string | undefined; | ||
| if (!containerId) { | ||
| if (ws.readyState === ws.OPEN) { | ||
| ws.send('Missing container ID'); | ||
| } | ||
| ws.close(); | ||
| return; | ||
| } | ||
| this.handleConnection(ws, containerId).catch(err => { | ||
| try { | ||
| if (ws.readyState === ws.OPEN) { | ||
| ws.send(`Error: ${err?.message ?? String(err)}`); | ||
| } | ||
| } finally { | ||
| ws.close(); | ||
| } | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| buildLogsUrl(protocol: 'ws' | 'wss', host: string, containerId: string) { | ||
| return `${protocol}://${host}${this.path}?id=${encodeURIComponent(containerId)}`; | ||
| } | ||
|
|
||
| // ----------------- Internals ----------------- | ||
|
|
||
| private async handleConnection(ws: WebSocket, containerId: string) { | ||
| let entry = this.logStreams.get(containerId); | ||
|
|
||
| if (!entry) { | ||
| const container = this.docker.getContainer(containerId); | ||
| const info = await container.inspect().catch(() => null); | ||
| if (!info) { | ||
| if (ws.readyState === ws.OPEN) ws.send('Container not found'); | ||
| ws.close(); | ||
| return; | ||
| } | ||
| if (!info.State?.Running && ws.readyState === ws.OPEN) { | ||
| ws.send(`Container ${containerId} is not running. Logs may be incomplete.`); | ||
| } | ||
|
|
||
| container.logs( | ||
| { | ||
| follow: true, | ||
| stdout: true, | ||
| stderr: true, | ||
| tail: this.bufferLines, | ||
| }, | ||
| (err, stream) => { | ||
| if (err || !stream) { | ||
| try { | ||
| if (ws.readyState === ws.OPEN) { | ||
| ws.send(`Error retrieving logs: ${err?.message || 'Unknown error'}`); | ||
| } | ||
| } finally { | ||
| ws.close(); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| const nodeStream = stream as Readable; | ||
| const clientsSet = new Set<WebSocket>([ws]); | ||
|
|
||
| const created: LogClients = { | ||
| stream: nodeStream, | ||
| clients: clientsSet, | ||
| buffer: [], | ||
| maxBuffer: this.bufferLines, | ||
| }; | ||
| this.logStreams.set(containerId, created); | ||
|
|
||
| nodeStream.on('data', (chunk: Buffer) => { | ||
| const text = chunk.toString(); | ||
| for (const client of created.clients) { | ||
| if (client.readyState === client.OPEN) client.send(text); | ||
| } | ||
| this.pushToBuffer(created, text); | ||
| }); | ||
|
|
||
| nodeStream.on('error', (e: Error) => { | ||
| console.error('Log stream error:', e); | ||
| for (const client of created.clients) { | ||
| if (client.readyState === client.OPEN) { | ||
| client.send(`Log stream error: ${e.message}`); | ||
| client.close(); | ||
| } | ||
| } | ||
|
ConnorNeed marked this conversation as resolved.
|
||
| created.stream.destroy(); | ||
| this.logStreams.delete(containerId); | ||
| }); | ||
|
|
||
| nodeStream.on('end', () => { | ||
| for (const client of created.clients) { | ||
| if (client.readyState === client.OPEN) client.close(); | ||
| } | ||
| this.logStreams.delete(containerId); | ||
|
||
| }); | ||
|
|
||
| ws.on('close', () => this.detachClient(containerId, ws)); | ||
| } | ||
| ); | ||
| } else { | ||
| // replay buffer then join live | ||
| for (const line of entry.buffer) { | ||
| if (ws.readyState !== ws.OPEN) break; | ||
| ws.send(line); | ||
| } | ||
| entry.clients.add(ws); | ||
| ws.on('close', () => this.detachClient(containerId, ws)); | ||
| } | ||
| } | ||
|
|
||
| private detachClient(containerId: string, ws: WebSocket) { | ||
| const entry = this.logStreams.get(containerId); | ||
| if (!entry) return; | ||
| entry.clients.delete(ws); | ||
| if (entry.clients.size === 0) { | ||
| entry.stream.destroy(); | ||
| this.logStreams.delete(containerId); | ||
| } | ||
| } | ||
|
|
||
| private pushToBuffer(entry: LogClients, chunk: string | Buffer) { | ||
| const text = typeof chunk === 'string' ? chunk : chunk.toString(); | ||
| for (const line of text.split(/\r?\n/)) { | ||
| if (!line) continue; | ||
| entry.buffer.push(line); | ||
| if (entry.buffer.length > entry.maxBuffer) entry.buffer.shift(); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -1,18 +1,12 @@ | ||||||||
| import express, { Request, Response } from 'express'; | ||||||||
| import http from 'http'; | ||||||||
| import { WebSocketServer, WebSocket } from 'ws'; | ||||||||
| import { WebSocketServer } from 'ws'; | ||||||||
| import Docker, { Container } from 'dockerode'; | ||||||||
| import url from 'url'; | ||||||||
| import { Readable } from 'stream'; | ||||||||
| import type { IncomingMessage } from 'http'; | ||||||||
| import { launchOptions, OptionConfig } from './config/launchOptions'; | ||||||||
| import cors from 'cors'; | ||||||||
| import { launchOptions } from './config/launchOptions'; | ||||||||
| import { LogManager } from './LogManager'; | ||||||||
|
|
||||||||
| type LogClients = { | ||||||||
| stream: Readable, | ||||||||
| clients: Set<WebSocket> | ||||||||
| }; | ||||||||
| const logStreams = new Map<string, LogClients>(); | ||||||||
| const DOCKER_LOG_BUFFER_SIZE = 100; | ||||||||
|
|
||||||||
| const sharedConfig: Partial<Docker.ContainerCreateOptions> = { | ||||||||
| Tty: true, | ||||||||
|
|
@@ -41,17 +35,17 @@ app.use(express.json()); | |||||||
|
|
||||||||
| const server = http.createServer(app); | ||||||||
| const wss = new WebSocketServer({ server }); | ||||||||
| const clients = new Map<WebSocket, Readable>(); | ||||||||
| const logs = new LogManager(docker, { path: '/logs', bufferLines: DOCKER_LOG_BUFFER_SIZE }); | ||||||||
| logs.attach(wss); | ||||||||
|
|
||||||||
| const sseClients: Response[] = []; | ||||||||
|
|
||||||||
| // ---- helpers ---- | ||||||||
| async function getContainerByName(name: string): Promise<Container | null> { | ||||||||
| try { | ||||||||
| const containers = await docker.listContainers({ all: true }); | ||||||||
| const containerInfo = containers.find(c => c.Names.includes(`/${name}`)); | ||||||||
| if (containerInfo) { | ||||||||
| return docker.getContainer(containerInfo.Id); | ||||||||
| } | ||||||||
| return null; | ||||||||
| const info = containers.find(c => c.Names.includes(`/${name}`)); | ||||||||
| return info ? docker.getContainer(info.Id) : null; | ||||||||
| } catch (err) { | ||||||||
| console.error('Error retrieving container by name:', err); | ||||||||
| return null; | ||||||||
|
|
@@ -110,92 +104,6 @@ app.get('/events', (req: Request, res: Response) => { | |||||||
| }); | ||||||||
| }); | ||||||||
|
|
||||||||
| wss.on('connection', async (ws: WebSocket, req: IncomingMessage) => { | ||||||||
| const parsed = url.parse(req.url || '', true); | ||||||||
| if (!parsed.pathname || parsed.pathname !== '/logs') { | ||||||||
| ws.close(1008, 'Invalid WebSocket path'); | ||||||||
| return; | ||||||||
| } | ||||||||
|
|
||||||||
| const containerId = parsed.query.id as string | undefined; | ||||||||
| if (!containerId) { | ||||||||
| ws.send('Missing container ID'); | ||||||||
| ws.close(); | ||||||||
| return; | ||||||||
| } | ||||||||
|
|
||||||||
| let logClients = logStreams.get(containerId); | ||||||||
| if (!logClients) { | ||||||||
|
|
||||||||
| try { | ||||||||
| const container = docker.getContainer(containerId); | ||||||||
| const info = await container.inspect(); | ||||||||
| if (!info.State.Running) { | ||||||||
| ws.send(`Container ${containerId} is not running. Logs may be incomplete.`); | ||||||||
| } | ||||||||
|
|
||||||||
| container.logs({ follow: true, stdout: true, stderr: true }, (err, stream) => { | ||||||||
| if (err || !stream) { | ||||||||
| ws.send(`Error retrieving logs: ${err?.message || 'Unknown error'}`); | ||||||||
| ws.close(); | ||||||||
| return; | ||||||||
| } | ||||||||
|
|
||||||||
| const nodeStream = stream as Readable; | ||||||||
| const clientsSet = new Set<WebSocket>(); | ||||||||
| clientsSet.add(ws); | ||||||||
|
|
||||||||
| logStreams.set(containerId, { stream: nodeStream, clients: clientsSet }); | ||||||||
|
|
||||||||
| nodeStream.on('data', chunk => { | ||||||||
| for (const client of clientsSet) { | ||||||||
| if (client.readyState === client.OPEN) { | ||||||||
| client.send(chunk.toString()); | ||||||||
| } | ||||||||
| } | ||||||||
| }); | ||||||||
|
|
||||||||
| nodeStream.on('error', err => { | ||||||||
| console.error('Log stream error:', err); | ||||||||
| for (const client of clientsSet) { | ||||||||
| if (client.readyState === client.OPEN) { | ||||||||
| client.send(`Log stream error: ${err.message}`); | ||||||||
| client.close(); | ||||||||
| } | ||||||||
| } | ||||||||
| logStreams.delete(containerId); | ||||||||
| }); | ||||||||
|
|
||||||||
| nodeStream.on('end', () => { | ||||||||
| for (const client of clientsSet) { | ||||||||
| if (client.readyState === client.OPEN) { | ||||||||
| client.close(); | ||||||||
| } | ||||||||
| } | ||||||||
| logStreams.delete(containerId); | ||||||||
| }); | ||||||||
| }); | ||||||||
|
|
||||||||
| } catch (err) { | ||||||||
| ws.send(`Container not found`); | ||||||||
| ws.close(); | ||||||||
| return; | ||||||||
| } | ||||||||
| } else { | ||||||||
| logClients.clients.add(ws); | ||||||||
| } | ||||||||
|
|
||||||||
| ws.on('close', () => { | ||||||||
| const logClients = logStreams.get(containerId); | ||||||||
| if (!logClients) return; | ||||||||
| logClients.clients.delete(ws); | ||||||||
| if (logClients.clients.size === 0) { | ||||||||
| logClients.stream.destroy(); | ||||||||
| logStreams.delete(containerId); | ||||||||
| } | ||||||||
| }); | ||||||||
| }); | ||||||||
|
|
||||||||
| app.post('/start/:option', async (req, res) => { | ||||||||
| const option = req.params.option; | ||||||||
| const optionConfig = launchOptions[option]; | ||||||||
|
|
@@ -319,6 +227,5 @@ app.post('/stop/:option', async (req: Request, res: Response) => { | |||||||
| })(); | ||||||||
|
|
||||||||
| const PORT = process.env.PORT || 8080; | ||||||||
| server.listen(PORT, () => { | ||||||||
| console.log(`Server running on port ${PORT}`); | ||||||||
| }); | ||||||||
| const shared = server.listen(PORT, () => console.log(`Server running on port ${PORT}`)); | ||||||||
| export default shared; | ||||||||
|
Comment on lines
+230
to
+231
|
||||||||
| const shared = server.listen(PORT, () => console.log(`Server running on port ${PORT}`)); | |
| export default shared; | |
| export default server.listen(PORT, () => console.log(`Server running on port ${PORT}`)); |
Uh oh!
There was an error while loading. Please reload this page.