From aa362eead477dd436e944cd5bd9b695327bd5430 Mon Sep 17 00:00:00 2001 From: Matthew McAllister Date: Wed, 5 Nov 2025 22:40:49 -0500 Subject: [PATCH 1/6] OPDATA-4087 track WS streamHandler connection attempts --- package.json | 2 +- src/transports/websocket.ts | 38 ++++++++++++++++++++++++++++--------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/package.json b/package.json index 7c382cc4..0137e310 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@chainlink/external-adapter-framework", - "version": "2.8.0", + "version": "2.8.1", "main": "dist/index.js", "license": "MIT", "repository": "git://github.com/smartcontractkit/ea-framework-js.git", diff --git a/src/transports/websocket.ts b/src/transports/websocket.ts index 7842351d..2e537292 100644 --- a/src/transports/websocket.ts +++ b/src/transports/websocket.ts @@ -149,6 +149,7 @@ export class WebSocketTransport< currentUrl = '' lastMessageReceivedAt = 0 connectionOpenedAt = 0 + streamHandlerInvocationsWithNoConnection = 0 constructor(private config: WebSocketTransportConfig) { super() @@ -328,11 +329,6 @@ export class WebSocketTransport< return } - // We want to check if the URL we calculate is different from the one currently connected. - // This is because some providers handle subscriptions on the URLs and not through messages. - const urlFromConfig = await this.config.url(context, subscriptions.desired) - const urlChanged = this.currentUrl !== urlFromConfig - // We want to check that if we have a connection, it hasn't gone stale. That is, // since opening it, have we had any activity from the provider. const now = Date.now() @@ -351,14 +347,29 @@ export class WebSocketTransport< timeSinceLastActivity: ${timeSinceLastActivity} | subscriptionUnresponsiveTtl: ${context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL} | connectionUnresponsive: ${connectionUnresponsive} | - `) + `) + + // The var connectionUnresponsive checks whether the time since last activity on + // _any_ successful open connection (or 0 if we haven't had one yet) has exceeded + // WS_SUBSCRIPTION_UNRESPONSIVE_TTL. There is interplay with WS_SUBSCRIPTION_TTL + // to determine minimum TTL of an open connection given no explicit connection errors. + if (connectionUnresponsive) { + this.streamHandlerInvocationsWithNoConnection += 1 + logger.trace(`The connection is unresponsive, incremented streamHandlerIterationsWithNoConnection = ${this.streamHandlerInvocationsWithNoConnection}`) + } + + // We want to check if the URL we calculate is different from the one currently connected. + // This is because some providers handle subscriptions on the URLs and not through messages. + // Subclasses may also implement alternate URL handling logic, + // eg: toggling through multiple possible URLs in case of failure. + const { urlChanged, url } = await this.determineUrlChange(context, subscriptions) // Check if we should close the current connection if (!connectionClosed && (urlChanged || connectionUnresponsive)) { if (urlChanged) { censorLogs(() => logger.debug( - `Websocket url has changed from ${this.currentUrl} to ${urlFromConfig}, closing connection...`, + `Websocket url has changed from ${this.currentUrl} to ${url}, closing connection...`, ), ) } else { @@ -397,7 +408,7 @@ export class WebSocketTransport< logger.debug('No established connection and new subscriptions available, connecting to WS') const options = this.config.options && (await this.config.options(context, subscriptions.desired)) - this.currentUrl = urlFromConfig + this.currentUrl = url // Need to write this now, otherwise there could be messages sent with values before the open handler finishes this.providerDataStreamEstablished = Date.now() @@ -408,7 +419,7 @@ export class WebSocketTransport< subscriptions.new = subscriptions.desired // Connect to the provider - this.wsConnection = await this.establishWsConnection(context, urlFromConfig, options) + this.wsConnection = await this.establishWsConnection(context, url, options) // Now that we successfully opened the connection, we can reset the variables connectionClosed = false @@ -451,6 +462,15 @@ export class WebSocketTransport< return } + async determineUrlChange( + context: EndpointContext, + subscriptions: SubscriptionDeltas> + ): Promise<{ urlChanged: boolean; url: string }> { + const url = await this.config.url(context, subscriptions.desired) + const urlChanged = this.currentUrl !== url + return { urlChanged, url } + } + private rejectionHandler( rejectionFn: (reason?: unknown) => void, handler: (event: E) => Promise, From dbfe0c7e330f695721bd2228b7de1b9d99ef3e6b Mon Sep 17 00:00:00 2001 From: mmcallister-cll <139181225+mmcallister-cll@users.noreply.github.com> Date: Wed, 5 Nov 2025 22:44:52 -0500 Subject: [PATCH 2/6] put back package.json --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 0137e310..7c382cc4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@chainlink/external-adapter-framework", - "version": "2.8.1", + "version": "2.8.0", "main": "dist/index.js", "license": "MIT", "repository": "git://github.com/smartcontractkit/ea-framework-js.git", From 93dcfc889a9dc22e2a4d6e53606acd67d504b28a Mon Sep 17 00:00:00 2001 From: Matthew McAllister Date: Thu, 6 Nov 2025 11:44:48 -0500 Subject: [PATCH 3/6] lint fix --- src/transports/websocket.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/transports/websocket.ts b/src/transports/websocket.ts index 2e537292..a26581da 100644 --- a/src/transports/websocket.ts +++ b/src/transports/websocket.ts @@ -355,7 +355,9 @@ export class WebSocketTransport< // to determine minimum TTL of an open connection given no explicit connection errors. if (connectionUnresponsive) { this.streamHandlerInvocationsWithNoConnection += 1 - logger.trace(`The connection is unresponsive, incremented streamHandlerIterationsWithNoConnection = ${this.streamHandlerInvocationsWithNoConnection}`) + logger.trace( + `The connection is unresponsive, incremented streamHandlerIterationsWithNoConnection = ${this.streamHandlerInvocationsWithNoConnection}`, + ) } // We want to check if the URL we calculate is different from the one currently connected. @@ -464,7 +466,7 @@ export class WebSocketTransport< async determineUrlChange( context: EndpointContext, - subscriptions: SubscriptionDeltas> + subscriptions: SubscriptionDeltas>, ): Promise<{ urlChanged: boolean; url: string }> { const url = await this.config.url(context, subscriptions.desired) const urlChanged = this.currentUrl !== url From 930f6ac769e9d32506dd99e313141dd382625448 Mon Sep 17 00:00:00 2001 From: Matthew McAllister Date: Thu, 6 Nov 2025 19:34:39 -0500 Subject: [PATCH 4/6] review fix, add urlConfigFunctionParameters to pass state into implementing transport config.url() --- src/transports/websocket.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/transports/websocket.ts b/src/transports/websocket.ts index a26581da..2244f53d 100644 --- a/src/transports/websocket.ts +++ b/src/transports/websocket.ts @@ -31,6 +31,14 @@ export class WebSocketClassProvider { } } +/** + * Object used to pass partial websocket transport state into url config function. + */ +export interface WebSocketUrlConfigFunctionParameters { + /** Number of times streamHandler was called without a responsive connection */ + streamHandlerInvocationsWithNoConnection: number +} + /** * Config object that is provided to the WebSocketTransport constructor. */ @@ -39,6 +47,7 @@ export interface WebSocketTransportConfig url: ( context: EndpointContext, desiredSubs: TypeFromDefinition[], + urlConfigFunctionParameters: WebSocketUrlConfigFunctionParameters, ) => Promise | string /** Optional parameters used when establishing the WebSocket connection */ @@ -468,7 +477,9 @@ export class WebSocketTransport< context: EndpointContext, subscriptions: SubscriptionDeltas>, ): Promise<{ urlChanged: boolean; url: string }> { - const url = await this.config.url(context, subscriptions.desired) + const url = await this.config.url(context, subscriptions.desired, { + streamHandlerInvocationsWithNoConnection: this.streamHandlerInvocationsWithNoConnection, + }) const urlChanged = this.currentUrl !== url return { urlChanged, url } } From c06a9a3eae39018dd65a21404fb6c6b0fdd9a279 Mon Sep 17 00:00:00 2001 From: Matthew McAllister Date: Thu, 6 Nov 2025 20:54:43 -0500 Subject: [PATCH 5/6] remove determineUrlChanges as it is not needed --- src/transports/websocket.ts | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/src/transports/websocket.ts b/src/transports/websocket.ts index 2244f53d..399a4e6c 100644 --- a/src/transports/websocket.ts +++ b/src/transports/websocket.ts @@ -373,7 +373,10 @@ export class WebSocketTransport< // This is because some providers handle subscriptions on the URLs and not through messages. // Subclasses may also implement alternate URL handling logic, // eg: toggling through multiple possible URLs in case of failure. - const { urlChanged, url } = await this.determineUrlChange(context, subscriptions) + const url = await this.config.url(context, subscriptions.desired, { + streamHandlerInvocationsWithNoConnection: this.streamHandlerInvocationsWithNoConnection, + }) + const urlChanged = this.currentUrl !== url // Check if we should close the current connection if (!connectionClosed && (urlChanged || connectionUnresponsive)) { @@ -473,17 +476,6 @@ export class WebSocketTransport< return } - async determineUrlChange( - context: EndpointContext, - subscriptions: SubscriptionDeltas>, - ): Promise<{ urlChanged: boolean; url: string }> { - const url = await this.config.url(context, subscriptions.desired, { - streamHandlerInvocationsWithNoConnection: this.streamHandlerInvocationsWithNoConnection, - }) - const urlChanged = this.currentUrl !== url - return { urlChanged, url } - } - private rejectionHandler( rejectionFn: (reason?: unknown) => void, handler: (event: E) => Promise, From 9ea1ec60a23c95bb49220399b89bee94f930194a Mon Sep 17 00:00:00 2001 From: Matthew McAllister Date: Thu, 6 Nov 2025 20:56:41 -0500 Subject: [PATCH 6/6] put back urlFromConfig varname --- src/transports/websocket.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/transports/websocket.ts b/src/transports/websocket.ts index 399a4e6c..2a53cfca 100644 --- a/src/transports/websocket.ts +++ b/src/transports/websocket.ts @@ -373,17 +373,17 @@ export class WebSocketTransport< // This is because some providers handle subscriptions on the URLs and not through messages. // Subclasses may also implement alternate URL handling logic, // eg: toggling through multiple possible URLs in case of failure. - const url = await this.config.url(context, subscriptions.desired, { + const urlFromConfig = await this.config.url(context, subscriptions.desired, { streamHandlerInvocationsWithNoConnection: this.streamHandlerInvocationsWithNoConnection, }) - const urlChanged = this.currentUrl !== url + const urlChanged = this.currentUrl !== urlFromConfig // Check if we should close the current connection if (!connectionClosed && (urlChanged || connectionUnresponsive)) { if (urlChanged) { censorLogs(() => logger.debug( - `Websocket url has changed from ${this.currentUrl} to ${url}, closing connection...`, + `Websocket url has changed from ${this.currentUrl} to ${urlFromConfig}, closing connection...`, ), ) } else { @@ -422,7 +422,7 @@ export class WebSocketTransport< logger.debug('No established connection and new subscriptions available, connecting to WS') const options = this.config.options && (await this.config.options(context, subscriptions.desired)) - this.currentUrl = url + this.currentUrl = urlFromConfig // Need to write this now, otherwise there could be messages sent with values before the open handler finishes this.providerDataStreamEstablished = Date.now() @@ -433,7 +433,7 @@ export class WebSocketTransport< subscriptions.new = subscriptions.desired // Connect to the provider - this.wsConnection = await this.establishWsConnection(context, url, options) + this.wsConnection = await this.establishWsConnection(context, urlFromConfig, options) // Now that we successfully opened the connection, we can reset the variables connectionClosed = false