diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 477c68ad51..bcc96ac413 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -4,6 +4,9 @@ settings: autoInstallPeers: true excludeLinksFromLockfile: false +overrides: + '@livekit/protocol': link:../protocol/packages/javascript + importers: .: @@ -12,8 +15,8 @@ importers: specifier: 1.1.1 version: 1.1.1 '@livekit/protocol': - specifier: 1.39.3 - version: 1.39.3 + specifier: link:../protocol/packages/javascript + version: link:../protocol/packages/javascript '@types/dom-mediacapture-record': specifier: ^1 version: 1.0.22 @@ -1035,9 +1038,6 @@ packages: '@livekit/mutex@1.1.1': resolution: {integrity: sha512-EsshAucklmpuUAfkABPxJNhzj9v2sG7JuzFDL4ML1oJQSV14sqrpTYnsaOudMAw9yOaW53NU3QQTlUQoRs4czw==} - '@livekit/protocol@1.39.3': - resolution: {integrity: sha512-hfOnbwPCeZBEvMRdRhU2sr46mjGXavQcrb3BFRfG+Gm0Z7WUSeFdy5WLstXJzEepz17Iwp/lkGwJ4ZgOOYfPuA==} - '@manypkg/find-root@1.1.0': resolution: {integrity: sha512-mki5uBvhHzO8kYYix/WRy2WX8S3B5wdVSc9D6KcU5lQNglP2yt58/VfLuAK49glRXChosY8ap2oJ1qgma3GUVA==} @@ -3439,8 +3439,8 @@ packages: engines: {node: '>=14.17'} hasBin: true - typescript@5.9.0-dev.20250716: - resolution: {integrity: sha512-wBOPAX99Y5n6c4JF9ZIPy4xrnUDfFmrzOaFoVWGeDAu8cfV3773xfvNMColH3EnaWf8JytB3rgbcLYDE2nhCQw==} + typescript@5.9.0-dev.20250717: + resolution: {integrity: sha512-txdoKMuQg9HgT4dRcGBUrpIXIBMabbBOjVWrS915z/yskIIq7FwGgriVoIlVIVNZP04INeuJw3NEFDnWb1Vzbw==} engines: {node: '>=14.17'} hasBin: true @@ -4794,10 +4794,6 @@ snapshots: '@livekit/mutex@1.1.1': {} - '@livekit/protocol@1.39.3': - dependencies: - '@bufbuild/protobuf': 1.10.1 - '@manypkg/find-root@1.1.0': dependencies: '@babel/runtime': 7.23.2 @@ -5657,7 +5653,7 @@ snapshots: dependencies: semver: 7.6.0 shelljs: 0.8.5 - typescript: 5.9.0-dev.20250716 + typescript: 5.9.0-dev.20250717 dunder-proto@1.0.1: dependencies: @@ -7496,7 +7492,7 @@ snapshots: typescript@5.8.3: {} - typescript@5.9.0-dev.20250716: {} + typescript@5.9.0-dev.20250717: {} uc.micro@2.1.0: {} diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml new file mode 100644 index 0000000000..dcd8ec42c8 --- /dev/null +++ b/pnpm-workspace.yaml @@ -0,0 +1,2 @@ +overrides: + '@livekit/protocol': link:../protocol/packages/javascript diff --git a/src/api/SignalAPI.test.ts b/src/api/SignalAPI.test.ts new file mode 100644 index 0000000000..d7f6e190c4 --- /dev/null +++ b/src/api/SignalAPI.test.ts @@ -0,0 +1,141 @@ +import { JoinResponse, ReconnectResponse } from '@livekit/protocol'; +import { describe, expect, it, vi } from 'vitest'; +import { sleep } from '../room/utils'; +import { SignalAPI } from './SignalAPI'; +import type { ITransport } from './SignalAPI'; + +// A helper to create a minimal dummy transport whose methods are jest/vi spies +function createDummyTransport(overrides: Partial = {}): ITransport { + // placeholders that will be overridden when `onMessage` / `onError` are registered + let messageHandler: ((data: Uint8Array) => void) | undefined; + let errorHandler: ((error: Error) => void) | undefined; + + const dummyTransport: ITransport = { + connect: vi.fn(async (...args: unknown[]) => { + void args; // silence unused parameter lint errors + return {} as unknown as JoinResponse; + }), + send: vi.fn(async () => {}), + close: vi.fn(async () => {}), + reconnect: vi.fn(async () => ({}) as unknown as ReconnectResponse), + onMessage: (cb) => { + messageHandler = cb; + }, + onError: (cb) => { + errorHandler = cb; + }, + ...overrides, + } as ITransport; + + // Expose ways to trigger the callbacks inside tests + // @ts-expect-error – we attach these for test-only usage + dummyTransport.__triggerMessage = (data: Uint8Array) => messageHandler?.(data); + // @ts-expect-error – we attach these for test-only usage + dummyTransport.__triggerError = (err: Error) => errorHandler?.(err); + + return dummyTransport; +} + +describe('SignalAPI', () => { + it('calls transport.connect when join is invoked', async () => { + const joinResponse = { joined: true } as unknown as JoinResponse; + + const transport = createDummyTransport({ + connect: vi.fn(async () => joinResponse), + }); + + const api = new SignalAPI(transport); + void api; + + const url = 'wss://example.com'; + const token = 'fake-token'; + + const result = await api.join(url, token); + + expect(transport.connect).toHaveBeenCalledWith(url, token); + expect(result).toBe(joinResponse); + }); + + it('forwards reconnect to transport.reconnect', async () => { + const reconnectResponse = { reconnected: true } as unknown as ReconnectResponse; + + const transport = createDummyTransport({ + reconnect: vi.fn(async () => reconnectResponse), + }); + + const api = new SignalAPI(transport); + void api; + + const result = await api.reconnect(); + + expect(transport.reconnect).toHaveBeenCalled(); + expect(result).toBe(reconnectResponse); + }); + + it('handles onMessage events from the transport', () => { + const transport = createDummyTransport(); + const api = new SignalAPI(transport); + void api; + + const consoleSpy = vi.spyOn(console, 'log').mockImplementation(() => {}); + + // @ts-expect-error – trigger helper added in createDummyTransport + transport.__triggerMessage(new Uint8Array([1, 2, 3])); + + expect(consoleSpy).toHaveBeenCalledWith('onMessage', new Uint8Array([1, 2, 3])); + + consoleSpy.mockRestore(); + }); + + it('handles onError events from the transport', () => { + const transport = createDummyTransport(); + const api = new SignalAPI(transport); + void api; + + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + const error = new Error('dummy'); + // @ts-expect-error – trigger helper added in createDummyTransport + transport.__triggerError(error); + + expect(consoleErrorSpy).toHaveBeenCalledWith('onError', error); + + consoleErrorSpy.mockRestore(); + }); + + it('ensures parallel join calls are executed sequentially', async () => { + const resolvers: Array<() => void> = []; + + const connect = vi.fn((url: string, token: string) => { + void url; + void token; + return new Promise((resolve) => { + resolvers.push(() => resolve({} as unknown as JoinResponse)); + }); + }); + + const transport = createDummyTransport({ connect }); + const api = new SignalAPI(transport); + void api; + + // Trigger two join calls without awaiting the first + const joinPromise1 = api.join('wss://example.com', 'token-1'); + const joinPromise2 = api.join('wss://example.com', 'token-2'); + + // Only the first connect should have been invoked at this point + await sleep(5); + expect(connect).toHaveBeenCalledTimes(1); + + // Resolve the first join + resolvers[0](); + await joinPromise1; + + // Now the second connect should have been called + await sleep(5); + expect(connect).toHaveBeenCalledTimes(2); + + // Resolve the second join + resolvers[1](); + await joinPromise2; + }); +}); diff --git a/src/api/SignalAPI.ts b/src/api/SignalAPI.ts new file mode 100644 index 0000000000..52a34b2799 --- /dev/null +++ b/src/api/SignalAPI.ts @@ -0,0 +1,148 @@ +import { SignalRequest, SignalResponse, JoinResponse } from '@livekit/protocol'; +import type { ITransport } from './SignalTransport'; +import { Future, getClientInfo } from '../room/utils'; +import { atomic } from '../decorators'; + + +export class SignalAPI { + + private writer?: WritableStreamDefaultWriter; + + private promiseMap = new Map>(); + + private offerId = 0; + + private transport: ITransport; + + private sequenceNumber = 0; + + private latestRemoteSequenceNumber = 0; + + constructor(transport: ITransport) { + this.transport = transport; + } + + @atomic + async join(url: string, token: string, connectOpts: ConnectOpts): Promise { + const clientInfo = getClientInfo(); + const { readableStream, writableStream } = await this.transport.connect({ url, token, clientInfo, connectOpts }); + const reader = readableStream.getReader(); + const { done, value } = await reader.read(); + reader.releaseLock(); + if(value?.message?.case !== 'join') { + throw new Error('Expected join response'); + } + if(done || !value) { + throw new Error('Connection closed without join response'); + } + this.readLoop(readableStream); + this.writer = writableStream.getWriter(); + + return value.message.value; + } + + async readLoop(readableStream: ReadableStream) { + const reader = readableStream.getReader(); + while (true) { + try { + + const { done, value } = await reader.read(); + if (done || !value) break; + + + const resolverId = getResolverId(value.message); + if(resolverId) { + const responseKey = getResponseKey(value.message.case, resolverId); + const future = this.promiseMap.get(responseKey); + if (future) { + future.resolve?.(value); + continue; + } + } + + switch(value.message.case) { + case 'join': + case 'answer': + case 'requestResponse': + console.warn(`received ${value.message.case} these should all be handled by the promise map`); + break; + case 'leave': + value.message.value. + this.close(); + break; + default: + console.debug(`received unsupported message ${value.message.case} `); + break; + } + + } catch(e) { + Array.from(this.promiseMap.values()).forEach(future => future.reject?.(e)); + this.promiseMap.clear(); + break; + } + } + } + + @atomic + async sendOfferAndAwaitAnswer(offer: RTCSessionDescriptionInit): Promise { + // const offerId = this.offerId++; + // if(!this.writer) { + // throw new Error('Writable stream not initialized'); + // } + + // const request = new SessionDescription({ + // type: 'offer', + // sdp: offer.sdp, + // // id: offer.id, + // }); + + // await this.writer.write([this.createClientRequest({ case: 'offer', value: request })]); + + // const future = new Future(); + // // we want an answer for this offer so we queue up a future for it + // this.promiseMap.set(getResponseKey('answer', offerId), future); + // const answerResponse = await future.promise; + + // if(answerResponse.message.case === 'answer') { + // return answerResponse.message.value; + // } + + throw new Error('Answer not found'); + } + + private getNextSequencer(): Sequencer { + return new Sequencer({ + messageId: this.sequenceNumber++, + lastProcessedRemoteMessageId: this.latestRemoteSequenceNumber, + }); + } + + + // @loggedMethod + async reconnect(): Promise { + //return this.transport.reconnect(); + } + + // @loggedMethod + close() { + return this.transport.disconnect(); + } +} + + +function getResponseKey(requestType: SignalResponse['message']['case'], messageId: number) { + return `${requestType}-${messageId}`; +} + + +function getResolverId(message: SignalResponse['message']) { + if(typeof message.value !== 'object') { + return null; + } + if('requestId' in message.value) { + return message.value.requestId; + } else if('id' in message.value) { + return message.value.id; + } + return null; +} \ No newline at end of file diff --git a/src/api/SignalTransport.ts b/src/api/SignalTransport.ts new file mode 100644 index 0000000000..5578881e00 --- /dev/null +++ b/src/api/SignalTransport.ts @@ -0,0 +1,216 @@ +import { ClientInfo, SignalRequest, SignalResponse } from '@livekit/protocol'; +import { WebSocketStream } from './WebsocketStream'; +import { atomic } from '../decorators'; +import { createRtcUrl, createValidateUrl } from './utils'; +import { isReactNative } from '../room/utils'; +import { ConnectionError, ConnectionErrorReason } from '../room/errors'; + +export enum SignalConnectionState { + Initial, + Connecting, + Connected, + Reconnecting, + Disconnecting, + Disconnected, +} + +// internal options +interface ConnectOpts extends SignalOptions { + /** internal */ + reconnect?: boolean; + /** internal */ + reconnectReason?: number; + /** internal */ + sid?: string; + /** internal */ + // joinRequest: JoinRequest; // TODO: add this back in +} + +// public options +export interface SignalOptions { + autoSubscribe: boolean; + adaptiveStream?: boolean; + maxRetries: number; + e2eeEnabled: boolean; + websocketTimeout: number; +} + +export interface ITransportOptions { + url: string; + token: string; + connectOpts: ConnectOpts; + clientInfo: ClientInfo; +} + +export interface ITransportConnection { + readableStream: ReadableStream; + writableStream: WritableStream; +} + +export interface ITransport { + connect(options: ITransportOptions): Promise; + disconnect(): Promise; + state: SignalConnectionState; +} + +export interface ITransportFactory { + create(): Promise; +} + +export class WSTransport implements ITransport { + private wsStream?: WebSocketStream; + + private _state: SignalConnectionState; + + constructor() { + this._state = SignalConnectionState.Initial; + } + + get state() { + return this._state; + } + + private updateState(state: SignalConnectionState) { + this._state = state; + } + + @atomic + async connect(options: ITransportOptions) { + this.updateState(SignalConnectionState.Connecting); + + const params = createConnectionParams(options.token, options.clientInfo, options.connectOpts); + const rtcUrl = createRtcUrl(options.url, params); + const validateUrl = createValidateUrl(rtcUrl); + + try { + + this.wsStream = new WebSocketStream(rtcUrl); + + + const connection = await this.wsStream.opened; + + this.wsStream.closed.catch((e) => { + console.error('encountered websocket error', e); + }).finally(() => { + this.updateState(SignalConnectionState.Disconnected); + this.wsStream = undefined; + }); + + this.updateState(SignalConnectionState.Connected); + + const requestEncoder = new ClientRequestEncoder(); + requestEncoder.readable.pipeTo(connection.writable); + + return { + readableStream: connection.readable.pipeThrough(new ServerResponseDecoder()), + writableStream: requestEncoder.writable, + }; + } catch (error) { + this.updateState(SignalConnectionState.Disconnecting); + const resp = await fetch(validateUrl); + this.updateState(SignalConnectionState.Disconnected); + if (resp.status.toFixed(0).startsWith('4')) { + const msg = await resp.text(); + throw new ConnectionError(msg, ConnectionErrorReason.NotAllowed, resp.status); + } else if (error instanceof Error) { + throw new ConnectionError( + `Encountered unknown websocket error during connection: ${error.name}: ${error.message}`, + ConnectionErrorReason.InternalError, + resp.status, + ); + } else { + throw error; + } + } + } + + async disconnect(reason?: string) { + this.updateState(SignalConnectionState.Disconnecting); + await this.wsStream?.close({ reason }); + this.updateState(SignalConnectionState.Disconnected); + } +} + +class ServerResponseDecoder extends TransformStream { + constructor() { + super({ + transform(chunk, controller) { + let resp: SignalResponse; + + if (typeof chunk === 'string') { + resp = SignalResponse.fromJson(JSON.parse(chunk), { ignoreUnknownFields: true }); + } else { + resp = SignalResponse.fromBinary(chunk); + } + + controller.enqueue(resp); + }, + }); + } +} + +class ClientRequestEncoder extends TransformStream { + constructor() { + super({ + transform(chunk, controller) { + controller.enqueue(chunk.toBinary()); + }, + }); + } +} + +function createConnectionParams( + token: string, + info: ClientInfo, + opts: ConnectOpts, +): URLSearchParams { + const params = new URLSearchParams(); + params.set('access_token', token); + + // opts + if (opts.reconnect) { + params.set('reconnect', '1'); + if (opts.sid) { + params.set('sid', opts.sid); + } + } + + params.set('auto_subscribe', opts.autoSubscribe ? '1' : '0'); + + // ClientInfo + params.set('sdk', isReactNative() ? 'reactnative' : 'js'); + params.set('version', info.version!); + params.set('protocol', info.protocol!.toString()); + if (info.deviceModel) { + params.set('device_model', info.deviceModel); + } + if (info.os) { + params.set('os', info.os); + } + if (info.osVersion) { + params.set('os_version', info.osVersion); + } + if (info.browser) { + params.set('browser', info.browser); + } + if (info.browserVersion) { + params.set('browser_version', info.browserVersion); + } + + if (opts.adaptiveStream) { + params.set('adaptive_stream', '1'); + } + + if (opts.reconnectReason) { + params.set('reconnect_reason', opts.reconnectReason.toString()); + } + + // @ts-ignore + if (navigator.connection?.type) { + // @ts-ignore + params.set('network', navigator.connection.type); + } + + return params; +} + diff --git a/src/api/WebSocketStream.ts b/src/api/WebSocketStream.ts new file mode 100644 index 0000000000..51f520e272 --- /dev/null +++ b/src/api/WebSocketStream.ts @@ -0,0 +1,90 @@ +/** + * This is a polyfill for the WebSocketStream API. + * source: https://github.com/CarterLi/websocketstream-polyfill + */ + +export interface WebSocketConnection { + readable: ReadableStream; + writable: WritableStream; + protocol: string; + extensions: string; +} + +export interface WebSocketCloseInfo { + closeCode?: number; + reason?: string; +} + +export interface WebSocketStreamOptions { + protocols?: string[]; + signal?: AbortSignal; +} + +/** + * [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) with [Streams API](https://developer.mozilla.org/en-US/docs/Web/API/Streams_API) + * + * @see https://web.dev/websocketstream/ + */ +export class WebSocketStream { + readonly url: string; + + readonly opened: Promise>; + + readonly closed: Promise; + + readonly close: (closeInfo?: WebSocketCloseInfo) => void; + + constructor(url: string, options: WebSocketStreamOptions = {}) { + if (options.signal?.aborted) { + throw new DOMException('This operation was aborted', 'AbortError'); + } + + this.url = url; + + const ws = new WebSocket(url, options.protocols ?? []); + + const closeWithInfo = ({ closeCode: code, reason }: WebSocketCloseInfo = {}) => + ws.close(code, reason); + + this.opened = new Promise((resolve, reject) => { + ws.onopen = () => { + resolve({ + readable: new ReadableStream({ + start(controller) { + ws.onmessage = ({ data }) => controller.enqueue(data); + ws.onerror = (e) => controller.error(e); + }, + cancel: closeWithInfo, + }), + writable: new WritableStream({ + write(chunk) { + ws.send(chunk); + }, + abort() { + closeWithInfo({ closeCode: 1000, reason: 'Aborted' }); + }, + close: closeWithInfo, + }), + protocol: ws.protocol, + extensions: ws.extensions, + }); + ws.removeEventListener('error', reject); + }; + ws.addEventListener('error', reject); + }); + + this.closed = new Promise((resolve, reject) => { + ws.onclose = ({ code, reason }) => { + resolve({ closeCode: code, reason }); + ws.removeEventListener('error', reject); + }; + ws.addEventListener('error', reject); + }); + + if (options.signal) { + options.signal.onabort = () => ws.close(); + } + + this.close = closeWithInfo; + } +} diff --git a/src/api/WireMessageConverter.test.ts b/src/api/WireMessageConverter.test.ts new file mode 100644 index 0000000000..542cf29bb0 --- /dev/null +++ b/src/api/WireMessageConverter.test.ts @@ -0,0 +1,497 @@ +import { Envelope, Fragment, Signalv2ClientMessage, Signalv2WireMessage } from '@livekit/protocol'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { WireMessageConverter } from './WireMessageConverter'; + +describe('WireMessageConverter', () => { + let converter: WireMessageConverter; + + beforeEach(() => { + converter = new WireMessageConverter(); + }); + + describe('wireMessageToEnvelope', () => { + it('should return envelope directly when wire message contains envelope', () => { + const testEnvelope = new Envelope({ + clientMessages: [], + }); + + const wireMessage = new Signalv2WireMessage({ + message: { + case: 'envelope', + value: testEnvelope, + }, + }); + + const result = converter.wireMessageToEnvelope(wireMessage); + + expect(result).toBe(testEnvelope); + }); + + it('should return null for incomplete fragments', () => { + const fragment = new Fragment({ + packetId: 1, + fragmentNumber: 1, + data: new Uint8Array([1, 2, 3]), + totalSize: 10, + numFragments: 3, + }); + + const wireMessage = new Signalv2WireMessage({ + message: { + case: 'fragment', + value: fragment, + }, + }); + + const result = converter.wireMessageToEnvelope(wireMessage); + + expect(result).toBeNull(); + }); + + it('should assemble fragments when all are received', () => { + const connectRequest = new Signalv2ClientMessage({ + message: { + case: 'connectRequest', + value: { + metadata: 'test-metadata', + }, + }, + }); + + const testEnvelope = new Envelope({ + clientMessages: [connectRequest], + }); + const binaryEnvelope = testEnvelope.toBinary(); + + // Create fragments + const fragment1 = new Fragment({ + packetId: 1, + fragmentNumber: 1, + data: binaryEnvelope.slice(0, Math.ceil(binaryEnvelope.length / 2)), + totalSize: binaryEnvelope.byteLength, + numFragments: 2, + }); + + const fragment2 = new Fragment({ + packetId: 1, + fragmentNumber: 2, + data: binaryEnvelope.slice(Math.ceil(binaryEnvelope.length / 2)), + totalSize: binaryEnvelope.byteLength, + numFragments: 2, + }); + + const wireMessage1 = new Signalv2WireMessage({ + message: { case: 'fragment', value: fragment1 }, + }); + + const wireMessage2 = new Signalv2WireMessage({ + message: { case: 'fragment', value: fragment2 }, + }); + + // First fragment should return null + const result1 = converter.wireMessageToEnvelope(wireMessage1); + expect(result1).toBeNull(); + + // Second fragment should return assembled envelope + const result2 = converter.wireMessageToEnvelope(wireMessage2); + expect(result2).not.toBeNull(); + expect(result2?.clientMessages).toEqual([connectRequest]); + }); + + it('should handle fragments received out of order', () => { + const testEnvelope = new Envelope({ + clientMessages: [], + }); + const binaryEnvelope = testEnvelope.toBinary(); + + // Create fragments + const fragment1 = new Fragment({ + packetId: 2, + fragmentNumber: 1, + data: binaryEnvelope.slice(0, Math.ceil(binaryEnvelope.length / 2)), + totalSize: binaryEnvelope.byteLength, + numFragments: 2, + }); + + const fragment2 = new Fragment({ + packetId: 2, + fragmentNumber: 2, + data: binaryEnvelope.slice(Math.ceil(binaryEnvelope.length / 2)), + totalSize: binaryEnvelope.byteLength, + numFragments: 2, + }); + + const wireMessage1 = new Signalv2WireMessage({ + message: { case: 'fragment', value: fragment1 }, + }); + + const wireMessage2 = new Signalv2WireMessage({ + message: { case: 'fragment', value: fragment2 }, + }); + + // Receive second fragment first + const result1 = converter.wireMessageToEnvelope(wireMessage2); + expect(result1).toBeNull(); + + // Receive first fragment second + const result2 = converter.wireMessageToEnvelope(wireMessage1); + expect(result2).not.toBeNull(); + expect(result2?.clientMessages).toEqual([]); + }); + + it('should return null and clear buffer when fragment total size is incorrect', () => { + const consoleWarnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + const fragment1 = new Fragment({ + packetId: 3, + fragmentNumber: 1, + data: new Uint8Array([1, 2, 3]), + totalSize: 10, // Incorrect total size + numFragments: 2, + }); + + const fragment2 = new Fragment({ + packetId: 3, + fragmentNumber: 2, + data: new Uint8Array([4, 5, 6]), + totalSize: 10, // Incorrect total size + numFragments: 2, + }); + + const wireMessage1 = new Signalv2WireMessage({ + message: { case: 'fragment', value: fragment1 }, + }); + + const wireMessage2 = new Signalv2WireMessage({ + message: { case: 'fragment', value: fragment2 }, + }); + + converter.wireMessageToEnvelope(wireMessage1); + const result = converter.wireMessageToEnvelope(wireMessage2); + + expect(result).toBeNull(); + expect(consoleWarnSpy).toHaveBeenCalledWith( + expect.stringContaining('Fragments of packet 3 have incorrect size'), + ); + + consoleWarnSpy.mockRestore(); + }); + + it('should handle multiple fragment sets simultaneously', () => { + const testEnvelope1 = new Envelope({ + clientMessages: [], + }); + const testEnvelope2 = new Envelope({ + clientMessages: [], + }); + + const binaryEnvelope1 = testEnvelope1.toBinary(); + const binaryEnvelope2 = testEnvelope2.toBinary(); + + // Create fragments for first envelope (packet ID 1) + const frag1Part1 = new Fragment({ + packetId: 1, + fragmentNumber: 1, + data: binaryEnvelope1.slice(0, Math.ceil(binaryEnvelope1.length / 2)), + totalSize: binaryEnvelope1.byteLength, + numFragments: 2, + }); + + // Create fragments for second envelope (packet ID 2) + const frag2Part1 = new Fragment({ + packetId: 2, + fragmentNumber: 1, + data: binaryEnvelope2.slice(0, Math.ceil(binaryEnvelope2.length / 2)), + totalSize: binaryEnvelope2.byteLength, + numFragments: 2, + }); + + // Process first fragments + const result1 = converter.wireMessageToEnvelope( + new Signalv2WireMessage({ + message: { case: 'fragment', value: frag1Part1 }, + }), + ); + expect(result1).toBeNull(); + + const result2 = converter.wireMessageToEnvelope( + new Signalv2WireMessage({ + message: { case: 'fragment', value: frag2Part1 }, + }), + ); + expect(result2).toBeNull(); + + // Complete first envelope + const frag1Part2 = new Fragment({ + packetId: 1, + fragmentNumber: 2, + data: binaryEnvelope1.slice(Math.ceil(binaryEnvelope1.length / 2)), + totalSize: binaryEnvelope1.byteLength, + numFragments: 2, + }); + + const result3 = converter.wireMessageToEnvelope( + new Signalv2WireMessage({ + message: { case: 'fragment', value: frag1Part2 }, + }), + ); + expect(result3?.clientMessages).toEqual([]); + }); + + it('should return null for wire messages with unknown message case', () => { + const wireMessage = new Signalv2WireMessage({ + message: { + case: undefined, + value: undefined, + }, + }); + + const result = converter.wireMessageToEnvelope(wireMessage); + expect(result).toBeNull(); + }); + }); + + describe('envelopeToWireMessages', () => { + it('should return single wire message for small envelope', () => { + const testEnvelope = new Envelope({ + clientMessages: [], + }); + + const wireMessages = converter.envelopeToWireMessages(testEnvelope); + + expect(wireMessages).toHaveLength(1); + expect(wireMessages[0].message.case).toBe('envelope'); + expect(wireMessages[0].message.value).toBe(testEnvelope); + }); + + it('should fragment large envelope into multiple wire messages', () => { + const consoleInfoSpy = vi.spyOn(console, 'info').mockImplementation(() => {}); + + // Create a large envelope (larger than 16,000 bytes) + const largeEnvelope = new Envelope({ + clientMessages: [], + }); + + // Override toBinary to return large data + largeEnvelope.toBinary = vi.fn(() => new Uint8Array(20000).fill(42)); + + const wireMessages = converter.envelopeToWireMessages(largeEnvelope); + + expect(wireMessages.length).toBeGreaterThan(1); + expect(consoleInfoSpy).toHaveBeenCalledWith( + expect.stringContaining('Sending fragmented envelope'), + ); + + // Verify all wire messages are fragments + wireMessages.forEach((wireMessage) => { + expect(wireMessage.message.case).toBe('fragment'); + expect(wireMessage.message.value).toBeInstanceOf(Fragment); + }); + + // Verify fragment properties + const fragments = wireMessages.map((wm) => wm.message.value as Fragment); + const totalFragments = fragments.length; + + fragments.forEach((fragment, index) => { + expect(fragment.packetId).toBe(0); + expect(fragment.fragmentNumber).toBe(index + 1); + expect(fragment.numFragments).toBe(totalFragments); + expect(fragment.totalSize).toBe(20000); + }); + + consoleInfoSpy.mockRestore(); + }); + + it('should create fragments that can be reassembled correctly', () => { + // Create a moderately large envelope that will be fragmented + const connectRequest = new Signalv2ClientMessage({ + message: { + case: 'connectRequest', + value: { + metadata: new Array(20_000).fill('a').join(''), + }, + }, + }); + const testEnvelope = new Envelope({ + clientMessages: [connectRequest], + }); + + // Fragment the envelope + const wireMessages = converter.envelopeToWireMessages(testEnvelope); + expect(wireMessages.length).toBeGreaterThan(1); + console.log('wireMessages', wireMessages); + // Create new converter to test reassembly + const reassemblyConverter = new WireMessageConverter(); + + // Feed fragments back to converter + let reassembledEnvelope: Envelope | null = null; + for (const wireMessage of wireMessages) { + const result = reassemblyConverter.wireMessageToEnvelope(wireMessage); + if (result !== null) { + reassembledEnvelope = result; + } + } + + expect(reassembledEnvelope).not.toBeNull(); + expect(reassembledEnvelope?.clientMessages).toEqual([connectRequest]); + }); + + it('should handle exactly MAX_WIRE_MESSAGE_SIZE envelope', () => { + // Create envelope that's exactly under the size limit + const envelope = new Envelope({ + clientMessages: [], + }); + + const wireMessages = converter.envelopeToWireMessages(envelope); + + // Should be a single message since it's not over the limit + expect(wireMessages).toHaveLength(1); + expect(wireMessages[0].message.case).toBe('envelope'); + }); + + it('should handle empty envelope', () => { + const emptyEnvelope = new Envelope({ + clientMessages: [], + }); + + const wireMessages = converter.envelopeToWireMessages(emptyEnvelope); + + expect(wireMessages).toHaveLength(1); + expect(wireMessages[0].message.case).toBe('envelope'); + expect(wireMessages[0].message.value).toBe(emptyEnvelope); + }); + + it('should create fragments with sequential packet IDs', () => { + const envelope = new Envelope({ + clientMessages: [], + }); + + // Override toBinary to return large data + envelope.toBinary = vi.fn(() => new Uint8Array(20000).fill(1)); + + const wireMessages = converter.envelopeToWireMessages(envelope); + const fragments = wireMessages.map((wm) => wm.message.value as Fragment); + + // All fragments should have the same packet ID (0 in this implementation) + const packetIds = fragments.map((f) => f.packetId); + expect(new Set(packetIds).size).toBe(1); + expect(packetIds[0]).toBe(0); + }); + }); + + describe('clearFragmentBuffer', () => { + it('should clear all buffered fragments', () => { + // Add some fragments to the buffer + const fragment = new Fragment({ + packetId: 1, + fragmentNumber: 1, + data: new Uint8Array([1, 2, 3]), + totalSize: 6, + numFragments: 2, + }); + + const wireMessage = new Signalv2WireMessage({ + message: { case: 'fragment', value: fragment }, + }); + + // This should buffer the fragment + converter.wireMessageToEnvelope(wireMessage); + + // Clear the buffer + converter.clearFragmentBuffer(); + + // Adding the second fragment should now return null since buffer was cleared + const fragment2 = new Fragment({ + packetId: 1, + fragmentNumber: 2, + data: new Uint8Array([4, 5, 6]), + totalSize: 6, + numFragments: 2, + }); + + const wireMessage2 = new Signalv2WireMessage({ + message: { case: 'fragment', value: fragment2 }, + }); + + const result = converter.wireMessageToEnvelope(wireMessage2); + expect(result).toBeNull(); + }); + + it('should not affect direct envelope processing', () => { + const testEnvelope = new Envelope({ + clientMessages: [], + }); + + const wireMessage = new Signalv2WireMessage({ + message: { case: 'envelope', value: testEnvelope }, + }); + + converter.clearFragmentBuffer(); + + const result = converter.wireMessageToEnvelope(wireMessage); + expect(result).toBe(testEnvelope); + }); + }); + + describe('edge cases and error handling', () => { + it('should handle fragments with zero data', () => { + const fragment = new Fragment({ + packetId: 1, + fragmentNumber: 1, + data: new Uint8Array(0), + totalSize: 0, + numFragments: 1, + }); + + const wireMessage = new Signalv2WireMessage({ + message: { case: 'fragment', value: fragment }, + }); + + const result = converter.wireMessageToEnvelope(wireMessage); + expect(result).not.toBeNull(); + }); + + it('should handle very large fragment counts', () => { + const testEnvelope = new Envelope({ + clientMessages: [], + }); + const binaryEnvelope = testEnvelope.toBinary(); + + // Create many small fragments + const numFragments = 100; + const fragmentSize = Math.ceil(binaryEnvelope.length / numFragments); + + const fragments: Fragment[] = []; + for (let i = 1; i <= numFragments; i++) { + const start = i * fragmentSize; + const end = Math.min(start + fragmentSize, binaryEnvelope.length); + fragments.push( + new Fragment({ + packetId: 1, + fragmentNumber: i, + data: binaryEnvelope.slice(start, end), + totalSize: binaryEnvelope.byteLength, + numFragments, + }), + ); + } + + // Process all but the last fragment + for (let i = 0; i < fragments.length - 1; i++) { + const wireMessage = new Signalv2WireMessage({ + message: { case: 'fragment', value: fragments[i] }, + }); + const result = converter.wireMessageToEnvelope(wireMessage); + expect(result).toBeNull(); + } + + // Process the last fragment + const lastWireMessage = new Signalv2WireMessage({ + message: { case: 'fragment', value: fragments[fragments.length - 1] }, + }); + const result = converter.wireMessageToEnvelope(lastWireMessage); + expect(result).not.toBeNull(); + expect(result?.clientMessages).toEqual([]); + }); + }); +}); diff --git a/src/api/WireMessageConverter.ts b/src/api/WireMessageConverter.ts new file mode 100644 index 0000000000..9f09a4f472 --- /dev/null +++ b/src/api/WireMessageConverter.ts @@ -0,0 +1,96 @@ +import { Envelope, Fragment, Signalv2WireMessage } from '@livekit/protocol'; + +const MAX_WIRE_MESSAGE_SIZE = 16_000; + +export class WireMessageConverter { + private readonly fragmentBuffer: Map> = new Map(); + + /** + * @param wireMessage - The wire message to convert to an envelope + * @returns The envelope of the wire message. If the wire message is a fragment, it will be buffered and returned only when the envelope completing fragment is passed. Immediately returns null if the wire message is a fragment and the envelope is not complete. + */ + wireMessageToEnvelope(wireMessage: Signalv2WireMessage): Envelope | null { + if (wireMessage.message.case === 'envelope') { + return wireMessage.message.value; + } else if (wireMessage.message.case === 'fragment') { + const fragment = wireMessage.message.value; + + const buffer = + this.fragmentBuffer.get(fragment.packetId) || + new Array(fragment.numFragments).fill(null); + buffer[fragment.fragmentNumber - 1] = fragment; + this.fragmentBuffer.set(fragment.packetId, buffer); + + if (buffer.every((f) => f !== null)) { + const totalDataReceived = buffer.reduce((acc, f) => acc + f.data.byteLength, 0); + if (totalDataReceived !== fragment.totalSize) { + console.warn( + `Fragments of packet ${fragment.packetId} have incorrect size: ${totalDataReceived} !== ${fragment.totalSize}`, + ); + console.log('buffer', buffer); + + this.fragmentBuffer.delete(fragment.packetId); + return null; + } + const rawEnvelope = new Uint8Array(totalDataReceived); + let offset = 0; + for (const f of buffer) { + rawEnvelope.set(f.data, offset); + offset += f.data.byteLength; + } + const envelope = Envelope.fromBinary(rawEnvelope); + this.fragmentBuffer.delete(fragment.packetId); + return envelope; + } + return null; + } + return null; + } + + clearFragmentBuffer(): void { + this.fragmentBuffer.clear(); + } + + /** + * @param envelope - The envelope to convert to wire messages + * @returns The wire messages + */ + envelopeToWireMessages(envelope: Envelope): Array { + const binaryEnvelope = envelope.toBinary(); + const envelopeSize = binaryEnvelope.byteLength; + if (envelopeSize > MAX_WIRE_MESSAGE_SIZE) { + console.info(`Sending fragmented envelope of ${envelopeSize} bytes`); + const numFragments = Math.ceil(envelopeSize / MAX_WIRE_MESSAGE_SIZE); + const fragments = []; + + for (let i = 0; i < numFragments; i++) { + fragments.push( + new Fragment({ + packetId: 0, + fragmentNumber: i + 1, + data: binaryEnvelope.slice(i * MAX_WIRE_MESSAGE_SIZE, (i + 1) * MAX_WIRE_MESSAGE_SIZE), + totalSize: envelopeSize, + numFragments, + }), + ); + } + return fragments.map( + (fragment) => + new Signalv2WireMessage({ + message: { + case: 'fragment', + value: fragment, + }, + }), + ); + } else { + const wireMessage = new Signalv2WireMessage({ + message: { + case: 'envelope', + value: envelope, + }, + }); + return [wireMessage]; + } + } +} diff --git a/src/decorators.ts b/src/decorators.ts new file mode 100644 index 0000000000..59db57505b --- /dev/null +++ b/src/decorators.ts @@ -0,0 +1,32 @@ +// export function loggedMethod( +// target: (this: This, ...args: Args) => Return, +// context: ClassMethodDecoratorContext Return> +// ) { +// const methodName = String(context.name); +import { Mutex } from '@livekit/mutex'; + +// function replacementMethod(this: This, ...args: Args): Return { +// console.debug(`LOG: Entering method '${methodName}'.`) +// const result = target.call(this, ...args); +// console.debug(`LOG: Exiting method '${methodName}'.`) +// return result; +// } + +// return replacementMethod; +// } + +export function atomic(originalMethod: any) { + const mutex = new Mutex(); + + async function replacementMethod(this: any, ...args: any[]) { + const unlock = await mutex.lock(); + try { + const result = await originalMethod.call(this, ...args); + return result; + } finally { + unlock(); + } + } + + return replacementMethod; +} diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index fe2eaf8b12..e5cc404339 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -9,7 +9,7 @@ import { DataPacket, DataPacket_Kind, DisconnectReason, - type JoinResponse, + JoinResponse, type LeaveRequest, LeaveRequest_Action, ParticipantInfo, @@ -37,12 +37,14 @@ import { import { EventEmitter } from 'events'; import type { MediaAttributes } from 'sdp-transform'; import type TypedEventEmitter from 'typed-emitter'; +import { SignalAPI } from '../api/SignalAPI'; import type { SignalOptions } from '../api/SignalClient'; import { SignalClient, SignalConnectionState, toProtoSessionDescription, } from '../api/SignalClient'; +import { DCSignalTransport } from '../api/SignalTransport'; import log, { LoggerNames, getLogger } from '../logger'; import type { InternalRoomOptions } from '../options'; import { DataPacketBuffer } from '../utils/dataPacketBuffer'; @@ -98,6 +100,8 @@ enum PCState { export default class RTCEngine extends (EventEmitter as new () => TypedEventEmitter) { client: SignalClient; + signalAPI: SignalAPI; + rtcConfig: RTCConfiguration = {}; peerConnectionTimeout: number = roomConnectOptionDefaults.peerConnectionTimeout; @@ -201,6 +205,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit loggerContextCb: () => this.logContext, }; this.client = new SignalClient(undefined, this.loggerOptions); + this.signalAPI = new SignalAPI(new DCSignalTransport(new RTCPeerConnection())); this.client.signalLatency = this.options.expSignalLatency; this.reconnectPolicy = this.options.reconnectPolicy; this.registerOnLineListener(); @@ -249,10 +254,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.joinAttempts += 1; this.setupSignalClientCallbacks(); - const joinResponse = await this.client.join(url, token, opts, abortSignal); + const connectResponse = await this.signalAPI.join(url, token); + const joinResponse = new JoinResponse({ + ...connectResponse, + }); this._isClosed = false; this.latestJoinResponse = joinResponse; - this.subscriberPrimary = joinResponse.subscriberPrimary; if (!this.pcManager) { await this.configure(joinResponse); diff --git a/src/room/Room.ts b/src/room/Room.ts index 979b0a7a4f..30c6b7c397 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -215,6 +215,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) */ constructor(options?: RoomOptions) { super(); + console.warn('Room constructor again'); this.setMaxListeners(100); this.remoteParticipants = new Map(); this.sidToIdentity = new Map(); diff --git a/src/room/agent.ts b/src/room/agent.ts new file mode 100644 index 0000000000..2ab7410e06 --- /dev/null +++ b/src/room/agent.ts @@ -0,0 +1,26 @@ +import type { AgentState } from './attribute-typings'; +import RemoteParticipant from './participant/RemoteParticipant'; + +export interface Agent extends RemoteParticipant { + interrupt(): Promise; + sendContext(context: string): Promise; +} + +export interface AgentSession { + // connection + connect(): Promise; + disconnect(): Promise; + + // agent controls + interrupt(): Promise; + sendContext(context: string): Promise; + agent?: Agent; + + // local user controls + setMicrophoneEnabled(enabled: boolean): Promise; + setCameraEnabled(enabled: boolean): Promise; + + // messaging + sendMessage(message: Message): Promise; + messages: Message[]; +}