diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml new file mode 100644 index 0000000..9f2fafa --- /dev/null +++ b/.github/workflows/check.yml @@ -0,0 +1,21 @@ +name: Check + +on: + pull_request: + push: + branches: + - main + +jobs: + check: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-node@v4 + with: + node-version: 20 + - run: npm ci + + - name: Run test + run: npm run test diff --git a/README.md b/README.md index 233c05c..272474e 100644 --- a/README.md +++ b/README.md @@ -22,14 +22,92 @@ During SDK testing, it’s often useful to simulate real-world conditions or ins - Replay server messages for automated or regression testing. - Monitor raw protocol-level activity to catch subtle bugs or race conditions. -πŸš€ Getting Started +### πŸš€ Getting Started Install npm dependency: ```bash -npm i -D @ably-labs/local-proxy +npm install --save-dev @ably-labs/local-proxy ``` +### πŸ§ͺ Basic Usage +```ts +import { createInterceptingProxy } from '@ably-labs/local-proxy'; +// 1. Create and start the proxy +const proxy = createInterceptingProxy({ + // options here realtimeHost, restHost +}); +await proxy.start(); +// 2. Configure your SDK to use proxy.options +const client = new Ably.Realtime({ + ...proxy.options + // aditonal option +}); + +// 3. Observe an outgoing HTTP request +const request = await proxy.observeNextRequest(req => + req.url.includes('/channels') +); + +// 4. Observe an incoming protocol message +proxy.observeNextIncomingProtocolMessage(msg => + msg.action === 15 // Presence message +) + +// 5. Inject a fake message into the client +proxy.injectProtocolMessage(client.connection.id, { + action: 9, // Example: SYNC + channel: 'room:test', + connectionId: client.connection.id, + msgSerial: 0, + connectionSerial: -1, + data: { custom: 'data' }, +}); +``` + +### 🧩 Replace a Server Message + +You can also simulate faulty server responses: + +```ts +proxy.replaceNextIncomingProtocolMessage( + { + action: 9, // Fake SYNC + channel: 'room:test', + data: [], + }, + msg => msg.action === 9 // Replace only SYNC messages +); +``` + + +### πŸ”Œ Drop or Pause a Connection + +```ts +// Drop connection by ID (force disconnect) +proxy.dropConnection(client.connection.id); + +// Pause and resume connection manually +const resume = proxy.pauseConnection(); +// simulate a pause... +setTimeout(() => resume(), 5000); +``` + + +### πŸ”§ Register Middleware + +For more advanced use cases, register middlewares to continuously inspect or modify traffic: + +```ts +const unregister = proxy.registerRestMiddleware(req => { + if (req.url.includes('/channels')) { + console.log('Intercepted REST request:', req); + // You can modify headers, body, or response here + } +}); +// Later... +unregister(); +``` diff --git a/package.json b/package.json index b1811c1..2a5a722 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@ably-labs/local-proxy", - "version": "0.0.6", - "description": "Unified Test Suite for Chat SDKs. An executable npm package designed to provide a consistent testing environment for different Ably Chat SDKs", + "version": "0.1.0", + "description": "Local proxy for SDK testing", "scripts": { "build": "tsc", "prepare": "npm run build", diff --git a/src/async-queue.ts b/src/async-queue.ts new file mode 100644 index 0000000..de23ff6 --- /dev/null +++ b/src/async-queue.ts @@ -0,0 +1,36 @@ +import { CompletableDeferred } from './completable-deferred'; + +type AsyncTask = () => Promise; + +export class AsyncQueue { + private readonly queue: AsyncTask[] = []; + private processing: boolean = false; + + enqueue(task: AsyncTask): Promise { + const deferredValue = CompletableDeferred(); + this.queue.push(async () => { + try { + await task(); + } finally { + deferredValue.complete(); + } + }); + this.processNext(); + return deferredValue.get() + } + + private async processNext() { + if (this.processing || this.queue.length === 0) return; + + this.processing = true; + + const task = this.queue.shift(); + + try { + await task!!(); + } finally { + this.processing = false; + this.processNext(); + } + } +} diff --git a/src/completable-deferred.ts b/src/completable-deferred.ts new file mode 100644 index 0000000..78f1246 --- /dev/null +++ b/src/completable-deferred.ts @@ -0,0 +1,22 @@ +type CompletionHandler = (value: T) => void + +class DefaultCompletableDeferred { + private _completeWith!: CompletionHandler; + private value: T | undefined + + private valuePromise = new Promise(resolve => { + this._completeWith = resolve; + }); + + public complete(value: T): void { + this._completeWith(value); + } + + public async get(): Promise { + return this.value ?? this.valuePromise; + } +} + +export function CompletableDeferred() { + return new DefaultCompletableDeferred() +} diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..8e8691e --- /dev/null +++ b/src/index.ts @@ -0,0 +1 @@ +export * from './proxy.js'; diff --git a/src/proxy.ts b/src/proxy.ts new file mode 100644 index 0000000..8edde65 --- /dev/null +++ b/src/proxy.ts @@ -0,0 +1,460 @@ +import type Ably from 'ably'; +import http from 'http'; +import httpProxy from 'http-proxy'; +import WebSocket, { WebSocketServer } from 'ws'; +import { AsyncQueue } from './async-queue.js'; +import { CompletableDeferred } from './completable-deferred'; + +export interface ProtocolMessage { + action?: number; + flags?: number; + id?: string; + timestamp?: number; + count?: number; + error?: Ably.ErrorInfo; + connectionId?: string; + channel?: string; + channelSerial?: string | null; + msgSerial?: number; + messages?: Ably.Message[]; + presence?: Ably.PresenceMessage[]; + auth?: unknown; + connectionDetails?: Record; +} + +export interface Request { + headers: http.IncomingHttpHeaders; + data: T; + url: string | undefined; + params: URLSearchParams; +} + +export interface Response { + headers: http.IncomingHttpHeaders; + statusCode?: number; + data: T; +} + +export interface RealtimeMiddlewareCommand { + command: 'drop' | 'replace' | 'keep'; + replacement?: ProtocolMessage | undefined; +} + +export type MessageType = 'incoming' | 'outgoing'; + +type RestMiddleware = (req: Request, res: Response) => Promise>; +type RealtimeMiddleware = (message: ProtocolMessage, messageType: MessageType) => Promise; + +/** + * Local proxy for test purposes. Core functionality includes: + * + * - Spying on HTTP Requests and Responses + * - Inspect outgoing HTTP requests from the client. + * - Inspect and modify incoming HTTP responses. + * + * - Spying on WebSocket Messages + * - Observe outgoing WebSocket messages from the client. + * - Inspect and modify incoming WebSocket messages. + * + * - Injecting WebSocket Messages + * - Simulate server messages by injecting WebSocket frames into the client session. + * + * - Breaking WebSocket and HTTP Connections + * - Force WebSocket disconnections to test reconnection behavior. + * - Simulate slow or broken HTTP responses to test timeout handling. + */ +export interface InterceptingProxy { + /** + * Options that can be used to configure an Ably Realtime or Rest client + * to point to this proxy instead of directly to Ably backend. + * + * Includes overridden `restHost`, `realtimeHost`, and `tls` settings + * so SDK traffic can be routed through the proxy. + */ + get options(): Ably.ClientOptions; + + /** + * Starts the proxy server. This must be called before intercepting any traffic. + * + * @returns A promise that resolves once the proxy is running and ready. + */ + start(): Promise; + + /** + * Observes the next outgoing HTTP request made by the client. + * + * @param filter Optional predicate function to filter observed requests. + * @returns A promise that resolves with the matching request object. + */ + observeNextRequest(filter?: (req: Request) => boolean): Promise>; + + /** + * Observes the next outgoing Ably protocol message from the client. + * + * @param filter Optional predicate function to filter observed messages. + * @returns A promise that resolves with the matching protocol message. + */ + observeNextOutgoingProtocolMessage(filter?: (msg: ProtocolMessage) => boolean): Promise; + + /** + * Observes the next incoming HTTP response received by the client. + * + * @param filter Optional predicate function to filter observed responses. + * @returns A promise that resolves with the matching response object. + */ + observeNextResponse(filter?: (res: Response) => boolean): Promise>; + + /** + * Observes the next Ably protocol message received from the server. + * + * @param filter Optional predicate function to filter observed messages. + * @returns A promise that resolves with the matching protocol message. + */ + observeNextIncomingProtocolMessage(filter?: (msg: ProtocolMessage) => boolean): Promise; + + /** + * Replaces the next incoming Ably protocol message with a custom one. + * + * Can be used to simulate modified or corrupted server messages. + * + * @param replacement The message to inject in place of the real one. + * @param filter Optional predicate function to filter which message to replace. + */ + replaceNextIncomingProtocolMessage(replacement: ProtocolMessage, filter?: (msg: ProtocolMessage) => boolean): Promise; + + /** + * Injects a protocol message into the client's connection as if it was sent from the server. + * + * Useful for simulating specific server-side events or message sequences. + * + * @param connectionId The ID of the connection to inject the message into. + * @param msg The protocol message to inject. + */ + injectProtocolMessage(connectionId: string, msg: ProtocolMessage): void; + + /** + * Drops the connection with the specified ID. + * + * Forces a WebSocket disconnection to simulate network interruptions or server disconnects. + * + * @param connectionId The ID of the connection to drop. + */ + dropConnection(connectionId: string): void; + + /** + * Pauses all network activity for the active connection. + * + * Can be used to simulate network latency or stalled connections. + * Returns a function that, when called, resumes the connection. + * + * @returns A function to resume the paused connection. + */ + pauseAllConnections(): () => void; + + /** + * Registers a middleware function to inspect or modify HTTP REST requests and responses. + * + * @param middleware A function to intercept and optionally modify REST traffic. + * @returns A function to unregister the middleware. + */ + registerRestMiddleware(middleware: RestMiddleware): () => void; + + /** + * Registers a middleware function to inspect or modify WebSocket (Realtime) messages. + * + * @param middleware A function to intercept and optionally modify protocol messages. + * @returns A function to unregister the middleware. + */ + registerRealtimeMiddleware(middleware: RealtimeMiddleware): () => void; +} + +class DefaultInterceptingProxy implements InterceptingProxy { + private readonly realtimeClientOptions: Ably.ClientOptions; + private readonly restMiddlewares: RestMiddleware[] = []; + private readonly realtimeMiddlewares: RealtimeMiddleware[] = []; + private restQueue = new AsyncQueue(); + private realtimeQueue = new AsyncQueue(); + private connectionIdToWs = new Map(); + + private readonly proxy = httpProxy.createProxyServer({}); + private readonly wss = new WebSocketServer({ noServer: true }); + + private readonly server: http.Server = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => { + this.proxy.web(req, res, { target: this.targetRestUrl, selfHandleResponse: true }, (err) => console.error(err)); + + const requestChunks: Buffer[] = []; + + this.proxy.once('proxyRes', (proxyRes, req, res) => { + const responseChunks: Buffer[] = []; + + proxyRes.on('data', (chunk) => responseChunks.push(chunk)); + proxyRes.on('end', () => { + try { + const responseData = JSON.parse(Buffer.concat(responseChunks).toString()); + const requestData = JSON.parse(Buffer.concat(requestChunks).toString()); + + this.restQueue.enqueue(async () => { + const modifiedResponse = await this.applyRestMiddlewares( + { + headers: req.headers, + data: requestData, + url: req.url, + params: new URLSearchParams(req.url), + }, + { + headers: proxyRes.headers, + data: responseData, + statusCode: proxyRes.statusCode || 200, + }, + ); + res.writeHead(modifiedResponse.statusCode || 200, modifiedResponse.headers); + res.end(JSON.stringify(modifiedResponse.data)); + }); + } catch (e: unknown) { + res.writeHead(500, proxyRes.headers); + res.end(`Proxy error: ${e}`); + } + }); + }); + + req.on('data', (chunk) => requestChunks.push(chunk)); + }); + + private serverIsRunning = false; + private port = 21345; + + constructor(realtimeClientOptions: Ably.ClientOptions) { + this.realtimeClientOptions = realtimeClientOptions; + + this.server.on('upgrade', (req, socket, head) => { + this.wss.handleUpgrade(req, socket, head, (ws) => { + const targetWs = new WebSocket(this.targetRealtimeUrl); + const deferredValue = CompletableDeferred(); + targetWs.on('open', () => deferredValue.complete()); + + ws.on('message', (message) => { + let data: ProtocolMessage; + try { + data = JSON.parse(message.toString()); + } catch (e: unknown) { + console.error(e); + return; + } + + this.realtimeQueue.enqueue(async () => { + await deferredValue.get(); + const modifiedMessage = await this.applyRealtimeMiddlewares(data, 'outgoing'); + if (modifiedMessage) { + targetWs.send(JSON.stringify(modifiedMessage)); + } + }); + }); + + targetWs.on('message', (message) => { + let data: ProtocolMessage; + try { + data = JSON.parse(message.toString()); + } catch (e: unknown) { + console.error(e); + return; + } + + if (data.action === 4) { + this.connectionIdToWs.set(data.connectionId!!, ws); + } + + this.realtimeQueue.enqueue(async () => { + const modifiedMessage = await this.applyRealtimeMiddlewares(data, 'incoming'); + if (modifiedMessage) { + ws.send(JSON.stringify(modifiedMessage)); + } + }); + }); + + ws.on('close', () => { + targetWs.close(); + this.closeWs(ws); + }); + targetWs.on('close', () => this.closeWs(ws)); + }); + }); + } + + async observeNextRequest(filter?: (req: Request) => boolean): Promise> { + const deferredValue = CompletableDeferred>(); + const unregister = this.registerRestMiddleware(async (req, res) => { + if (!filter || filter(req)) deferredValue.complete(req); + return res; + }); + const request: Request = await deferredValue.get(); + unregister(); + return request; + } + + async observeNextOutgoingProtocolMessage(filter?: (msg: ProtocolMessage) => boolean): Promise { + const deferredValue = CompletableDeferred(); + const unregister = this.registerRealtimeMiddleware(async (msg, messageType) => { + if (messageType === 'outgoing' && (!filter || filter(msg))) deferredValue.complete(msg); + return { command: 'keep' }; + }); + const message: ProtocolMessage = await deferredValue.get(); + unregister(); + return message; + } + + async observeNextResponse(filter?: (res: Response) => boolean): Promise> { + const deferredValue = CompletableDeferred>(); + const unregister = this.registerRestMiddleware(async (_, res) => { + if (filter && !filter(res)) return res; + deferredValue.complete(res); + return res; + }); + + const response: Response = await deferredValue.get(); + unregister(); + return response; + } + + async observeNextIncomingProtocolMessage(filter?: (msg: ProtocolMessage) => boolean): Promise { + const deferredValue = CompletableDeferred(); + const unregister = this.registerRealtimeMiddleware(async (msg, messageType) => { + if (messageType === 'incoming' && (!filter || filter(msg))) deferredValue.complete(msg); + return { command: 'keep' }; + }); + const message: ProtocolMessage = await deferredValue.get(); + unregister(); + return message; + } + + pauseAllConnections(): () => void { + let resume: () => void; + const unregister = this.registerRealtimeMiddleware(async (_, __) => { + await new Promise((resolve) => (resume = resolve)); + return { command: 'keep' }; + }); + return () => { + resume(); + unregister(); + }; + } + + registerRestMiddleware(middleware: RestMiddleware): () => void { + this.restMiddlewares.push(middleware); + + return () => { + const index = this.restMiddlewares.indexOf(middleware); + if (index !== -1) { + this.restMiddlewares.splice(index, 1); + } + }; + } + + registerRealtimeMiddleware(middleware: RealtimeMiddleware): () => void { + this.realtimeMiddlewares.push(middleware); + + return () => { + const index = this.realtimeMiddlewares.indexOf(middleware); + if (index !== -1) { + this.realtimeMiddlewares.splice(index, 1); + } + }; + } + + get options(): Ably.ClientOptions { + if (!this.serverIsRunning) throw Error('You need to connect to the server before getting '); + + return { + tls: false, + port: this.port, + realtimeHost: 'localhost', + restHost: 'localhost', + }; + } + + public async start() { + return new Promise((resolve) => { + this.server.listen(this.port, () => { + this.serverIsRunning = true; + resolve(); + }); + }); + } + + public async replaceNextIncomingProtocolMessage(replacement: ProtocolMessage, filter?: (msg: ProtocolMessage) => boolean): Promise { + const deferredValue = CompletableDeferred(); + const unregister = this.registerRealtimeMiddleware(async (msg, messageType) => { + if (messageType === 'incoming' && (!filter || filter(msg))) { + deferredValue.complete(); + return { command: 'replace', replacement }; + } else { + return { command: 'keep' }; + } + }); + await deferredValue.get(); + unregister(); + } + + public injectProtocolMessage(connectionId: string, msg: ProtocolMessage) { + this.connectionIdToWs.get(connectionId)?.send(JSON.stringify(msg)); + } + + public dropConnection(connectionId: string) { + this.connectionIdToWs.get(connectionId)?.close(); + } + + private get targetRestUrl(): string { + const { restHost, tls, port, tlsPort } = this.realtimeClientOptions; + return `${tls ? 'https' : 'http'}://${restHost}:${tls ? tlsPort : port}`; + } + + private get targetRealtimeUrl(): string { + const { realtimeHost, tls, port, tlsPort } = this.realtimeClientOptions; + return `${tls ? 'wss' : 'ws'}://${realtimeHost}:${tls ? tlsPort : port}`; + } + + private async applyRestMiddlewares(req: Request, res: Response): Promise { + let resultResponse = res; + for (const middleware of this.restMiddlewares) { + resultResponse = await middleware(req, resultResponse); + } + return resultResponse; + } + + private async applyRealtimeMiddlewares( + msg: ProtocolMessage, + messageType: MessageType, + ): Promise { + let resultMessage = msg; + for (const middleware of this.realtimeMiddlewares) { + const command = await middleware(resultMessage, messageType); + switch (command.command) { + case 'drop': + return undefined; + case 'replace': + resultMessage = command.replacement!!; + break; + case 'keep': + break; + } + } + return resultMessage; + } + + private closeWs(ws: WebSocket) { + ws.close(); + this.connectionIdToWs.forEach((value, key) => { + if (ws === value) this.connectionIdToWs.delete(key) + }); + } +} + +export function createInterceptingProxy(realtimeClientOptions: Ably.ClientOptions): InterceptingProxy { + return new DefaultInterceptingProxy({ + tls: true, + tlsPort: 443, + port: 80, + realtimeHost: 'sandbox-realtime.ably.io', + restHost: 'sandbox-rest.ably.io', + ...realtimeClientOptions, + }); +} diff --git a/test/echo-server.ts b/test/echo-server.ts new file mode 100644 index 0000000..3d78648 --- /dev/null +++ b/test/echo-server.ts @@ -0,0 +1,32 @@ +import http from 'http'; +import { WebSocketServer } from 'ws'; + +export async function startEchoServer(port = 8080): Promise { + const echoServer = http.createServer((req, res) => { + const bodyChunks: any[] = []; + req.on('data', (chunk) => bodyChunks.push(chunk)); + req.on('end', () => { + const requestData = Buffer.concat(bodyChunks).toString(); + res.writeHead(200); + res.end(requestData); + }); + }); + + const wss = new WebSocketServer({ server: echoServer, path: '/' }); + + wss.on('connection', (ws) => { + ws.on('message', (message) => { + ws.send(message); + }); + }); + + await new Promise((resolve) => { + echoServer.listen(port, () => resolve()); + }); + + return echoServer; +} + +export async function stopEchoServer(server: http.Server) { + return new Promise((resolve) => server.close(() => resolve())); +} diff --git a/test/proxy.test.ts b/test/proxy.test.ts new file mode 100644 index 0000000..516f600 --- /dev/null +++ b/test/proxy.test.ts @@ -0,0 +1,183 @@ +import { beforeAll, describe, expect, it } from 'vitest'; +import WebSocket from 'ws'; +import { createInterceptingProxy, type InterceptingProxy } from '../src'; +import { delay, openConnection, runAndObserve, waitForConnectionClose, waitForNextReceivedMessage } from './utils'; +import { startEchoServer, stopEchoServer } from './echo-server'; + +describe('Local proxy tests', () => { + let proxy: InterceptingProxy; + + beforeAll(async () => { + const port = 8080; + const server = await startEchoServer(port); + proxy = createInterceptingProxy({ + realtimeHost: 'localhost', + restHost: 'localhost', + port, + tls: false, + }); + await proxy.start(); + return async () => stopEchoServer(server); + }); + + it('should modify incoming websocket messages from server', async () => { + const testWs = new WebSocket(`ws://localhost:${proxy.options.port}`); + + await openConnection(testWs); + + proxy.replaceNextIncomingProtocolMessage({ action: 3 }); + + await runAndObserve( + async () => testWs.send(JSON.stringify({ action: 1 })), + async () => { + const message = await proxy.observeNextIncomingProtocolMessage(); + const rawWebsocketMessage = await waitForNextReceivedMessage(testWs); + expect(message).toEqual({ action: 3 }); + expect(rawWebsocketMessage.toString()).toBe('{"action":3}'); + }, + ); + + testWs.close(); + }); + + it('should spy on outgoing websocket messages from server', async () => { + const testWs = new WebSocket(`ws://localhost:${proxy.options.port}`); + + await openConnection(testWs); + + await runAndObserve( + async () => testWs.send(JSON.stringify({ action: 0 })), + async () => { + const message = await proxy.observeNextOutgoingProtocolMessage(); + expect(message).toEqual({ action: 0 }); + }, + ); + + testWs.close(); + }); + + it('should spy on incoming websocket messages from server', async () => { + const testWs = new WebSocket(`ws://localhost:${proxy.options.port}`); + + await openConnection(testWs); + + await runAndObserve( + async () => testWs.send(JSON.stringify({ action: 0 })), + async () => { + const message = await proxy.observeNextIncomingProtocolMessage(); + expect(message).toEqual({ action: 0 }); + }, + ); + + testWs.close(); + }); + + it('should inject websocket messages', async () => { + const testWs = new WebSocket(`ws://localhost:${proxy.options.port}`); + + // we simulate connectionId that should be available in RealtimeClient + const connectionId = 'connectionId'; + + await openConnection(testWs); + testWs.send(JSON.stringify({ action: 4, connectionId })); + + await waitForNextReceivedMessage(testWs); + + await runAndObserve( + async () => proxy.injectProtocolMessage(connectionId, { action: 5, channel: 'test-channel' }), + async () => { + const message = await waitForNextReceivedMessage(testWs); + expect(message.toString()).toEqual(JSON.stringify({ action: 5, channel: 'test-channel' })); + }, + ); + + testWs.close(); + }); + + it('should drop websocket connection', async () => { + const testWs = new WebSocket(`ws://localhost:${proxy.options.port}`); + + // we simulate connectionId that should be available in RealtimeClient + const connectionId = 'connectionId'; + + await openConnection(testWs); + testWs.send(JSON.stringify({ action: 4, connectionId })); + await waitForNextReceivedMessage(testWs); + + await runAndObserve( + async () => proxy.dropConnection(connectionId), + async () => { + await waitForConnectionClose(testWs); + }, + ); + + testWs.close(); + }); + + it('should observe http requests', async () => { + await runAndObserve( + async () => { + await fetch(`http://localhost:${proxy.options.port}/foo/bar`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ foo: 'bar' }), + }); + }, + async () => { + const request = await proxy.observeNextRequest(); + expect(request.data).toEqual({ foo: 'bar' }); + }, + ); + }); + + it('should modify http responses', async () => { + const unregister = proxy.registerRestMiddleware(async (_, res) => { + return { + ...res, + data: { bar: 'foo' }, + }; + }); + const result = await fetch(`http://localhost:${proxy.options.port}/foo/bar`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ foo: 'bar' }), + }); + const response = await result.json(); + expect(response).toMatchObject({ bar: 'foo' }); + unregister(); + }); + + it('should be able to pause connection', async () => { + const testWs = new WebSocket(`ws://localhost:${proxy.options.port}`); + + // we simulate connectionId that should be available in RealtimeClient + await openConnection(testWs); + + const failOnMessageListener = () => { + expect.fail("We shouldn't receive any messages, while on pause"); + }; + + testWs.on('message', failOnMessageListener); + + const resume = proxy.pauseAllConnections(); + testWs.send(JSON.stringify({ action: 0 })); + await delay(1000) + testWs.off('message', failOnMessageListener); + + runAndObserve( + async () => { + resume(); + }, + async () => { + const rawWebsocketMessage = await waitForNextReceivedMessage(testWs); + expect(rawWebsocketMessage.toString()).toBe('{"action":0}'); + } + ) + + testWs.close(); + }); +}); diff --git a/test/tsconfig.json b/test/tsconfig.json index b1986ba..ec83f01 100644 --- a/test/tsconfig.json +++ b/test/tsconfig.json @@ -13,6 +13,5 @@ "allowJs": true, "allowSyntheticDefaultImports": true, "resolveJsonModule": true, - "outDir": "dist" } } diff --git a/test/utils.ts b/test/utils.ts new file mode 100644 index 0000000..ed30a09 --- /dev/null +++ b/test/utils.ts @@ -0,0 +1,33 @@ +import type WebSocket from 'ws'; + +export async function runAndObserve(runner: () => Promise, observer: () => Promise) { + const observerPromise = observer(); + await runner(); + await observerPromise; +} + +export async function openConnection(ws: WebSocket) { + return new Promise((resolve) => ws.once('open', () => resolve())); +} + +export async function waitForNextReceivedMessage(ws: WebSocket) { + return new Promise((resolve) => { + ws.once('message', (data) => { + resolve(data); + }); + }); +} + +export async function waitForConnectionClose(ws: WebSocket) { + return new Promise((resolve) => { + ws.once('close', () => { + resolve(); + }); + }); +} + +export function delay(timeoutMs: number) { + return new Promise((resolve) => { + setTimeout(() => resolve(), timeoutMs); + }); +} diff --git a/vitest.config.mts b/vitest.config.mts index ff24688..4ba7f1e 100644 --- a/vitest.config.mts +++ b/vitest.config.mts @@ -3,9 +3,7 @@ import { defineConfig } from 'vitest/config'; export default defineConfig({ root: '.', test: { - globalSetup: './test/helper/test-setup.ts', - setupFiles: ['./test/helper/expectations.ts'], - include: ['test/core/**/*.test.{ts,js}'], + include: ['test/**/*.test.{ts,js}'], environment: 'node', poolOptions: { threads: {