diff --git a/.changeset/cool-tips-smoke.md b/.changeset/cool-tips-smoke.md new file mode 100644 index 0000000000..8fce065223 --- /dev/null +++ b/.changeset/cool-tips-smoke.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Use result type for websocket stream diff --git a/eslint.config.mjs b/eslint.config.mjs index 246c84842a..5c061396e6 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -2,6 +2,7 @@ import js from '@eslint/js'; import { configs, plugins, rules } from 'eslint-config-airbnb-extended'; import { rules as prettierConfigRules } from 'eslint-config-prettier'; +import neverthrowMustUse from 'eslint-plugin-neverthrow-must-use'; import prettierPlugin from 'eslint-plugin-prettier'; const strictness = 'off'; @@ -31,6 +32,15 @@ const typescriptConfig = [ rules.typescript.typescriptEslintStrict, ]; +const neverthrowConfig = [ + { + name: 'neverthrow-must-use', + plugins: { + 'neverthrow-must-use': neverthrowMustUse, + }, + }, +]; + const prettierConfig = [ // Prettier Plugin { @@ -56,6 +66,7 @@ export default [ ...typescriptConfig, // Prettier Config ...prettierConfig, + ...neverthrowConfig, { languageOptions: { parserOptions: { @@ -158,6 +169,7 @@ export default [ 'one-var': strictness, 'no-multi-assign': strictness, 'new-cap': strictness, + 'require-yield': strictness, radix: strictness, eqeqeq: strictness, diff --git a/package.json b/package.json index 3fa231d8fb..b089fe9274 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "events": "^3.3.0", "jose": "^6.1.0", "loglevel": "^1.9.2", + "neverthrow": "^8.2.0", "sdp-transform": "^2.15.0", "ts-debounce": "^4.0.0", "tslib": "2.8.1", @@ -96,6 +97,7 @@ "eslint-config-prettier": "10.1.8", "eslint-plugin-compat": "^6.0.2", "eslint-plugin-import-x": "^4.16.1", + "eslint-plugin-neverthrow-must-use": "^0.1.2", "eslint-plugin-prettier": "^5.5.4", "gh-pages": "6.3.0", "happy-dom": "^17.2.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c4ec70fc34..228bec32fd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -26,6 +26,9 @@ importers: loglevel: specifier: ^1.9.2 version: 1.9.2 + neverthrow: + specifier: ^8.2.0 + version: 8.2.0 sdp-transform: specifier: ^2.15.0 version: 2.15.0 @@ -120,6 +123,9 @@ importers: eslint-plugin-import-x: specifier: ^4.16.1 version: 4.16.1(@typescript-eslint/utils@8.47.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3))(eslint-import-resolver-node@0.3.9)(eslint@9.39.1(jiti@2.4.2)) + eslint-plugin-neverthrow-must-use: + specifier: ^0.1.2 + version: 0.1.2(@typescript-eslint/parser@7.18.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3))(eslint@9.39.1(jiti@2.4.2)) eslint-plugin-prettier: specifier: ^5.5.4 version: 5.5.4(@types/eslint@8.44.7)(eslint-config-prettier@10.1.8(eslint@9.39.1(jiti@2.4.2)))(eslint@9.39.1(jiti@2.4.2))(prettier@3.6.2) @@ -2270,6 +2276,13 @@ packages: peerDependencies: eslint: '>=8.23.0' + eslint-plugin-neverthrow-must-use@0.1.2: + resolution: {integrity: sha512-Wt/u1wjnH8rWtbc8zqTK5yOcB79zVlCCWXi6ChJNer5ACkqldNQ6/+RKVUErACbv0Oex9aqaKYoTd0OqLe4o3Q==} + engines: {node: '>=16'} + peerDependencies: + '@typescript-eslint/parser': ^8.0.0 + eslint: ^9.0.0 + eslint-plugin-prettier@5.5.4: resolution: {integrity: sha512-swNtI95SToIz05YINMA6Ox5R057IMAmWZ26GqPxusAp1TZzj+IdY9tXNWWD3vkF/wEqydCONcwjTFpxybBqZsg==} engines: {node: ^14.18.0 || >=16.0.0} @@ -2982,6 +2995,10 @@ packages: neo-async@2.6.2: resolution: {integrity: sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==} + neverthrow@8.2.0: + resolution: {integrity: sha512-kOCT/1MCPAxY5iUV3wytNFUMUolzuwd/VF/1KCx7kf6CutrOsTie+84zTGTpgQycjvfLdBBdvBvFLqFD2c0wkQ==} + engines: {node: '>=18'} + node-fetch@2.7.0: resolution: {integrity: sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==} engines: {node: 4.x || >=6.0.0} @@ -3624,8 +3641,8 @@ packages: engines: {node: '>=14.17'} hasBin: true - typescript@6.0.0-dev.20251120: - resolution: {integrity: sha512-dkvZw2/09r7JIltGCeubJXLYE7+NapbKj68BtGtm47TiwjyKxTDTG2nWZu8Gpopzi0ub9bNVn0rEgh5CgOlE4w==} + typescript@6.0.0-dev.20251124: + resolution: {integrity: sha512-3dp4cPGjA35NMKhzreQI41rYQTiLFWUjIoWCBAp/r27Ccf6HLZwniD0f3KSgruR6hhzb+2m5yrEn0RP7IaK1Bg==} engines: {node: '>=14.17'} hasBin: true @@ -6023,7 +6040,7 @@ snapshots: dependencies: semver: 7.6.0 shelljs: 0.8.5 - typescript: 6.0.0-dev.20251120 + typescript: 6.0.0-dev.20251124 dunder-proto@1.0.1: dependencies: @@ -6331,6 +6348,11 @@ snapshots: - typescript optional: true + eslint-plugin-neverthrow-must-use@0.1.2(@typescript-eslint/parser@7.18.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3))(eslint@9.39.1(jiti@2.4.2)): + dependencies: + '@typescript-eslint/parser': 7.18.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3) + eslint: 9.39.1(jiti@2.4.2) + eslint-plugin-prettier@5.5.4(@types/eslint@8.44.7)(eslint-config-prettier@10.1.8(eslint@9.39.1(jiti@2.4.2)))(eslint@9.39.1(jiti@2.4.2))(prettier@3.6.2): dependencies: eslint: 9.39.1(jiti@2.4.2) @@ -7088,6 +7110,10 @@ snapshots: neo-async@2.6.2: {} + neverthrow@8.2.0: + optionalDependencies: + '@rollup/rollup-linux-x64-gnu': 4.53.2 + node-fetch@2.7.0: dependencies: whatwg-url: 5.0.0 @@ -7789,7 +7815,7 @@ snapshots: typescript@5.8.3: {} - typescript@6.0.0-dev.20251120: {} + typescript@6.0.0-dev.20251124: {} uc.micro@2.1.0: {} diff --git a/src/api/SignalClient.test.ts b/src/api/SignalClient.test.ts index 991d935dfe..c2e4ad5d24 100644 --- a/src/api/SignalClient.test.ts +++ b/src/api/SignalClient.test.ts @@ -6,10 +6,11 @@ import { SignalRequest, SignalResponse, } from '@livekit/protocol'; +import { ResultAsync } from 'neverthrow'; import { beforeEach, describe, expect, it, vi } from 'vitest'; import { ConnectionError, ConnectionErrorReason } from '../room/errors'; import { SignalClient, SignalConnectionState } from './SignalClient'; -import type { WebSocketCloseInfo, WebSocketConnection } from './WebSocketStream'; +import type { WebSocketCloseInfo, WebSocketConnection, WebSocketError } from './WebSocketStream'; import { WebSocketStream } from './WebSocketStream'; // Mock the WebSocketStream @@ -58,16 +59,27 @@ function createMockConnection(readable: ReadableStream): WebSocketC interface MockWebSocketStreamOptions { connection?: WebSocketConnection; - opened?: Promise; - closed?: Promise; + opened?: ResultAsync, WebSocketError>; + closed?: ResultAsync; readyState?: number; } function mockWebSocketStream(options: MockWebSocketStreamOptions = {}) { const { connection, - opened = connection ? Promise.resolve(connection) : new Promise(() => {}), - closed = new Promise(() => {}), + // eslint-disable-next-line neverthrow-must-use/must-use-result + opened = connection + ? ResultAsync.fromPromise(Promise.resolve(connection), (error) => ({ + type: 'connection' as const, + error: error as Event, + })) + : // eslint-disable-next-line neverthrow-must-use/must-use-result + ResultAsync.fromPromise(new Promise(() => {}), (error) => ({ + type: 'connection' as const, + error: error as Event, + })), + // eslint-disable-next-line neverthrow-must-use/must-use-result + closed = ResultAsync.fromPromise(new Promise(() => {}), (error) => error as WebSocketError), readyState = 1, } = options; @@ -197,8 +209,8 @@ describe('SignalClient.connect', () => { return { url: 'wss://test.livekit.io', - opened: new Promise(() => {}), // Never resolves - closed: new Promise(() => {}), + opened: ResultAsync.fromPromise(new Promise(() => {}), (e) => e as WebSocketError), // Never resolves + closed: ResultAsync.fromPromise(new Promise(() => {}), (e) => e as WebSocketError), close: vi.fn(), readyState: 0, } as any; @@ -253,10 +265,20 @@ describe('SignalClient.connect', () => { }; vi.mocked(WebSocketStream).mockImplementation(() => { + // eslint-disable-next-line neverthrow-must-use/must-use-result + const opened = ResultAsync.fromPromise(Promise.resolve(mockConnection), (error) => ({ + type: 'connection' as const, + error: error as Event, + })); + // eslint-disable-next-line neverthrow-must-use/must-use-result + const closed = ResultAsync.fromPromise( + new Promise(() => {}), + (error) => error as WebSocketError, + ); return { url: 'wss://test.livekit.io', - opened: Promise.resolve(mockConnection), - closed: new Promise(() => {}), + opened, + closed, close: vi.fn(), readyState: 1, } as any; @@ -301,7 +323,10 @@ describe('SignalClient.connect', () => { describe('Failure Case - WebSocket Connection Errors', () => { it('should reject with NotAllowed error for 4xx HTTP status', async () => { mockWebSocketStream({ - opened: Promise.reject(new Error('Connection failed')), + opened: ResultAsync.fromPromise( + Promise.reject(new Error('Connection failed')), + (e) => e as WebSocketError, + ), readyState: 3, }); @@ -322,7 +347,10 @@ describe('SignalClient.connect', () => { it('should reject with ServerUnreachable when fetch fails', async () => { mockWebSocketStream({ - opened: Promise.reject(new Error('Connection failed')), + opened: ResultAsync.fromPromise( + Promise.reject(new Error('Connection failed')), + (e) => e as WebSocketError, + ), readyState: 3, }); @@ -337,24 +365,26 @@ describe('SignalClient.connect', () => { }); it('should handle ConnectionError from WebSocket rejection', async () => { - const customError = ConnectionError.internal('Custom error', { status: 500 }); + const customError = ConnectionError.websocket('Custom error'); mockWebSocketStream({ - opened: Promise.reject(customError), + opened: ResultAsync.fromPromise(Promise.reject(customError), (e) => e as WebSocketError), readyState: 3, }); // Mock fetch to return 500 (global.fetch as any).mockResolvedValueOnce({ - status: 500, - text: async () => 'Internal Server Error', + status: 200, + text: async () => 'ok', }); - await expect( - signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions), - ).rejects.toMatchObject({ - reason: ConnectionErrorReason.InternalError, - }); + const error = await signalClient + .join('wss://test.livekit.io', 'test-token', defaultOptions) + .catch((e) => e); + + expect(error).toBeInstanceOf(ConnectionError); + expect(error.message).toBe('Custom error'); + expect(error.reason).toBe(ConnectionErrorReason.WebSocket); }); }); @@ -435,10 +465,13 @@ describe('SignalClient.connect', () => { closedResolve({ closeCode: 1006, reason: 'Connection lost' }); }); + // eslint-disable-next-line neverthrow-must-use/must-use-result + const closed = ResultAsync.fromPromise(closedPromise, (e) => e as WebSocketError); + return { url: 'wss://test.livekit.io', - opened: new Promise(() => {}), // Never resolves - closed: closedPromise, + opened: ResultAsync.fromPromise(new Promise(() => {}), (e) => e as WebSocketError), // Never resolves + closed: closed, close: vi.fn(), readyState: 2, // CLOSING } as any; @@ -448,7 +481,7 @@ describe('SignalClient.connect', () => { signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions), ).rejects.toMatchObject({ message: 'Websocket got closed during a (re)connection attempt: Connection lost', - reason: ConnectionErrorReason.InternalError, + reason: ConnectionErrorReason.WebSocket, }); }); }); diff --git a/src/api/SignalClient.ts b/src/api/SignalClient.ts index 0623a1ca27..9cee74d20b 100644 --- a/src/api/SignalClient.ts +++ b/src/api/SignalClient.ts @@ -300,6 +300,9 @@ export class SignalClient { return new Promise(async (resolve, reject) => { try { let alreadyAborted = false; + if (abortSignal?.aborted) { + reject(ConnectionError.cancelled('join aborted before connection attempt')); + } const abortHandler = async (eventOrError: Event | Error) => { if (alreadyAborted) { return; @@ -352,18 +355,15 @@ export class SignalClient { if (this.ws) { await this.close(false); } + if (abortSignal?.aborted) { + // abortHandler() calls reject we simply return here + return; + } this.ws = new WebSocketStream(rtcUrl); try { - this.ws.closed - .then((closeInfo) => { - if (this.isEstablishingConnection) { - reject( - ConnectionError.internal( - `Websocket got closed during a (re)connection attempt: ${closeInfo.reason}`, - ), - ); - } + this.ws.closed.match( + (closeInfo) => { if (closeInfo.closeCode !== 1000) { this.log.warn(`websocket closed`, { ...this.logContext, @@ -372,22 +372,30 @@ export class SignalClient { wasClean: closeInfo.closeCode === 1000, state: this.state, }); - if (this.state === SignalConnectionState.CONNECTED) { - this.handleOnClose(closeInfo.reason ?? 'Unexpected WS error'); - } } - return; - }) - .catch((reason) => { if (this.isEstablishingConnection) { reject( - ConnectionError.internal( - `Websocket error during a (re)connection attempt: ${reason}`, + ConnectionError.websocket( + `Websocket got closed during a (re)connection attempt: ${closeInfo.reason}`, + ), + ); + } else if (this.state === SignalConnectionState.CONNECTED) { + this.handleOnClose(closeInfo.reason ?? 'Unexpected WS error'); + } + }, + (reason) => { + if (this.isEstablishingConnection) { + reject( + ConnectionError.websocket( + `Websocket error during a (re)connection attempt: ${reason.message}`, ), ); } - }); - const connection = await this.ws.opened.catch(async (reason: unknown) => { + }, + ); + const openResult = await this.ws.opened; + if (openResult.isErr()) { + const reason = openResult.error; if (this.state !== SignalConnectionState.CONNECTED) { this.state = SignalConnectionState.DISCONNECTED; clearTimeout(wsTimeout); @@ -399,11 +407,10 @@ export class SignalClient { this.handleWSError(reason); reject(reason); return; - }); - clearTimeout(wsTimeout); - if (!connection) { - return; } + clearTimeout(wsTimeout); + const connection = openResult.value; + const signalReader = connection.readable.getReader(); this.streamWriter = connection.writable.getWriter(); const firstMessage = await signalReader.read(); diff --git a/src/api/WebSocketStream.test.ts b/src/api/WebSocketStream.test.ts index 3445348042..32d1a5d5ad 100644 --- a/src/api/WebSocketStream.test.ts +++ b/src/api/WebSocketStream.test.ts @@ -1,5 +1,6 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { ConnectionErrorReason } from '../room/errors'; import { WebSocketStream } from './WebSocketStream'; // Mock WebSocket @@ -122,6 +123,16 @@ vi.mock('../room/utils', () => ({ sleep: vi.fn((duration: number) => new Promise((resolve) => setTimeout(resolve, duration))), })); +// Helper function to unwrap Result from opened promise +async function getConnectionOrFail(wsStream: WebSocketStream) { + const result = await wsStream.opened; + expect(result.isOk()).toBe(true); + if (!result.isOk()) { + throw new Error('Failed to open connection'); + } + return result.value; +} + describe('WebSocketStream', () => { let mockWebSocket: MockWebSocket; let originalWebSocket: typeof WebSocket; @@ -174,7 +185,7 @@ describe('WebSocketStream', () => { new WebSocketStream('wss://test.example.com', { signal: abortController.signal, }); - }).toThrow('This operation was aborted'); + }).toThrow('Aborted before WS was initialized'); }); it('should close when abort signal is triggered', () => { @@ -201,21 +212,29 @@ describe('WebSocketStream', () => { const removeEventListenerSpy = vi.spyOn(mockWebSocket, 'removeEventListener'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; - - expect(connection.readable).toBeInstanceOf(ReadableStream); - expect(connection.writable).toBeInstanceOf(WritableStream); - expect(connection.protocol).toBe('test-protocol'); - expect(connection.extensions).toBe('test-extension'); + const result = await wsStream.opened; + + expect(result.isOk()).toBe(true); + if (result.isOk()) { + const connection = result.value; + expect(connection.readable).toBeInstanceOf(ReadableStream); + expect(connection.writable).toBeInstanceOf(WritableStream); + expect(connection.protocol).toBe('test-protocol'); + expect(connection.extensions).toBe('test-extension'); + } expect(removeEventListenerSpy).toHaveBeenCalledWith('error', expect.any(Function)); }); - it('should reject when WebSocket errors before opening', async () => { + it('should return error Result when WebSocket errors before opening', async () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerError(); - await expect(wsStream.opened).rejects.toThrow(); + const result = await wsStream.opened; + expect(result.isErr()).toBe(true); + if (result.isErr()) { + expect(result.error.reason).toBe(ConnectionErrorReason.WebSocket); + } }); }); @@ -227,10 +246,13 @@ describe('WebSocketStream', () => { mockWebSocket.triggerOpen(); mockWebSocket.triggerClose(1001, 'Going away'); - const closeInfo = await wsStream.closed; + const result = await wsStream.closed; - expect(closeInfo.closeCode).toBe(1001); - expect(closeInfo.reason).toBe('Going away'); + expect(result.isOk()).toBe(true); + if (result.isOk()) { + expect(result.value.closeCode).toBe(1001); + expect(result.value.reason).toBe('Going away'); + } expect(removeEventListenerSpy).toHaveBeenCalledWith('error', expect.any(Function)); }); @@ -241,13 +263,16 @@ describe('WebSocketStream', () => { mockWebSocket.triggerError(); mockWebSocket.triggerClose(1006, 'Connection failed'); - const closeInfo = await wsStream.closed; + const result = await wsStream.closed; - expect(closeInfo.closeCode).toBe(1006); - expect(closeInfo.reason).toBe('Connection failed'); + expect(result.isOk()).toBe(true); + if (result.isOk()) { + expect(result.value.closeCode).toBe(1006); + expect(result.value.reason).toBe('Connection failed'); + } }); - it('should reject when error occurs without timely close event', async () => { + it('should return error Result when error occurs without timely close event', async () => { const { sleep } = await import('../room/utils'); vi.mocked(sleep).mockResolvedValue(undefined); @@ -256,9 +281,14 @@ describe('WebSocketStream', () => { mockWebSocket.triggerOpen(); mockWebSocket.triggerError(); - await expect(wsStream.closed).rejects.toThrow( - 'Encountered unspecified websocket error without a timely close event', - ); + const result = await wsStream.closed; + expect(result.isErr()).toBe(true); + if (result.isErr()) { + expect(result.error.reason).toBe(ConnectionErrorReason.WebSocket); + expect(result.error.message).toBe( + 'Encountered unspecified websocket error without a timely close event', + ); + } }); }); @@ -267,8 +297,11 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const result = await wsStream.opened; + expect(result.isOk()).toBe(true); + if (!result.isOk()) return; + const connection = result.value; const reader = connection.readable.getReader(); const message1 = new ArrayBuffer(8); @@ -292,23 +325,22 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); mockWebSocket.triggerError(); - await Promise.all([ - expect(reader.read()).rejects.toBeDefined(), - expect(wsStream.closed).rejects.toBeDefined(), - ]); + const closedResult = await wsStream.closed; + await expect(reader.read()).rejects.toBeDefined(); + expect(closedResult.isErr()).toBe(true); }); it('should close WebSocket with custom close info when cancelled', async () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); const closeSpy = vi.spyOn(mockWebSocket, 'close'); @@ -322,7 +354,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader1 = connection.readable.getReader(); @@ -337,7 +369,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const writer = connection.writable.getWriter(); const sendSpy = vi.spyOn(mockWebSocket, 'send'); @@ -362,7 +394,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const writer = connection.writable.getWriter(); const closeSpy = vi.spyOn(mockWebSocket, 'close'); @@ -376,7 +408,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const writer = connection.writable.getWriter(); @@ -418,7 +450,7 @@ describe('WebSocketStream', () => { }); mockWebSocket.triggerOpen(); - await wsStream.opened; + await getConnectionOrFail(wsStream); const closeSpy = vi.spyOn(mockWebSocket, 'close'); @@ -433,7 +465,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); @@ -467,7 +499,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); const writer = connection.writable.getWriter(); @@ -493,7 +525,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const sourceData = [new ArrayBuffer(8), new ArrayBuffer(16), new ArrayBuffer(32)]; let dataIndex = 0; @@ -524,7 +556,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const msg1 = new ArrayBuffer(8); const msg2 = new ArrayBuffer(16); @@ -552,7 +584,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); @@ -562,17 +594,16 @@ describe('WebSocketStream', () => { // Trigger error while read is pending mockWebSocket.triggerError(); - await Promise.all([ - expect(readPromise).rejects.toBeDefined(), - expect(wsStream.closed).rejects.toBeDefined(), - ]); + const closedResult = await wsStream.closed; + await expect(readPromise).rejects.toBeDefined(); + expect(closedResult.isErr()).toBe(true); }); it('should support zero-length and empty messages', async () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); const writer = connection.writable.getWriter(); @@ -599,7 +630,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); diff --git a/src/api/WebSocketStream.ts b/src/api/WebSocketStream.ts index d930c212b9..6e25c223f7 100644 --- a/src/api/WebSocketStream.ts +++ b/src/api/WebSocketStream.ts @@ -1,4 +1,6 @@ // https://github.com/CarterLi/websocketstream-polyfill +import { ResultAsync } from 'neverthrow'; +import { ConnectionError } from '../room/errors'; import { sleep } from '../room/utils'; export interface WebSocketConnection { @@ -18,6 +20,8 @@ export interface WebSocketStreamOptions { signal?: AbortSignal; } +export type WebSocketError = ReturnType; + /** * [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) with [Streams API](https://developer.mozilla.org/en-US/docs/Web/API/Streams_API) * @@ -26,11 +30,11 @@ export interface WebSocketStreamOptions { export class WebSocketStream { readonly url: string; - readonly opened: Promise>; + readonly opened: ResultAsync, WebSocketError>; - readonly closed: Promise; + readonly closed: ResultAsync; - readonly close: (closeInfo?: WebSocketCloseInfo) => void; + readonly close!: (closeInfo?: WebSocketCloseInfo) => void; get readyState(): number { return this.ws.readyState; @@ -40,77 +44,120 @@ export class WebSocketStream ws.close(code, reason); - this.opened = new Promise((resolve, reject) => { - ws.onopen = () => { - resolve({ - readable: new ReadableStream({ - start(controller) { - ws.onmessage = ({ data }) => controller.enqueue(data); - ws.onerror = (e) => controller.error(e); - }, - cancel: closeWithInfo, - }), - writable: new WritableStream({ - write(chunk) { - ws.send(chunk); - }, - abort() { - ws.close(); - }, - close: closeWithInfo, - }), - protocol: ws.protocol, - extensions: ws.extensions, - }); - ws.removeEventListener('error', reject); - }; - ws.addEventListener('error', reject); - }); - - this.closed = new Promise((resolve, reject) => { - const rejectHandler = async () => { - const closePromise = new Promise((res) => { - if (ws.readyState === WebSocket.CLOSED) return; - else { - ws.addEventListener( - 'close', - (closeEv: CloseEvent) => { - res(closeEv); + // eslint-disable-next-line neverthrow-must-use/must-use-result + this.opened = ResultAsync.fromPromise, WebSocketError>( + new Promise((resolve, r) => { + const reject = (err: WebSocketError) => r(err); + const errorHandler = (e: Event) => { + console.error(e); + reject( + ConnectionError.websocket('Encountered websocket error while establishing connection'), + ); + ws.removeEventListener('open', openHandler); + }; + + const onCloseDuringOpen = (ev: CloseEvent) => { + reject( + ConnectionError.websocket( + `WS closed during connection establishment: ${ev.reason}`, + ev.code, + ev.reason, + ), + ); + }; + + const openHandler = () => { + resolve({ + readable: new ReadableStream({ + start(controller) { + ws.onmessage = ({ data }) => controller.enqueue(data); + ws.onerror = (e) => controller.error(e); }, - { once: true }, + cancel: closeWithInfo, + }), + writable: new WritableStream({ + write(chunk) { + ws.send(chunk); + }, + abort() { + ws.close(); + }, + close: closeWithInfo, + }), + protocol: ws.protocol, + extensions: ws.extensions, + }); + ws.removeEventListener('error', errorHandler); + ws.removeEventListener('close', onCloseDuringOpen); + }; + + console.log('websocket setup registering event listeners'); + + ws.addEventListener('open', openHandler, { once: true }); + ws.addEventListener('error', errorHandler, { once: true }); + ws.addEventListener('close', onCloseDuringOpen, { once: true }); + }), + (error) => error as WebSocketError, + ); + + // eslint-disable-next-line neverthrow-must-use/must-use-result + this.closed = ResultAsync.fromPromise( + new Promise((resolve, r) => { + const reject = (err: WebSocketError) => r(err); + const errorHandler = async () => { + const closePromise = new Promise((res) => { + if (ws.readyState === WebSocket.CLOSED) return; + else { + ws.addEventListener( + 'close', + (closeEv: CloseEvent) => { + res(closeEv); + }, + { once: true }, + ); + } + }); + const reason = await Promise.race([sleep(250), closePromise]); + if (!reason) { + reject( + ConnectionError.websocket( + 'Encountered unspecified websocket error without a timely close event', + ), ); + } else { + // if we can infer the close reason from the close event then resolve with ok, we don't need to throw + resolve({ closeCode: reason.code, reason: reason.reason }); } - }); - const reason = await Promise.race([sleep(250), closePromise]); - if (!reason) { - reject(new Error('Encountered unspecified websocket error without a timely close event')); - } else { - // if we can infer the close reason from the close event then resolve the promise, we don't need to throw - resolve(reason); + }; + + if (ws.readyState === WebSocket.CLOSED) { + reject(ConnectionError.websocket('Websocket already closed at initialization time')); + return; } - }; - ws.onclose = ({ code, reason }) => { - resolve({ closeCode: code, reason }); - ws.removeEventListener('error', rejectHandler); - }; - ws.addEventListener('error', rejectHandler); - }); + ws.onclose = ({ code, reason }) => { + resolve({ closeCode: code, reason }); + ws.removeEventListener('error', errorHandler); + }; + + ws.addEventListener('error', errorHandler); + }), + (error) => error as WebSocketError, + ); if (options.signal) { - options.signal.onabort = () => ws.close(); + options.signal.onabort = () => ws.close(undefined, 'AbortSignal triggered'); } this.close = closeWithInfo;