Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pnpm-workspace.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
overrides:
'@livekit/protocol': link:../protocol/packages/javascript
141 changes: 141 additions & 0 deletions src/api/SignalAPI.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { JoinResponse, ReconnectResponse } from '@livekit/protocol';
import { describe, expect, it, vi } from 'vitest';
import { sleep } from '../room/utils';
import { SignalAPI } from './SignalAPI';
import type { ITransport } from './SignalAPI';

// A helper to create a minimal dummy transport whose methods are jest/vi spies
function createDummyTransport(overrides: Partial<ITransport> = {}): ITransport {
// placeholders that will be overridden when `onMessage` / `onError` are registered
let messageHandler: ((data: Uint8Array) => void) | undefined;
let errorHandler: ((error: Error) => void) | undefined;

const dummyTransport: ITransport = {
connect: vi.fn(async (...args: unknown[]) => {
void args; // silence unused parameter lint errors
return {} as unknown as JoinResponse;
}),
send: vi.fn(async () => {}),
close: vi.fn(async () => {}),
reconnect: vi.fn(async () => ({}) as unknown as ReconnectResponse),
onMessage: (cb) => {
messageHandler = cb;
},
onError: (cb) => {
errorHandler = cb;
},
...overrides,
} as ITransport;

// Expose ways to trigger the callbacks inside tests
// @ts-expect-error – we attach these for test-only usage
dummyTransport.__triggerMessage = (data: Uint8Array) => messageHandler?.(data);
// @ts-expect-error – we attach these for test-only usage
dummyTransport.__triggerError = (err: Error) => errorHandler?.(err);

return dummyTransport;
}

describe('SignalAPI', () => {
it('calls transport.connect when join is invoked', async () => {
const joinResponse = { joined: true } as unknown as JoinResponse;

const transport = createDummyTransport({
connect: vi.fn(async () => joinResponse),
});

const api = new SignalAPI(transport);
void api;

const url = 'wss://example.com';
const token = 'fake-token';

const result = await api.join(url, token);

expect(transport.connect).toHaveBeenCalledWith(url, token);
expect(result).toBe(joinResponse);
});

it('forwards reconnect to transport.reconnect', async () => {
const reconnectResponse = { reconnected: true } as unknown as ReconnectResponse;

const transport = createDummyTransport({
reconnect: vi.fn(async () => reconnectResponse),
});

const api = new SignalAPI(transport);
void api;

const result = await api.reconnect();

expect(transport.reconnect).toHaveBeenCalled();
expect(result).toBe(reconnectResponse);
});

it('handles onMessage events from the transport', () => {
const transport = createDummyTransport();
const api = new SignalAPI(transport);
void api;

const consoleSpy = vi.spyOn(console, 'log').mockImplementation(() => {});

// @ts-expect-error – trigger helper added in createDummyTransport
transport.__triggerMessage(new Uint8Array([1, 2, 3]));

expect(consoleSpy).toHaveBeenCalledWith('onMessage', new Uint8Array([1, 2, 3]));

consoleSpy.mockRestore();
});

it('handles onError events from the transport', () => {
const transport = createDummyTransport();
const api = new SignalAPI(transport);
void api;

const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});

const error = new Error('dummy');
// @ts-expect-error – trigger helper added in createDummyTransport
transport.__triggerError(error);

expect(consoleErrorSpy).toHaveBeenCalledWith('onError', error);

consoleErrorSpy.mockRestore();
});

it('ensures parallel join calls are executed sequentially', async () => {
const resolvers: Array<() => void> = [];

const connect = vi.fn((url: string, token: string) => {
void url;
void token;
return new Promise<JoinResponse>((resolve) => {
resolvers.push(() => resolve({} as unknown as JoinResponse));
});
});

const transport = createDummyTransport({ connect });
const api = new SignalAPI(transport);
void api;

// Trigger two join calls without awaiting the first
const joinPromise1 = api.join('wss://example.com', 'token-1');
const joinPromise2 = api.join('wss://example.com', 'token-2');

// Only the first connect should have been invoked at this point
await sleep(5);
expect(connect).toHaveBeenCalledTimes(1);

// Resolve the first join
resolvers[0]();
await joinPromise1;

// Now the second connect should have been called
await sleep(5);
expect(connect).toHaveBeenCalledTimes(2);

// Resolve the second join
resolvers[1]();
await joinPromise2;
});
});
148 changes: 148 additions & 0 deletions src/api/SignalAPI.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import { SignalRequest, SignalResponse, JoinResponse } from '@livekit/protocol';
import type { ITransport } from './SignalTransport';
import { Future, getClientInfo } from '../room/utils';
import { atomic } from '../decorators';


export class SignalAPI {

private writer?: WritableStreamDefaultWriter<SignalRequest>;

private promiseMap = new Map<string, Future<SignalResponse>>();

private offerId = 0;

private transport: ITransport;

private sequenceNumber = 0;

private latestRemoteSequenceNumber = 0;

constructor(transport: ITransport) {
this.transport = transport;
}

@atomic
async join(url: string, token: string, connectOpts: ConnectOpts): Promise<JoinResponse> {
const clientInfo = getClientInfo();
const { readableStream, writableStream } = await this.transport.connect({ url, token, clientInfo, connectOpts });
const reader = readableStream.getReader();
const { done, value } = await reader.read();
reader.releaseLock();
if(value?.message?.case !== 'join') {
throw new Error('Expected join response');
}
if(done || !value) {
throw new Error('Connection closed without join response');
}
this.readLoop(readableStream);
this.writer = writableStream.getWriter();

return value.message.value;
}

async readLoop(readableStream: ReadableStream<SignalResponse>) {
const reader = readableStream.getReader();
while (true) {
try {

const { done, value } = await reader.read();
if (done || !value) break;


const resolverId = getResolverId(value.message);
if(resolverId) {
const responseKey = getResponseKey(value.message.case, resolverId);
const future = this.promiseMap.get(responseKey);
if (future) {
future.resolve?.(value);
continue;
}
}

switch(value.message.case) {
case 'join':
case 'answer':
case 'requestResponse':
console.warn(`received ${value.message.case} these should all be handled by the promise map`);
break;
case 'leave':
value.message.value.
this.close();
break;
default:
console.debug(`received unsupported message ${value.message.case} `);
break;
}

} catch(e) {
Array.from(this.promiseMap.values()).forEach(future => future.reject?.(e));
this.promiseMap.clear();
break;
}
}
}

@atomic
async sendOfferAndAwaitAnswer(offer: RTCSessionDescriptionInit): Promise<SessionDescription> {

Check failure on line 87 in src/api/SignalAPI.ts

View workflow job for this annotation

GitHub Actions / test

'offer' is defined but never used
// const offerId = this.offerId++;
// if(!this.writer) {
// throw new Error('Writable stream not initialized');
// }

// const request = new SessionDescription({
// type: 'offer',
// sdp: offer.sdp,
// // id: offer.id,
// });

// await this.writer.write([this.createClientRequest({ case: 'offer', value: request })]);

// const future = new Future<Signalv2ServerMessage>();
// // we want an answer for this offer so we queue up a future for it
// this.promiseMap.set(getResponseKey('answer', offerId), future);
// const answerResponse = await future.promise;

// if(answerResponse.message.case === 'answer') {
// return answerResponse.message.value;
// }

throw new Error('Answer not found');
}

private getNextSequencer(): Sequencer {
return new Sequencer({
messageId: this.sequenceNumber++,
lastProcessedRemoteMessageId: this.latestRemoteSequenceNumber,
});
}


// @loggedMethod
async reconnect(): Promise<void> {
//return this.transport.reconnect();
}

// @loggedMethod
close() {
return this.transport.disconnect();
}
}


function getResponseKey(requestType: SignalResponse['message']['case'], messageId: number) {
return `${requestType}-${messageId}`;
}


function getResolverId(message: SignalResponse['message']) {
if(typeof message.value !== 'object') {
return null;
}
if('requestId' in message.value) {
return message.value.requestId;
} else if('id' in message.value) {
return message.value.id;
}
return null;
}
Loading
Loading