diff --git a/package.json b/package.json index 27c1e09..c6571c5 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,6 @@ "release": "np" }, "dependencies": { - "@microsoft/fetch-event-source": "^2.0.1", "deepmerge": "^4.2.2", "linkify-html": "^3.0.5", "linkifyjs": "^3.0.5", diff --git a/src/network/TockEventSource.ts b/src/network/TockEventSource.ts index 556b7e4..23e40ab 100644 --- a/src/network/TockEventSource.ts +++ b/src/network/TockEventSource.ts @@ -1,20 +1,46 @@ import { BotConnectorResponse } from '../model/responses'; -import { - EventSourceMessage, - EventStreamContentType, - fetchEventSource, -} from '@microsoft/fetch-event-source'; - -class RetriableError extends Error {} -class FatalError extends Error {} const INITIAL_RETRY_DELAY = 0; const RETRY_DELAY_INCREMENT = 1000; const MAX_RETRY_DELAY = 15000; +enum SseStatus { + /** + * The server is not answering, or answering with a 1XX, 3XX, 429, or 5XX HTTP status code + */ + SERVER_UNAVAILABLE = -1, + /** + * The server is answering with a 4XX HTTP status code, except 429 (rate limit) + */ + UNSUPPORTED = 0, + /** + * The server is answering with a 2XX HTTP status code + */ + SUPPORTED = 1, +} + +async function getSseStatus(url: string) { + try { + const response = await fetch(url); + if (response.ok) { + return SseStatus.SUPPORTED; + } else if ( + response.status >= 400 && + response.status < 500 && + response.status !== 429 + ) { + return SseStatus.UNSUPPORTED; + } else { + return SseStatus.SERVER_UNAVAILABLE; + } + } catch (_) { + return SseStatus.SERVER_UNAVAILABLE; + } +} + export class TockEventSource { private initialized: boolean; - private abortController: AbortController; + private eventSource: EventSource | null; private retryDelay: number; onResponse: (botResponse: BotConnectorResponse) => void; onStateChange: (state: number) => void; @@ -38,60 +64,55 @@ export class TockEventSource { */ open(endpoint: string, userId: string): Promise { this.onStateChange(EventSource.CONNECTING); - this.abortController = new AbortController(); + const url = `${endpoint}/sse?userid=${userId}`; return new Promise((resolve, reject): void => { - fetchEventSource(`${endpoint}/sse?userid=${userId}`, { - signal: this.abortController.signal, - onopen: async (response) => { - if ( - response.ok && - response.headers - .get('content-type') - ?.includes(EventStreamContentType) - ) { - this.onStateChange(EventSource.OPEN); - this.initialized = true; - resolve(); - return; - } else if ( - response.status >= 400 && - response.status < 500 && - response.status !== 429 - ) { - throw new FatalError(); - } else { - throw new RetriableError(); - } - }, - onmessage: (e: EventSourceMessage) => { - if (e.event === 'message') { - this.onResponse(JSON.parse(e.data)); - } - }, - onerror: (err) => { - if (err instanceof FatalError) { - throw err; // rethrow to stop the operation - } else { - const retryDelay = this.retryDelay; - this.retryDelay = Math.min( - MAX_RETRY_DELAY, - retryDelay + RETRY_DELAY_INCREMENT, - ); - return retryDelay; - } - }, - }) - .catch((e) => console.error(e)) - .finally(() => { - reject(); - this.onStateChange(EventSource.CLOSED); - this.initialized = false; - }); + this.tryOpen(url, resolve, reject); + }); + } + + private tryOpen(url: string, resolve: () => void, reject: () => void) { + this.eventSource = new EventSource(url); + this.eventSource.addEventListener('open', () => { + this.onStateChange(EventSource.OPEN); + this.initialized = true; + this.retryDelay = INITIAL_RETRY_DELAY; + resolve(); }); + this.eventSource.addEventListener('error', () => { + this.eventSource?.close(); + this.retry(url, reject, resolve); + }); + this.eventSource.addEventListener('message', (e) => { + this.onResponse(JSON.parse(e.data)); + }); + } + + private retry(url: string, reject: () => void, resolve: () => void) { + const retryDelay = this.retryDelay; + this.retryDelay = Math.min( + MAX_RETRY_DELAY, + retryDelay + RETRY_DELAY_INCREMENT, + ); + setTimeout(async () => { + switch (await getSseStatus(url)) { + case SseStatus.UNSUPPORTED: + reject(); + this.close(); + break; + case SseStatus.SUPPORTED: + this.tryOpen(url, resolve, reject); + break; + case SseStatus.SERVER_UNAVAILABLE: + this.retry(url, reject, resolve); + break; + } + }, retryDelay); } close() { - this.abortController?.abort(); + this.eventSource?.close(); + this.eventSource = null; this.initialized = false; + this.onStateChange(EventSource.CLOSED); } } diff --git a/yarn.lock b/yarn.lock index 42137b4..f9e007c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1381,11 +1381,6 @@ "@types/mdx" "^2.0.0" "@types/react" ">=16" -"@microsoft/fetch-event-source@^2.0.1": - version "2.0.1" - resolved "https://registry.yarnpkg.com/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz#9ceecc94b49fbaa15666e38ae8587f64acce007d" - integrity sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA== - "@ndelangen/get-tarball@^3.0.7": version "3.0.9" resolved "https://registry.yarnpkg.com/@ndelangen/get-tarball/-/get-tarball-3.0.9.tgz#727ff4454e65f34707e742a59e5e6b1f525d8964"