Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fuzzy-elephants-fail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-client": patch
---

Use TypedPromise for typesafe errors
31 changes: 22 additions & 9 deletions src/api/WebSocketStream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// https://github.com/CarterLi/websocketstream-polyfill
import { ConnectionError } from '../room/errors';
import { sleep } from '../room/utils';
import TypedPromise from '../utils/TypedPromise';

export interface WebSocketConnection<T extends ArrayBuffer | string = ArrayBuffer | string> {
readable: ReadableStream<T>;
Expand All @@ -18,6 +20,8 @@ export interface WebSocketStreamOptions {
signal?: AbortSignal;
}

type WebsocketError = ReturnType<typeof ConnectionError.websocket>;

/**
* [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) with [Streams API](https://developer.mozilla.org/en-US/docs/Web/API/Streams_API)
*
Expand All @@ -26,9 +30,9 @@ export interface WebSocketStreamOptions {
export class WebSocketStream<T extends ArrayBuffer | string = ArrayBuffer | string> {
readonly url: string;

readonly opened: Promise<WebSocketConnection<T>>;
readonly opened: TypedPromise<WebSocketConnection<T>, WebsocketError>;

readonly closed: Promise<WebSocketCloseInfo>;
readonly closed: TypedPromise<WebSocketCloseInfo, WebsocketError>;

readonly close: (closeInfo?: WebSocketCloseInfo) => void;

Expand All @@ -52,7 +56,12 @@ export class WebSocketStream<T extends ArrayBuffer | string = ArrayBuffer | stri
const closeWithInfo = ({ closeCode: code, reason }: WebSocketCloseInfo = {}) =>
ws.close(code, reason);

this.opened = new Promise((resolve, reject) => {
this.opened = new TypedPromise<WebSocketConnection<T>, WebsocketError>((resolve, reject) => {
const rejectHandler = () => {
reject(
ConnectionError.websocket('Encountered websocket error during connection establishment'),
);
};
ws.onopen = () => {
resolve({
readable: new ReadableStream<T>({
Expand All @@ -74,14 +83,14 @@ export class WebSocketStream<T extends ArrayBuffer | string = ArrayBuffer | stri
protocol: ws.protocol,
extensions: ws.extensions,
});
ws.removeEventListener('error', reject);
ws.removeEventListener('error', rejectHandler);
};
ws.addEventListener('error', reject);
ws.addEventListener('error', rejectHandler);
});

this.closed = new Promise<WebSocketCloseInfo>((resolve, reject) => {
this.closed = new TypedPromise<WebSocketCloseInfo, WebsocketError>((resolve, reject) => {
const rejectHandler = async () => {
const closePromise = new Promise<CloseEvent>((res) => {
const closePromise = new TypedPromise<CloseEvent, never>((res) => {
if (ws.readyState === WebSocket.CLOSED) return;
else {
ws.addEventListener(
Expand All @@ -93,9 +102,13 @@ export class WebSocketStream<T extends ArrayBuffer | string = ArrayBuffer | stri
);
}
});
const reason = await Promise.race([sleep(250), closePromise]);
const reason = await TypedPromise.race([sleep(250), closePromise]);
if (!reason) {
reject(new Error('Encountered unspecified websocket error without a timely close event'));
reject(
ConnectionError.websocket(
'Encountered unspecified websocket error without a timely close event',
),
);
} else {
// if we can infer the close reason from the close event then resolve the promise, we don't need to throw
resolve(reason);
Expand Down
15 changes: 10 additions & 5 deletions src/room/PCTransportManager.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { Mutex } from '@livekit/mutex';
import { SignalTarget } from '@livekit/protocol';
import log, { LoggerNames, getLogger } from '../logger';
import TypedPromise from '../utils/TypedPromise';
import PCTransport, { PCEvents } from './PCTransport';
import { roomConnectOptionDefaults } from './defaults';
import { ConnectionError } from './errors';
import { ConnectionError, NegotiationError } from './errors';
import CriticalTimers from './timers';
import type { LoggerOptions } from './types';
import { sleep } from './utils';
Expand Down Expand Up @@ -220,14 +221,14 @@ export class PCTransportManager {
}

async negotiate(abortController: AbortController) {
return new Promise<void>(async (resolve, reject) => {
return new TypedPromise<void, NegotiationError | Error>(async (resolve, reject) => {
const negotiationTimeout = setTimeout(() => {
reject('negotiation timed out');
reject(new NegotiationError('negotiation timed out'));
}, this.peerConnectionTimeout);

const abortHandler = () => {
clearTimeout(negotiationTimeout);
reject('negotiation aborted');
reject(new NegotiationError('negotiation aborted'));
};

abortController.signal.addEventListener('abort', abortHandler);
Expand All @@ -243,7 +244,11 @@ export class PCTransportManager {

await this.publisher.negotiate((e) => {
clearTimeout(negotiationTimeout);
reject(e);
if (e instanceof Error) {
reject(e);
} else {
reject(new Error(String(e)));
}
});
});
}
Expand Down
19 changes: 12 additions & 7 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import type { BaseE2EEManager } from '../e2ee/E2eeManager';
import { asEncryptablePacket } from '../e2ee/utils';
import log, { LoggerNames, getLogger } from '../logger';
import type { InternalRoomOptions } from '../options';
import TypedPromise from '../utils/TypedPromise';
import { DataPacketBuffer } from '../utils/dataPacketBuffer';
import { TTLMap } from '../utils/ttlmap';
import PCTransport, { PCEvents } from './PCTransport';
Expand Down Expand Up @@ -1378,12 +1379,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}
};

waitForBufferStatusLow(kind: DataPacket_Kind): Promise<void> {
return new Promise(async (resolve, reject) => {
waitForBufferStatusLow(kind: DataPacket_Kind): TypedPromise<void, UnexpectedConnectionState> {
return new TypedPromise(async (resolve, reject) => {
if (this.isBufferStatusLow(kind)) {
resolve();
} else {
const onClosing = () => reject('Engine closed');
const onClosing = () => reject(new UnexpectedConnectionState('engine closed'));
this.once(EngineEvent.Closing, onClosing);
while (!this.dcBufferStatus.get(kind)) {
await sleep(10);
Expand Down Expand Up @@ -1480,7 +1481,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
/** @internal */
async negotiate(): Promise<void> {
// observe signal state
return new Promise<void>(async (resolve, reject) => {
return new TypedPromise<void, NegotiationError | Error>(async (resolve, reject) => {
if (!this.pcManager) {
reject(new NegotiationError('PC manager is closed'));
return;
Expand All @@ -1506,7 +1507,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
};

if (this.isClosed) {
reject('cannot negotiate on closed engine');
reject(new NegotiationError('cannot negotiate on closed engine'));
}
this.on(EngineEvent.Closing, handleClosed);

Expand All @@ -1527,12 +1528,16 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
try {
await this.pcManager.negotiate(abortController);
resolve();
} catch (e: any) {
} catch (e: unknown) {
if (e instanceof NegotiationError) {
this.fullReconnectOnNext = true;
}
this.handleDisconnect('negotiation', ReconnectReason.RR_UNKNOWN);
reject(e);
if (e instanceof Error) {
reject(e);
} else {
reject(new Error(String(e)));
}
} finally {
this.off(EngineEvent.Closing, handleClosed);
}
Expand Down
20 changes: 14 additions & 6 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import type {
RoomConnectOptions,
RoomOptions,
} from '../options';
import TypedPromise from '../utils/TypedPromise';
import { getBrowser } from '../utils/browserParser';
import { BackOffStrategy } from './BackOffStrategy';
import DeviceManager from './DeviceManager';
Expand All @@ -60,7 +61,12 @@ import {
roomOptionDefaults,
videoDefaults,
} from './defaults';
import { ConnectionError, ConnectionErrorReason, UnsupportedServer } from './errors';
import {
ConnectionError,
ConnectionErrorReason,
UnexpectedConnectionState,
UnsupportedServer,
} from './errors';
import { EngineEvent, ParticipantEvent, RoomEvent, TrackEvent } from './events';
import LocalParticipant from './participant/LocalParticipant';
import type Participant from './participant/Participant';
Expand Down Expand Up @@ -414,14 +420,14 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
* server assigned unique room id.
* returns once a sid has been issued by the server.
*/
async getSid(): Promise<string> {
getSid(): TypedPromise<string, UnexpectedConnectionState> {
if (this.state === ConnectionState.Disconnected) {
return '';
return TypedPromise.resolve('');
}
if (this.roomInfo && this.roomInfo.sid !== '') {
return this.roomInfo.sid;
return TypedPromise.resolve(this.roomInfo.sid);
}
return new Promise((resolve, reject) => {
return new TypedPromise<string, UnexpectedConnectionState>((resolve, reject) => {
const handleRoomUpdate = (roomInfo: RoomModel) => {
if (roomInfo.sid !== '') {
this.engine.off(EngineEvent.RoomUpdate, handleRoomUpdate);
Expand All @@ -431,7 +437,9 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.engine.on(EngineEvent.RoomUpdate, handleRoomUpdate);
this.once(RoomEvent.Disconnected, () => {
this.engine.off(EngineEvent.RoomUpdate, handleRoomUpdate);
reject('Room disconnected before room server id was available');
reject(
new UnexpectedConnectionState('Room disconnected before room server id was available'),
);
});
});
}
Expand Down
27 changes: 18 additions & 9 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
} from '@livekit/protocol';
import { SignalConnectionState } from '../../api/SignalClient';
import type { InternalRoomOptions } from '../../options';
import TypedPromise from '../../utils/TypedPromise';
import { PCTransportState } from '../PCTransportManager';
import type RTCEngine from '../RTCEngine';
import type OutgoingDataStreamManager from '../data-stream/outgoing/OutgoingDataStreamManager';
Expand Down Expand Up @@ -370,7 +371,7 @@ export default class LocalParticipant extends Participant {
name?: string;
attributes?: Record<string, string>;
}) {
return new Promise<void>(async (resolve, reject) => {
return new TypedPromise<void, Error>(async (resolve, reject) => {
try {
let isRejected = false;
const requestId = await this.engine.client.sendUpdateLocalMetadata(
Expand Down Expand Up @@ -406,8 +407,12 @@ export default class LocalParticipant extends Participant {
reject(
new SignalRequestError('Request to update local metadata timed out', 'TimeoutError'),
);
} catch (e: any) {
if (e instanceof Error) reject(e);
} catch (e: unknown) {
if (e instanceof Error) {
reject(e);
} else {
reject(new Error(String(e)));
}
}
});
}
Expand Down Expand Up @@ -1583,7 +1588,7 @@ export default class LocalParticipant extends Participant {
if (this.republishPromise) {
await this.republishPromise;
}
this.republishPromise = new Promise(async (resolve, reject) => {
this.republishPromise = new TypedPromise<void, Error>(async (resolve, reject) => {
try {
const localPubs: LocalTrackPublication[] = [];
this.trackPublications.forEach((pub) => {
Expand Down Expand Up @@ -1619,8 +1624,12 @@ export default class LocalParticipant extends Participant {
}),
);
resolve();
} catch (error: any) {
reject(error);
} catch (error: unknown) {
if (error instanceof Error) {
reject(error);
} else {
reject(new Error(String(error)));
}
} finally {
this.republishPromise = undefined;
}
Expand Down Expand Up @@ -1774,16 +1783,16 @@ export default class LocalParticipant extends Participant {
* @returns A promise that resolves with the response payload or rejects with an error.
* @throws Error on failure. Details in `message`.
*/
async performRpc({
performRpc({
destinationIdentity,
method,
payload,
responseTimeout = 15000,
}: PerformRpcParams): Promise<string> {
}: PerformRpcParams): TypedPromise<string, RpcError> {
const maxRoundTripLatency = 7000;
const minEffectiveTimeout = maxRoundTripLatency + 1000;

return new Promise(async (resolve, reject) => {
return new TypedPromise<string, RpcError>(async (resolve, reject) => {
if (byteLength(payload) > MAX_PAYLOAD_BYTES) {
reject(RpcError.builtIn('REQUEST_PAYLOAD_TOO_LARGE'));
return;
Expand Down
5 changes: 3 additions & 2 deletions src/room/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
DisconnectReason,
Transcription as TranscriptionModel,
} from '@livekit/protocol';
import TypedPromise from '../utils/TypedPromise';
import { getBrowser } from '../utils/browserParser';
import type { BrowserDetails } from '../utils/browserParser';
import { protocolVersion, version } from '../version';
Expand Down Expand Up @@ -39,8 +40,8 @@ export function unpackStreamId(packed: string): string[] {
return [packed, ''];
}

export async function sleep(duration: number): Promise<void> {
return new Promise((resolve) => CriticalTimers.setTimeout(resolve, duration));
export function sleep(duration: number): TypedPromise<void, never> {
return new TypedPromise<void, never>((resolve) => CriticalTimers.setTimeout(resolve, duration));
}

/** @internal */
Expand Down
39 changes: 39 additions & 0 deletions src/utils/TypedPromise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
type InferErrors<T> = T extends TypedPromise<any, infer E> ? E : never;

export default class TypedPromise<T, E extends Error> extends Promise<T> {
// eslint-disable-next-line @typescript-eslint/no-useless-constructor
constructor(
executor: (resolve: (value: T | PromiseLike<T>) => void, reject: (reason: E) => void) => void,
) {
super(executor);
}

catch<TResult = never>(
onrejected?: ((reason: E) => TResult | PromiseLike<TResult>) | null | undefined,
): TypedPromise<T | TResult, E> {
return super.catch(onrejected);
}

static reject<E extends Error>(reason: E): TypedPromise<never, E> {
return super.reject(reason);
}

static all<T extends readonly unknown[] | []>(
values: T,
): TypedPromise<{ -readonly [P in keyof T]: Awaited<T[P]> }, InferErrors<T[number]>> {
return super.all(values) as any;
}

static race<T extends readonly (TypedPromise<any, any> | any)[]>(
values: T,
): TypedPromise<
T[number] extends TypedPromise<infer U, any>
? U
: T[number] extends PromiseLike<infer U>
? U
: Awaited<T[number]>,
InferErrors<T[number]>
> {
return super.race(values);
}
}
Loading