From ca8cdaefbb32d88dee21f7a9a26abdff55d6e241 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Sat, 29 Nov 2025 14:43:40 +0100 Subject: [PATCH 1/5] Use TypedPromise for typesafe errors --- src/room/PCTransportManager.ts | 15 ++++++++----- src/room/RTCEngine.ts | 19 +++++++++++------ src/room/Room.ts | 20 ++++++++++++------ src/room/participant/LocalParticipant.ts | 27 ++++++++++++++++-------- src/room/utils.ts | 5 +++-- src/utils/TypedPromise.ts | 18 ++++++++++++++++ 6 files changed, 75 insertions(+), 29 deletions(-) create mode 100644 src/utils/TypedPromise.ts diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index e4d7f6709f..b3ae7ae164 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -1,9 +1,10 @@ import { Mutex } from '@livekit/mutex'; import { SignalTarget } from '@livekit/protocol'; import log, { LoggerNames, getLogger } from '../logger'; +import TypedPromise from '../utils/TypedPromise'; import PCTransport, { PCEvents } from './PCTransport'; import { roomConnectOptionDefaults } from './defaults'; -import { ConnectionError } from './errors'; +import { ConnectionError, NegotiationError } from './errors'; import CriticalTimers from './timers'; import type { LoggerOptions } from './types'; import { sleep } from './utils'; @@ -220,14 +221,14 @@ export class PCTransportManager { } async negotiate(abortController: AbortController) { - return new Promise(async (resolve, reject) => { + return new TypedPromise(async (resolve, reject) => { const negotiationTimeout = setTimeout(() => { - reject('negotiation timed out'); + reject(new NegotiationError('negotiation timed out')); }, this.peerConnectionTimeout); const abortHandler = () => { clearTimeout(negotiationTimeout); - reject('negotiation aborted'); + reject(new NegotiationError('negotiation aborted')); }; abortController.signal.addEventListener('abort', abortHandler); @@ -243,7 +244,11 @@ export class PCTransportManager { await this.publisher.negotiate((e) => { clearTimeout(negotiationTimeout); - reject(e); + if (e instanceof Error) { + reject(e); + } else { + reject(new Error(String(e))); + } }); }); } diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 2bff3e3850..99313a0c34 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -51,6 +51,7 @@ import type { BaseE2EEManager } from '../e2ee/E2eeManager'; import { asEncryptablePacket } from '../e2ee/utils'; import log, { LoggerNames, getLogger } from '../logger'; import type { InternalRoomOptions } from '../options'; +import TypedPromise from '../utils/TypedPromise'; import { DataPacketBuffer } from '../utils/dataPacketBuffer'; import { TTLMap } from '../utils/ttlmap'; import PCTransport, { PCEvents } from './PCTransport'; @@ -1378,12 +1379,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } }; - waitForBufferStatusLow(kind: DataPacket_Kind): Promise { - return new Promise(async (resolve, reject) => { + waitForBufferStatusLow(kind: DataPacket_Kind): TypedPromise { + return new TypedPromise(async (resolve, reject) => { if (this.isBufferStatusLow(kind)) { resolve(); } else { - const onClosing = () => reject('Engine closed'); + const onClosing = () => reject(new UnexpectedConnectionState('engine closed')); this.once(EngineEvent.Closing, onClosing); while (!this.dcBufferStatus.get(kind)) { await sleep(10); @@ -1480,7 +1481,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /** @internal */ async negotiate(): Promise { // observe signal state - return new Promise(async (resolve, reject) => { + return new TypedPromise(async (resolve, reject) => { if (!this.pcManager) { reject(new NegotiationError('PC manager is closed')); return; @@ -1506,7 +1507,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }; if (this.isClosed) { - reject('cannot negotiate on closed engine'); + reject(new NegotiationError('cannot negotiate on closed engine')); } this.on(EngineEvent.Closing, handleClosed); @@ -1527,12 +1528,16 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit try { await this.pcManager.negotiate(abortController); resolve(); - } catch (e: any) { + } catch (e: unknown) { if (e instanceof NegotiationError) { this.fullReconnectOnNext = true; } this.handleDisconnect('negotiation', ReconnectReason.RR_UNKNOWN); - reject(e); + if (e instanceof Error) { + reject(e); + } else { + reject(new Error(String(e))); + } } finally { this.off(EngineEvent.Closing, handleClosed); } diff --git a/src/room/Room.ts b/src/room/Room.ts index a7a7bd0530..9491c91217 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -42,6 +42,7 @@ import type { RoomConnectOptions, RoomOptions, } from '../options'; +import TypedPromise from '../utils/TypedPromise'; import { getBrowser } from '../utils/browserParser'; import { BackOffStrategy } from './BackOffStrategy'; import DeviceManager from './DeviceManager'; @@ -60,7 +61,12 @@ import { roomOptionDefaults, videoDefaults, } from './defaults'; -import { ConnectionError, ConnectionErrorReason, UnsupportedServer } from './errors'; +import { + ConnectionError, + ConnectionErrorReason, + UnexpectedConnectionState, + UnsupportedServer, +} from './errors'; import { EngineEvent, ParticipantEvent, RoomEvent, TrackEvent } from './events'; import LocalParticipant from './participant/LocalParticipant'; import type Participant from './participant/Participant'; @@ -414,14 +420,14 @@ class Room extends (EventEmitter as new () => TypedEmitter) * server assigned unique room id. * returns once a sid has been issued by the server. */ - async getSid(): Promise { + getSid(): TypedPromise { if (this.state === ConnectionState.Disconnected) { - return ''; + return TypedPromise.resolve(''); } if (this.roomInfo && this.roomInfo.sid !== '') { - return this.roomInfo.sid; + return TypedPromise.resolve(this.roomInfo.sid); } - return new Promise((resolve, reject) => { + return new TypedPromise((resolve, reject) => { const handleRoomUpdate = (roomInfo: RoomModel) => { if (roomInfo.sid !== '') { this.engine.off(EngineEvent.RoomUpdate, handleRoomUpdate); @@ -431,7 +437,9 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.engine.on(EngineEvent.RoomUpdate, handleRoomUpdate); this.once(RoomEvent.Disconnected, () => { this.engine.off(EngineEvent.RoomUpdate, handleRoomUpdate); - reject('Room disconnected before room server id was available'); + reject( + new UnexpectedConnectionState('Room disconnected before room server id was available'), + ); }); }); } diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index f1f2f6ee6c..082517e8df 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -24,6 +24,7 @@ import { } from '@livekit/protocol'; import { SignalConnectionState } from '../../api/SignalClient'; import type { InternalRoomOptions } from '../../options'; +import TypedPromise from '../../utils/TypedPromise'; import { PCTransportState } from '../PCTransportManager'; import type RTCEngine from '../RTCEngine'; import type OutgoingDataStreamManager from '../data-stream/outgoing/OutgoingDataStreamManager'; @@ -370,7 +371,7 @@ export default class LocalParticipant extends Participant { name?: string; attributes?: Record; }) { - return new Promise(async (resolve, reject) => { + return new TypedPromise(async (resolve, reject) => { try { let isRejected = false; const requestId = await this.engine.client.sendUpdateLocalMetadata( @@ -406,8 +407,12 @@ export default class LocalParticipant extends Participant { reject( new SignalRequestError('Request to update local metadata timed out', 'TimeoutError'), ); - } catch (e: any) { - if (e instanceof Error) reject(e); + } catch (e: unknown) { + if (e instanceof Error) { + reject(e); + } else { + reject(new Error(String(e))); + } } }); } @@ -1583,7 +1588,7 @@ export default class LocalParticipant extends Participant { if (this.republishPromise) { await this.republishPromise; } - this.republishPromise = new Promise(async (resolve, reject) => { + this.republishPromise = new TypedPromise(async (resolve, reject) => { try { const localPubs: LocalTrackPublication[] = []; this.trackPublications.forEach((pub) => { @@ -1619,8 +1624,12 @@ export default class LocalParticipant extends Participant { }), ); resolve(); - } catch (error: any) { - reject(error); + } catch (error: unknown) { + if (error instanceof Error) { + reject(error); + } else { + reject(new Error(String(error))); + } } finally { this.republishPromise = undefined; } @@ -1774,16 +1783,16 @@ export default class LocalParticipant extends Participant { * @returns A promise that resolves with the response payload or rejects with an error. * @throws Error on failure. Details in `message`. */ - async performRpc({ + performRpc({ destinationIdentity, method, payload, responseTimeout = 15000, - }: PerformRpcParams): Promise { + }: PerformRpcParams): TypedPromise { const maxRoundTripLatency = 7000; const minEffectiveTimeout = maxRoundTripLatency + 1000; - return new Promise(async (resolve, reject) => { + return new TypedPromise(async (resolve, reject) => { if (byteLength(payload) > MAX_PAYLOAD_BYTES) { reject(RpcError.builtIn('REQUEST_PAYLOAD_TOO_LARGE')); return; diff --git a/src/room/utils.ts b/src/room/utils.ts index f0d3c9a7d2..42e071ccbc 100644 --- a/src/room/utils.ts +++ b/src/room/utils.ts @@ -5,6 +5,7 @@ import { DisconnectReason, Transcription as TranscriptionModel, } from '@livekit/protocol'; +import TypedPromise from '../utils/TypedPromise'; import { getBrowser } from '../utils/browserParser'; import type { BrowserDetails } from '../utils/browserParser'; import { protocolVersion, version } from '../version'; @@ -39,8 +40,8 @@ export function unpackStreamId(packed: string): string[] { return [packed, '']; } -export async function sleep(duration: number): Promise { - return new Promise((resolve) => CriticalTimers.setTimeout(resolve, duration)); +export function sleep(duration: number): TypedPromise { + return new TypedPromise((resolve) => CriticalTimers.setTimeout(resolve, duration)); } /** @internal */ diff --git a/src/utils/TypedPromise.ts b/src/utils/TypedPromise.ts new file mode 100644 index 0000000000..667617eaea --- /dev/null +++ b/src/utils/TypedPromise.ts @@ -0,0 +1,18 @@ +export default class TypedPromise extends Promise { + // eslint-disable-next-line @typescript-eslint/no-useless-constructor + constructor( + executor: (resolve: (value: T | PromiseLike) => void, reject: (reason: E) => void) => void, + ) { + super(executor); + } + + catch( + onrejected?: ((reason: E) => TResult | PromiseLike) | null | undefined, + ): TypedPromise { + return super.catch(onrejected); + } + + static reject(reason: E) { + return super.reject(reason) as TypedPromise; + } +} From 0b0ba7a28c5489752a369a05ac85da9bbfab81d2 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Sat, 29 Nov 2025 14:46:46 +0100 Subject: [PATCH 2/5] Create fuzzy-elephants-fail.md --- .changeset/fuzzy-elephants-fail.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fuzzy-elephants-fail.md diff --git a/.changeset/fuzzy-elephants-fail.md b/.changeset/fuzzy-elephants-fail.md new file mode 100644 index 0000000000..b014e1d03c --- /dev/null +++ b/.changeset/fuzzy-elephants-fail.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Use TypedPromise for typesafe errors From b2569278d225343707b21430f7a2fdffaadd71f4 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Sat, 29 Nov 2025 15:14:16 +0100 Subject: [PATCH 3/5] more helpers --- src/utils/TypedPromise.ts | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/utils/TypedPromise.ts b/src/utils/TypedPromise.ts index 667617eaea..3a5c4a8240 100644 --- a/src/utils/TypedPromise.ts +++ b/src/utils/TypedPromise.ts @@ -1,3 +1,5 @@ +type InferErrors = T extends TypedPromise ? E : never; + export default class TypedPromise extends Promise { // eslint-disable-next-line @typescript-eslint/no-useless-constructor constructor( @@ -12,7 +14,26 @@ export default class TypedPromise extends Promise { return super.catch(onrejected); } - static reject(reason: E) { - return super.reject(reason) as TypedPromise; + static reject(reason: E): TypedPromise { + return super.reject(reason); + } + + static all( + values: T, + ): TypedPromise<{ -readonly [P in keyof T]: Awaited }, InferErrors> { + return super.all(values) as any; + } + + static race | any)[]>( + values: T, + ): TypedPromise< + T[number] extends TypedPromise + ? U + : T[number] extends PromiseLike + ? U + : Awaited, + InferErrors + > { + return super.race(values); } } From a35da4e88843fa0753fefc2a50b8b753bdaad0f6 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Sat, 29 Nov 2025 15:24:45 +0100 Subject: [PATCH 4/5] typed promise in stream --- src/api/WebSocketStream.ts | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/src/api/WebSocketStream.ts b/src/api/WebSocketStream.ts index d930c212b9..fad5d988fd 100644 --- a/src/api/WebSocketStream.ts +++ b/src/api/WebSocketStream.ts @@ -1,5 +1,7 @@ // https://github.com/CarterLi/websocketstream-polyfill +import { ConnectionError } from '../room/errors'; import { sleep } from '../room/utils'; +import TypedPromise from '../utils/TypedPromise'; export interface WebSocketConnection { readable: ReadableStream; @@ -18,6 +20,8 @@ export interface WebSocketStreamOptions { signal?: AbortSignal; } +type WebsocketError = ReturnType; + /** * [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) with [Streams API](https://developer.mozilla.org/en-US/docs/Web/API/Streams_API) * @@ -26,9 +30,9 @@ export interface WebSocketStreamOptions { export class WebSocketStream { readonly url: string; - readonly opened: Promise>; + readonly opened: TypedPromise, WebsocketError>; - readonly closed: Promise; + readonly closed: TypedPromise; readonly close: (closeInfo?: WebSocketCloseInfo) => void; @@ -52,7 +56,12 @@ export class WebSocketStream ws.close(code, reason); - this.opened = new Promise((resolve, reject) => { + this.opened = new TypedPromise, WebsocketError>((resolve, reject) => { + const rejectHandler = () => { + reject( + ConnectionError.websocket('Encountered websocket error during connection establishment'), + ); + }; ws.onopen = () => { resolve({ readable: new ReadableStream({ @@ -74,12 +83,12 @@ export class WebSocketStream((resolve, reject) => { + this.closed = new TypedPromise((resolve, reject) => { const rejectHandler = async () => { const closePromise = new Promise((res) => { if (ws.readyState === WebSocket.CLOSED) return; @@ -95,7 +104,11 @@ export class WebSocketStream Date: Sat, 29 Nov 2025 15:29:10 +0100 Subject: [PATCH 5/5] more --- src/api/WebSocketStream.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api/WebSocketStream.ts b/src/api/WebSocketStream.ts index fad5d988fd..636ea2c7ca 100644 --- a/src/api/WebSocketStream.ts +++ b/src/api/WebSocketStream.ts @@ -90,7 +90,7 @@ export class WebSocketStream((resolve, reject) => { const rejectHandler = async () => { - const closePromise = new Promise((res) => { + const closePromise = new TypedPromise((res) => { if (ws.readyState === WebSocket.CLOSED) return; else { ws.addEventListener( @@ -102,7 +102,7 @@ export class WebSocketStream