Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions src/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ export class WebSocketClassProvider {
}
}

/**
* Object used to pass partial websocket transport state into url config function.
*/
export interface WebSocketUrlConfigFunctionParameters {
/** Number of times streamHandler was called without a responsive connection */
streamHandlerInvocationsWithNoConnection: number
}

/**
* Config object that is provided to the WebSocketTransport constructor.
*/
Expand All @@ -39,6 +47,7 @@ export interface WebSocketTransportConfig<T extends WebsocketTransportGenerics>
url: (
context: EndpointContext<T>,
desiredSubs: TypeFromDefinition<T['Parameters']>[],
urlConfigFunctionParameters: WebSocketUrlConfigFunctionParameters,
) => Promise<string> | string

/** Optional parameters used when establishing the WebSocket connection */
Expand Down Expand Up @@ -149,6 +158,7 @@ export class WebSocketTransport<
currentUrl = ''
lastMessageReceivedAt = 0
connectionOpenedAt = 0
streamHandlerInvocationsWithNoConnection = 0

constructor(private config: WebSocketTransportConfig<T>) {
super()
Expand Down Expand Up @@ -328,11 +338,6 @@ export class WebSocketTransport<
return
}

// We want to check if the URL we calculate is different from the one currently connected.
// This is because some providers handle subscriptions on the URLs and not through messages.
const urlFromConfig = await this.config.url(context, subscriptions.desired)
const urlChanged = this.currentUrl !== urlFromConfig

// We want to check that if we have a connection, it hasn't gone stale. That is,
// since opening it, have we had any activity from the provider.
const now = Date.now()
Expand All @@ -351,7 +356,27 @@ export class WebSocketTransport<
timeSinceLastActivity: ${timeSinceLastActivity} |
subscriptionUnresponsiveTtl: ${context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL} |
connectionUnresponsive: ${connectionUnresponsive} |
`)
`)

// The var connectionUnresponsive checks whether the time since last activity on
// _any_ successful open connection (or 0 if we haven't had one yet) has exceeded
// WS_SUBSCRIPTION_UNRESPONSIVE_TTL. There is interplay with WS_SUBSCRIPTION_TTL
// to determine minimum TTL of an open connection given no explicit connection errors.
if (connectionUnresponsive) {
this.streamHandlerInvocationsWithNoConnection += 1
logger.trace(
`The connection is unresponsive, incremented streamHandlerIterationsWithNoConnection = ${this.streamHandlerInvocationsWithNoConnection}`,
)
}

// We want to check if the URL we calculate is different from the one currently connected.
// This is because some providers handle subscriptions on the URLs and not through messages.
// Subclasses may also implement alternate URL handling logic,
// eg: toggling through multiple possible URLs in case of failure.
const urlFromConfig = await this.config.url(context, subscriptions.desired, {
streamHandlerInvocationsWithNoConnection: this.streamHandlerInvocationsWithNoConnection,
})
const urlChanged = this.currentUrl !== urlFromConfig

// Check if we should close the current connection
if (!connectionClosed && (urlChanged || connectionUnresponsive)) {
Expand Down
Loading