From c691614fc5372a2efd7587e038598cdacd3a0c8f Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Sun, 23 Nov 2025 12:23:23 +0100 Subject: [PATCH 01/13] feat: websocket channel DS --- .../data_sources/websocket_channel.dart | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 packages/ndk/lib/data_layer/data_sources/websocket_channel.dart diff --git a/packages/ndk/lib/data_layer/data_sources/websocket_channel.dart b/packages/ndk/lib/data_layer/data_sources/websocket_channel.dart new file mode 100644 index 000000000..f2db949bf --- /dev/null +++ b/packages/ndk/lib/data_layer/data_sources/websocket_channel.dart @@ -0,0 +1,48 @@ +import 'dart:async'; + +import 'package:web_socket_channel/web_socket_channel.dart'; + +class WebsocketChannelDS { + final WebSocketChannel ws; + final String url; + + final Completer _readyCompleter = Completer(); + + WebsocketChannelDS(this.ws, this.url) { + ws.ready.then((_) { + if (!_readyCompleter.isCompleted) { + _readyCompleter.complete(); + } + }); + } + + StreamSubscription listen( + void Function(dynamic) onData, { + Function? onError, + void Function()? onDone, + }) { + return ws.stream.listen(onData, onDone: onDone, onError: onError); + } + + void send(dynamic data) { + return ws.sink.add(data); + } + + Future close() { + return ws.sink.close(); + } + + bool isOpen() { + final rdy = _readyCompleter.isCompleted; + final notClosed = ws.closeCode == null; + return rdy && notClosed; + } + + int? closeCode() { + return ws.closeCode; + } + + String? closeReason() { + return ws.closeReason; + } +} From b49714d69483307991e5768ba7223233c13a4d58 Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Sun, 23 Nov 2025 12:24:37 +0100 Subject: [PATCH 02/13] fix: websocket optimization --- .../data_layer/data_sources/websocket_client.dart | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/packages/ndk/lib/data_layer/data_sources/websocket_client.dart b/packages/ndk/lib/data_layer/data_sources/websocket_client.dart index d8648994b..c82a282b5 100644 --- a/packages/ndk/lib/data_layer/data_sources/websocket_client.dart +++ b/packages/ndk/lib/data_layer/data_sources/websocket_client.dart @@ -26,20 +26,18 @@ class WebsocketDSClient { } bool isOpen() { - return ws.connection.state == Connected() || - ws.connection.state == Reconnected(); + final state = ws.connection.state; + return state is Connected || state is Reconnected; } int? closeCode() { - return ws.connection.state == Disconnected() - ? (ws.connection.state as Disconnected).code - : null; + final state = ws.connection.state; + return state is Disconnected ? state.code : null; } String? closeReason() { - return ws.connection.state == Disconnected() - ? (ws.connection.state as Disconnected).reason - : null; + final state = ws.connection.state; + return state is Disconnected ? state.reason : null; } } // coverage:ignore-end From 4dddbbce6f4f1fd3fe7426d39e8badba3871e0a1 Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Sun, 23 Nov 2025 12:25:31 +0100 Subject: [PATCH 03/13] fix: relay manager optimization --- .../domain_layer/usecases/relay_manager.dart | 63 ++++++++++--------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart index cacfb9cfc..d7c2df885 100644 --- a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart +++ b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart @@ -8,6 +8,7 @@ import '../../config/bootstrap_relays.dart'; import '../../config/relay_defaults.dart'; import '../../shared/helpers/relay_helper.dart'; import '../../shared/logger/logger.dart'; +import '../../shared/logger/log_level.dart'; import '../../shared/nips/nip01/client_msg.dart'; import '../entities/broadcast_state.dart'; import '../entities/connection_source.dart'; @@ -55,6 +56,9 @@ class RelayManager { Stream> get relayConnectivityChanges => _relayUpdatesStreamController.stream; + Timer? _updateRelayConnectivityTimer; + bool _pendingRelayUpdate = false; + /// Creates a new relay manager. RelayManager({ required this.globalState, @@ -69,6 +73,19 @@ class RelayManager { } void updateRelayConnectivity() { + if (_pendingRelayUpdate) return; + + _pendingRelayUpdate = true; + _updateRelayConnectivityTimer?.cancel(); + _updateRelayConnectivityTimer = Timer(Duration(milliseconds: 100), () { + _pendingRelayUpdate = false; + _relayUpdatesStreamController.add(globalState.relays); + }); + } + + void updateRelayConnectivityImmediate() { + _updateRelayConnectivityTimer?.cancel(); + _pendingRelayUpdate = false; _relayUpdatesStreamController.add(globalState.relays); } @@ -264,18 +281,18 @@ class RelayManager { } void reSubscribeInFlightSubscriptions(RelayConnectivity relayConnectivity) { + final relayUrl = relayConnectivity.url; globalState.inFlightRequests.forEach((key, state) { - state.requests.values - .where((req) => req.url == relayConnectivity.url) - .forEach((req) { - if (!state.request.closeOnEOSE) { - List list = ["REQ", state.id]; - list.addAll(req.filters.map((filter) => filter.toMap())); - - relayConnectivity.stats.activeRequests++; - _sendRaw(relayConnectivity, jsonEncode(list)); - } - }); + if (state.request.closeOnEOSE) return; // Skip early + + final req = state.requests[relayUrl]; + if (req != null) { + List list = ["REQ", state.id]; + list.addAll(req.filters.map((filter) => filter.toMap())); + + relayConnectivity.stats.activeRequests++; + _sendRaw(relayConnectivity, jsonEncode(list)); + } }); } @@ -546,15 +563,13 @@ class RelayManager { /// check if relays for this request are still connected /// if not ignore it and wait for the ones still alive to receive EOSE - final listOfRelaysForThisRequest = state.requests.keys.toList(); - final myNotConnectedRelays = globalState.relays.keys - .where((url) => listOfRelaysForThisRequest.contains(url)) - .where((url) => !isRelayConnected(url)) - .toList(); + final requestRelayUrls = state.requests.keys.toSet(); + final notConnectedRelays = + requestRelayUrls.where((url) => !isRelayConnected(url)).toSet(); final bool didAllRelaysFinish = state.requests.values.every( (element) => - element.receivedEOSE || myNotConnectedRelays.contains(element.url), + element.receivedEOSE || notConnectedRelays.contains(element.url), ); if (didAllRelaysFinish) { @@ -581,22 +596,14 @@ class RelayManager { } void _logActiveRequests() { - // Map kindsMap = {}; + // Skip expensive iteration if debug logging is not enabled + if (!LogLevel.debug.shouldLog(Logger.log.level)) return; + Map namesMap = {}; globalState.inFlightRequests.forEach((key, state) { - // int? kind; - // if (state.requests.isNotEmpty && - // state.requests.values.first.filters.first.kinds != null && - // state.requests.values.first.filters.first.kinds!.isNotEmpty) { - // kind = state.requests.values.first.filters.first.kinds!.first; - // } - // int? kindCount = kindsMap[kind]; int? nameCount = namesMap[state.request.name]; - // kindCount ??= 0; - // kindCount++; nameCount ??= 0; nameCount++; - // kindsMap[kind] = kindCount; namesMap[state.request.name] = nameCount; }); Logger.log.d( From cd66030c4944e36dd83f2b56b5baa69bb05b50ce Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Sun, 23 Nov 2025 12:26:39 +0100 Subject: [PATCH 04/13] perf: skip unnecessary loop pubkey strategy --- .../relay_jit_pubkey_strategy.dart | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_request_strategies/relay_jit_pubkey_strategy.dart b/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_request_strategies/relay_jit_pubkey_strategy.dart index e3e55bc22..aad24caf5 100644 --- a/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_request_strategies/relay_jit_pubkey_strategy.dart +++ b/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_request_strategies/relay_jit_pubkey_strategy.dart @@ -34,7 +34,7 @@ import '../../user_relay_lists/user_relay_lists.dart'; /// class RelayJitPubkeyStrategy with Logger { - static void handleRequest({ + static Future handleRequest({ required RequestState requestState, required GlobalState globalState, @@ -51,7 +51,7 @@ class RelayJitPubkeyStrategy with Logger { required ReadWriteMarker direction, required List ignoreRelays, required RelayManager relayManager, - }) { + }) async { List combindedPubkeys = [ ...?filter.authors, ...?filter.pTags @@ -60,16 +60,16 @@ class RelayJitPubkeyStrategy with Logger { // init coveragePubkeys List coveragePubkeys = []; - for (var pubkey in combindedPubkeys) { + for (final pubkey in combindedPubkeys) { coveragePubkeys .add(CoveragePubkey(pubkey, desiredCoverage, desiredCoverage)); } // look for connected relays that cover the pubkey - for (var connectedRelay in connectedRelays) { - var coveredPubkeysForRelay = []; + for (final connectedRelay in connectedRelays) { + final coveredPubkeysForRelay = []; - for (var coveragePubkey in coveragePubkeys) { + for (final coveragePubkey in coveragePubkeys) { if (JitEngine.doesRelayCoverPubkey( connectedRelay, coveragePubkey.pubkey, direction)) { coveredPubkeysForRelay.add(coveragePubkey.pubkey); @@ -93,18 +93,18 @@ class RelayJitPubkeyStrategy with Logger { globalState, relayManager, ); - - // clear out fully covered pubkeys - _removeFullyCoveredPubkeys(coveragePubkeys); } + // clear out fully covered pubkeys after processing all connected relays + _removeFullyCoveredPubkeys(coveragePubkeys); + if (coveragePubkeys.isEmpty) { // we are done // all pubkeys are covered by already connected relays return; } - _findRelaysForUnresolvedPubkeys( + await _findRelaysForUnresolvedPubkeys( requestState: requestState, globalState: globalState, relayManger: relayManager, @@ -141,7 +141,7 @@ class RelayJitPubkeyStrategy with Logger { // looks in nip65 data for not covered pubkeys // the result is relay candidates // connects to these candidates and sends out the request - static void _findRelaysForUnresolvedPubkeys({ + static Future _findRelaysForUnresolvedPubkeys({ required RelayManager relayManger, required RequestState requestState, required GlobalState globalState, From 914ce825483f47d12c76e6d7ba3094301ec61c33 Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Sun, 23 Nov 2025 12:28:02 +0100 Subject: [PATCH 05/13] disable code to expose the bug --- .../websocket_client_nostr_transport.dart | 46 +++++++++---------- ...socket_client_nostr_transport_factory.dart | 4 ++ 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart index cbb66dd44..deed9999d 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart @@ -4,6 +4,7 @@ import 'package:web_socket_client/web_socket_client.dart'; import '../../../domain_layer/repositories/nostr_transport.dart'; import '../../../shared/logger/logger.dart'; +import '../../data_sources/websocket_channel.dart'; import '../../data_sources/websocket_client.dart'; /// A WebSocket-based implementation of the NostrTransport interface. @@ -22,30 +23,27 @@ class WebSocketClientNostrTransport implements NostrTransport { WebSocketClientNostrTransport(this._websocketDS, [Function? onReconnect]) { Completer completer = Completer(); ready = completer.future; - _stateStreamSubscription = _websocketDS.ws.connection.listen((state) { - Logger.log.t("${_websocketDS.url} connection state changed to $state"); - switch (state) { - case Connected() || Reconnected(): - completer.complete(); - if (state == Reconnected() && onReconnect != null) { - onReconnect.call(); - } - break; - case Disconnected(): - completer = Completer(); - ready = completer.future; - break; - case Connecting(): - // Do nothing, just waiting for connection to be established - break; - case Reconnecting(): - // Do nothing, just waiting for reconnection to be established - break; - default: - Logger.log.w( - "${_websocketDS.url} connection state changed to unknown state: $state"); - } - }); + + ///! this code is causing the performance issue, or upstream by onReconnect() + // _stateStreamSubscription = _websocketDS.ws.connection.listen((state) { + // Logger.log.t("${_websocketDS.url} connection state changed to $state"); + // if (state is Connected || state is Reconnected) { + // if (!completer.isCompleted) { + // completer.complete(); + // } + // if (state is Reconnected && onReconnect != null) { + // onReconnect.call(); + // } + // } else if (state is Disconnected) { + // completer = Completer(); + // ready = completer.future; + // } else if (state is Connecting || state is Reconnecting) { + // // Do nothing, just waiting for (re)connection to be established + // } else { + // Logger.log.w( + // "${_websocketDS.url} connection state changed to unknown state: $state"); + // } + // }); } /// A Future that completes when the WebSocket connection is ready. diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart index 207f59cd4..e1114e99d 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart @@ -1,8 +1,10 @@ import 'package:ndk/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_client/web_socket_client.dart'; import '../../../domain_layer/repositories/nostr_transport.dart'; import '../../../shared/helpers/relay_helper.dart'; +import '../../data_sources/websocket_channel.dart'; import '../../data_sources/websocket_client.dart'; class WebSocketClientNostrTransportFactory implements NostrTransportFactory { @@ -19,6 +21,8 @@ class WebSocketClientNostrTransportFactory implements NostrTransportFactory { final client = WebSocket(Uri.parse(myUrl), backoff: backoff); final WebsocketDSClient myDataSource = WebsocketDSClient(client, myUrl); + // final WebsocketChannelDS myDataSource = + // WebsocketChannelDS(WebSocketChannel.connect(Uri.parse(myUrl)), myUrl); return WebSocketClientNostrTransport(myDataSource, onReconnect); } } From 78f40f990013f00c19f528ff5f660a55f870b473 Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Mon, 24 Nov 2025 12:22:21 +0100 Subject: [PATCH 06/13] experiment: isolate websocket, disabled calcId --- .../websocket_client_nostr_transport.dart | 39 +- ...socket_client_nostr_transport_factory.dart | 13 +- .../websocket_isolate_nostr_transport.dart | 375 ++++++++++++++++++ .../domain_layer/entities/nip_01_event.dart | 3 +- .../domain_layer/usecases/relay_manager.dart | 57 ++- 5 files changed, 443 insertions(+), 44 deletions(-) create mode 100644 packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart index deed9999d..cee7a44de 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart @@ -24,26 +24,25 @@ class WebSocketClientNostrTransport implements NostrTransport { Completer completer = Completer(); ready = completer.future; - ///! this code is causing the performance issue, or upstream by onReconnect() - // _stateStreamSubscription = _websocketDS.ws.connection.listen((state) { - // Logger.log.t("${_websocketDS.url} connection state changed to $state"); - // if (state is Connected || state is Reconnected) { - // if (!completer.isCompleted) { - // completer.complete(); - // } - // if (state is Reconnected && onReconnect != null) { - // onReconnect.call(); - // } - // } else if (state is Disconnected) { - // completer = Completer(); - // ready = completer.future; - // } else if (state is Connecting || state is Reconnecting) { - // // Do nothing, just waiting for (re)connection to be established - // } else { - // Logger.log.w( - // "${_websocketDS.url} connection state changed to unknown state: $state"); - // } - // }); + _stateStreamSubscription = _websocketDS.ws.connection.listen((state) { + Logger.log.t("${_websocketDS.url} connection state changed to $state"); + if (state is Connected || state is Reconnected) { + if (!completer.isCompleted) { + completer.complete(); + } + if (state is Reconnected && onReconnect != null) { + onReconnect.call(); + } + } else if (state is Disconnected) { + completer = Completer(); + ready = completer.future; + } else if (state is Connecting || state is Reconnecting) { + // Do nothing, just waiting for (re)connection to be established + } else { + Logger.log.w( + "${_websocketDS.url} connection state changed to unknown state: $state"); + } + }); } /// A Future that completes when the WebSocket connection is ready. diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart index e1114e99d..92d96ae68 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart @@ -1,4 +1,5 @@ import 'package:ndk/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart'; +import 'package:ndk/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_client/web_socket_client.dart'; @@ -8,6 +9,10 @@ import '../../data_sources/websocket_channel.dart'; import '../../data_sources/websocket_client.dart'; class WebSocketClientNostrTransportFactory implements NostrTransportFactory { + final bool useIsolate; + + WebSocketClientNostrTransportFactory({this.useIsolate = true}); + @override NostrTransport call(String url, Function? onReconnect) { final myUrl = cleanRelayUrl(url); @@ -16,13 +21,17 @@ class WebSocketClientNostrTransportFactory implements NostrTransportFactory { throw Exception("relayUrl is not parsable"); } + // Use isolate-based transport for better performance + if (useIsolate) { + return WebSocketIsolateNostrTransport(myUrl, onReconnect); + } + + // Fallback to regular transport final backoff = BinaryExponentialBackoff( initial: Duration(seconds: 1), maximumStep: 10); final client = WebSocket(Uri.parse(myUrl), backoff: backoff); final WebsocketDSClient myDataSource = WebsocketDSClient(client, myUrl); - // final WebsocketChannelDS myDataSource = - // WebsocketChannelDS(WebSocketChannel.connect(Uri.parse(myUrl)), myUrl); return WebSocketClientNostrTransport(myDataSource, onReconnect); } } diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart new file mode 100644 index 000000000..05981081e --- /dev/null +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart @@ -0,0 +1,375 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:isolate'; +import 'package:ndk/domain_layer/repositories/nostr_transport.dart'; +import 'package:ndk/shared/logger/logger.dart'; + +import 'package:web_socket_client/web_socket_client.dart'; + +enum NostrMessageRawType { + notice, + event, + eose, + ok, + closed, + auth, + unknown, +} + +class Nip01EventRaw { + final String id; + + final String pubKey; + + final int createdAt; + + final int kind; + + final List> tags; + + final String content; + + final String sig; + + Nip01EventRaw({ + required this.id, + required this.pubKey, + required this.createdAt, + required this.kind, + required this.tags, + required this.content, + required this.sig, + }); +} + +class NostrMessageRaw { + final NostrMessageRawType type; + final Nip01EventRaw? nip01Event; + final String? requestId; + final dynamic otherData; + + NostrMessageRaw({ + required this.type, + this.nip01Event, + this.requestId, + this.otherData, + }); +} + +class WebSocketIsolateNostrTransport implements NostrTransport { + final String url; + final Function? onReconnect; + final Completer _readyCompleter = Completer(); + final StreamController _messageController = + StreamController.broadcast(); + + late final ReceivePort _receivePort; + SendPort? _isolateSendPort; + Isolate? _isolate; + + int? _closeCode; + String? _closeReason; + bool _isOpen = false; + bool _isInitialized = false; + + WebSocketIsolateNostrTransport(this.url, this.onReconnect) { + _initialize(); + } + + Future _initialize() async { + if (_isInitialized) return; + _isInitialized = true; + + _receivePort = ReceivePort(); + + try { + _isolate = await Isolate.spawn( + _isolateEntry, + _IsolateStartupData( + sendPort: _receivePort.sendPort, + url: url, + ), + ); + + _receivePort.listen((message) { + _handleIsolateMessage(message); + }); + } catch (e) { + Logger.log.e("Failed to spawn isolate for $url: $e"); + if (!_readyCompleter.isCompleted) { + _readyCompleter.completeError(e); + } + } + } + + void _handleIsolateMessage(dynamic message) { + if (message is SendPort) { + _isolateSendPort = message; + return; + } + + if (message is Map) { + switch (message['type']) { + case 'ready': + _isOpen = true; + if (!_readyCompleter.isCompleted) { + _readyCompleter.complete(); + } + break; + + case 'message': + // Message is raw string from WebSocket + _messageController.add(message['data']); + break; + + case 'reconnecting': + Logger.log.i("WebSocket reconnecting: $url"); + if (onReconnect != null) { + onReconnect!(); + } + break; + + case 'error': + Logger.log.e("WebSocket error from isolate: ${message['error']}"); + _messageController.addError(message['error']); + break; + + case 'done': + _closeCode = message['closeCode']; + _closeReason = message['closeReason']; + _isOpen = false; + if (!_messageController.isClosed) { + _messageController.close(); + } + break; + } + } + } + + @override + Future get ready => _readyCompleter.future; + + @override + bool isOpen() => _isOpen; + + @override + int? closeCode() => _closeCode; + + @override + String? closeReason() => _closeReason; + + @override + void send(dynamic data) { + if (_isolateSendPort != null && _isOpen) { + _isolateSendPort!.send({ + 'command': 'send', + 'data': data, + }); + } else { + Logger.log.w("Attempted to send on closed/unready WebSocket: $url"); + } + } + + @override + Stream get stream => _messageController.stream; + + @override + Future close() async { + if (_isolateSendPort != null) { + _isolateSendPort!.send({'command': 'close'}); + } + + await Future.delayed(Duration(milliseconds: 100)); + + if (!_messageController.isClosed) { + await _messageController.close(); + } + _receivePort.close(); + _isolate?.kill(priority: Isolate.immediate); + _isolate = null; + } + + @override + StreamSubscription listen( + void Function(NostrMessageRaw) onData, { + Function? onError, + void Function()? onDone, + }) { + return _messageController.stream + .listen(onData, onError: onError, onDone: onDone); + } + + @override + set ready(Future value) { + // No-op: ready is managed internally. + } +} + +class _IsolateStartupData { + final SendPort sendPort; + final String url; + + _IsolateStartupData({ + required this.sendPort, + required this.url, + }); +} + +void _isolateEntry(_IsolateStartupData startupData) { + _WebSocketIsolateWorker(startupData.sendPort, startupData.url); +} + +class _WebSocketIsolateWorker { + final SendPort _sendPort; + final String _url; + late final ReceivePort _receivePort; + + _WebSocketIsolateWorker(this._sendPort, this._url) { + _receivePort = ReceivePort(); + _sendPort.send(_receivePort.sendPort); + _connect(); + } + + void _connect() async { + // Import web_socket_client in the isolate context + final webSocket = await _createWebSocket(); + + webSocket.connection.listen( + (state) { + if (state is Connected) { + _sendPort.send({'type': 'ready'}); + } else if (state is Reconnecting) { + _sendPort.send({'type': 'reconnecting'}); + } else if (state is Disconnected) { + _sendPort.send({ + 'type': 'done', + 'closeCode': null, + 'closeReason': 'Disconnected', + }); + } + }, + onError: (error) { + _sendPort.send({ + 'type': 'error', + 'error': error.toString(), + }); + }, + ); + + webSocket.messages.listen( + (message) { + final eventJson = json.decode(message); + final NostrMessageRaw data; + + switch (eventJson[0]) { + case 'NOTICE': + data = NostrMessageRaw( + type: NostrMessageRawType.notice, + otherData: eventJson, + ); + break; + case 'EVENT': + Nip01EventRaw? nip01Event; + try { + final eventData = eventJson[2]; + nip01Event = Nip01EventRaw( + id: eventData['id'], + pubKey: eventData['pubkey'], + createdAt: eventData['created_at'], + kind: eventData['kind'], + tags: List>.from( + (eventData['tags'] as List).map( + (tag) => List.from(tag), + ), + ), + content: eventData['content'], + sig: eventData['sig'], + ); + } catch (e) { + nip01Event = null; + } + + data = NostrMessageRaw( + type: NostrMessageRawType.event, + requestId: eventJson[1], + nip01Event: nip01Event, + otherData: nip01Event == null ? eventJson : null, + ); + + break; + case 'EOSE': + data = NostrMessageRaw( + type: NostrMessageRawType.eose, otherData: eventJson); + break; + case 'OK': + data = NostrMessageRaw( + type: NostrMessageRawType.ok, + otherData: eventJson, + ); + break; + case 'CLOSED': + data = NostrMessageRaw( + type: NostrMessageRawType.closed, + otherData: eventJson, + ); + break; + case 'AUTH': + data = NostrMessageRaw( + type: NostrMessageRawType.auth, + otherData: eventJson, + ); + break; + default: + data = NostrMessageRaw( + type: NostrMessageRawType.unknown, + otherData: eventJson, + ); + break; + } + + // Send raw message to main isolate + _sendPort.send({ + 'type': 'message', + 'data': data, + }); + }, + onError: (error) { + _sendPort.send({ + 'type': 'error', + 'error': error.toString(), + }); + }, + onDone: () { + _sendPort.send({ + 'type': 'done', + 'closeCode': null, + 'closeReason': "Done", + }); + }, + ); + + // Listen for commands from main isolate + _receivePort.listen((message) { + if (message is Map) { + switch (message['command']) { + case 'send': + webSocket.send(message['data']); + break; + case 'close': + webSocket.close(); + _receivePort.close(); + break; + } + } + }); + } + + Future _createWebSocket() async { + final backoff = BinaryExponentialBackoff( + initial: Duration(seconds: 1), + maximumStep: 10, + ); + + return WebSocket(Uri.parse(_url), backoff: backoff); + } +} diff --git a/packages/ndk/lib/domain_layer/entities/nip_01_event.dart b/packages/ndk/lib/domain_layer/entities/nip_01_event.dart index 7cab47aeb..3215f2980 100644 --- a/packages/ndk/lib/domain_layer/entities/nip_01_event.dart +++ b/packages/ndk/lib/domain_layer/entities/nip_01_event.dart @@ -30,7 +30,7 @@ class Nip01Event { this.createdAt = (createdAt == 0) ? DateTime.now().millisecondsSinceEpoch ~/ 1000 : createdAt; - id = _calculateId(pubKey, this.createdAt, kind, tags, content); + //id = _calculateId(pubKey, this.createdAt, kind, tags, content); } Nip01Event._( @@ -135,6 +135,7 @@ class Nip01Event { static String _calculateId(String publicKey, int createdAt, int kind, List tags, String content) { + print("Calculating id with: $publicKey, $createdAt, $kind, $tags"); final jsonData = json.encode([0, publicKey, createdAt, kind, tags, content]); final bytes = utf8.encode(jsonData); diff --git a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart index d7c2df885..2f97cdbd2 100644 --- a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart +++ b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart @@ -6,6 +6,7 @@ import 'package:rxdart/rxdart.dart'; import '../../config/bootstrap_relays.dart'; import '../../config/relay_defaults.dart'; +import '../../data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart'; import '../../shared/helpers/relay_helper.dart'; import '../../shared/logger/logger.dart'; import '../../shared/logger/log_level.dart'; @@ -310,6 +311,8 @@ class RelayManager { /// wait until rdy await relayConnectivity.relayTransport!.ready; + //await Future.delayed(Duration(seconds: 10)); + final String encodedMsg = jsonEncode(msg.toJson()); _sendRaw(relayConnectivity, encodedMsg); } @@ -407,16 +410,16 @@ class RelayManager { void _handleIncomingMessage( dynamic message, RelayConnectivity relayConnectivity) { - List eventJson; - try { - eventJson = json.decode(message); - } on FormatException catch (e) { - Logger.log.e( - "FormatException in _handleIncomingMessage for relay ${relayConnectivity.url}: $e, message: $message"); + if (message is! NostrMessageRaw) { + Logger.log.w( + "Received non NostrMessageRaw message from ${relayConnectivity.url}: $message"); return; } - if (eventJson[0] == 'OK') { + final myMsg = message; + final eventJson = myMsg.otherData; + + if (myMsg.type == NostrMessageRawType.ok) { //nip 20 used to notify clients if an EVENT was successful if (eventJson.length >= 2 && eventJson[2] == false) { Logger.log.e("NOT OK from ${relayConnectivity.url}: $eventJson"); @@ -438,22 +441,22 @@ class RelayManager { } return; } - if (eventJson[0] == 'NOTICE') { + if (myMsg.type == NostrMessageRawType.notice) { Logger.log.w("NOTICE from ${relayConnectivity.url}: ${eventJson[1]}"); _logActiveRequests(); - } else if (eventJson[0] == 'EVENT') { + } else if (myMsg.type == NostrMessageRawType.event) { _handleIncomingEvent( - eventJson, relayConnectivity, message.toString().codeUnits.length); + myMsg, relayConnectivity, message.toString().codeUnits.length); Logger.log.t("EVENT from ${relayConnectivity.url}: $eventJson"); - } else if (eventJson[0] == 'EOSE') { + } else if (myMsg.type == NostrMessageRawType.eose) { Logger.log.d("EOSE from ${relayConnectivity.url}: ${eventJson[1]}"); _handleEOSE(eventJson, relayConnectivity); - } else if (eventJson[0] == 'CLOSED') { + } else if (myMsg.type == NostrMessageRawType.closed) { Logger.log.w( " CLOSED subscription url: ${relayConnectivity.url} id: ${eventJson[1]} msg: ${eventJson.length > 2 ? eventJson[2] : ''}"); _handleClosed(eventJson, relayConnectivity); } - if (eventJson[0] == ClientMsgType.kAuth) { + if (myMsg.type == NostrMessageRawType.auth) { // nip 42 used to send authentication challenges final challenge = eventJson[1]; Logger.log.d("AUTH: $challenge"); @@ -480,23 +483,35 @@ class RelayManager { // } } - void _handleIncomingEvent(List eventJson, - RelayConnectivity connectivity, int messageSize) { - var id = eventJson[1]; - if (globalState.inFlightRequests[id] == null) { + void _handleIncomingEvent( + NostrMessageRaw nostrMsgRaw, + RelayConnectivity connectivity, + int messageSize, + ) { + final requestId = nostrMsgRaw.requestId!; + final eventRaw = nostrMsgRaw.nip01Event!; + if (globalState.inFlightRequests[requestId] == null) { Logger.log.w( - "RECEIVED EVENT from ${connectivity.url} for id $id, not in globalState inFlightRequests. Likely data after EOSE on a query"); + "RECEIVED EVENT from ${connectivity.url} for id $requestId, not in globalState inFlightRequests. Likely data after EOSE on a query"); return; } - Nip01Event event = Nip01Event.fromJson(eventJson[2]); + Nip01Event event = Nip01Event( + pubKey: eventRaw.pubKey, + createdAt: eventRaw.createdAt, + kind: eventRaw.kind, + tags: eventRaw.tags, + content: eventRaw.content, + ); + event.sig = eventRaw.sig; + event.id = eventRaw.id; connectivity.stats.incStatsByNewEvent(event, messageSize); - RequestState? state = globalState.inFlightRequests[id]; + RequestState? state = globalState.inFlightRequests[requestId]; if (state != null) { RelayRequestState? request = state.requests[connectivity.url]; if (request == null) { - Logger.log.w("No RelayRequestState found for id $id"); + Logger.log.w("No RelayRequestState found for id $requestId"); return; } event.sources.add(connectivity.url); From f0b3283574afbe7b834873407af50a4f6704c774 Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Tue, 25 Nov 2025 10:12:44 +0100 Subject: [PATCH 07/13] experiment: single isolate --- .../websocket_isolate_nostr_transport.dart | 321 +++++++++++------- 1 file changed, 206 insertions(+), 115 deletions(-) diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart index 05981081e..30716aafc 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart @@ -56,46 +56,38 @@ class NostrMessageRaw { }); } -class WebSocketIsolateNostrTransport implements NostrTransport { - final String url; - final Function? onReconnect; - final Completer _readyCompleter = Completer(); - final StreamController _messageController = - StreamController.broadcast(); +/// Singleton manager for the shared WebSocket isolate +class _WebSocketIsolateManager { + static _WebSocketIsolateManager? _instance; + static _WebSocketIsolateManager get instance { + _instance ??= _WebSocketIsolateManager._(); + return _instance!; + } - late final ReceivePort _receivePort; - SendPort? _isolateSendPort; Isolate? _isolate; + SendPort? _isolateSendPort; + final ReceivePort _receivePort = ReceivePort(); + final Completer _readyCompleter = Completer(); + final Map> _connectionControllers = + {}; + int _nextConnectionId = 0; - int? _closeCode; - String? _closeReason; - bool _isOpen = false; - bool _isInitialized = false; - - WebSocketIsolateNostrTransport(this.url, this.onReconnect) { + _WebSocketIsolateManager._() { _initialize(); } Future _initialize() async { - if (_isInitialized) return; - _isInitialized = true; - - _receivePort = ReceivePort(); - try { _isolate = await Isolate.spawn( _isolateEntry, - _IsolateStartupData( - sendPort: _receivePort.sendPort, - url: url, - ), + _receivePort.sendPort, ); _receivePort.listen((message) { _handleIsolateMessage(message); }); } catch (e) { - Logger.log.e("Failed to spawn isolate for $url: $e"); + Logger.log.e("Failed to spawn shared WebSocket isolate: $e"); if (!_readyCompleter.isCompleted) { _readyCompleter.completeError(e); } @@ -105,47 +97,135 @@ class WebSocketIsolateNostrTransport implements NostrTransport { void _handleIsolateMessage(dynamic message) { if (message is SendPort) { _isolateSendPort = message; + if (!_readyCompleter.isCompleted) { + _readyCompleter.complete(); + } return; } if (message is Map) { - switch (message['type']) { - case 'ready': - _isOpen = true; - if (!_readyCompleter.isCompleted) { - _readyCompleter.complete(); - } - break; + final connectionId = message['connectionId'] as String?; + if (connectionId == null) return; - case 'message': - // Message is raw string from WebSocket - _messageController.add(message['data']); - break; + final controller = _connectionControllers[connectionId]; + if (controller == null) return; - case 'reconnecting': - Logger.log.i("WebSocket reconnecting: $url"); - if (onReconnect != null) { - onReconnect!(); - } + switch (message['type']) { + case 'message': + controller.add(message['data']); break; - case 'error': - Logger.log.e("WebSocket error from isolate: ${message['error']}"); - _messageController.addError(message['error']); + controller.addError(message['error']); break; - case 'done': - _closeCode = message['closeCode']; - _closeReason = message['closeReason']; - _isOpen = false; - if (!_messageController.isClosed) { - _messageController.close(); + if (!controller.isClosed) { + controller.close(); } break; } } } + String _registerConnection(StreamController controller) { + final id = 'conn_${_nextConnectionId++}'; + _connectionControllers[id] = controller; + return id; + } + + void _unregisterConnection(String connectionId) { + _connectionControllers.remove(connectionId); + } + + Future get ready => _readyCompleter.future; + + void sendCommand(String connectionId, Map command) { + if (_isolateSendPort != null) { + _isolateSendPort!.send({ + 'connectionId': connectionId, + ...command, + }); + } + } + + Future dispose() async { + for (final controller in _connectionControllers.values) { + if (!controller.isClosed) { + await controller.close(); + } + } + _connectionControllers.clear(); + _receivePort.close(); + _isolate?.kill(priority: Isolate.immediate); + _isolate = null; + _instance = null; + } +} + +class WebSocketIsolateNostrTransport implements NostrTransport { + final String url; + final Function? onReconnect; + final Completer _readyCompleter = Completer(); + final StreamController _messageController = + StreamController.broadcast(); + + late final String _connectionId; + final _WebSocketIsolateManager _manager = _WebSocketIsolateManager.instance; + + int? _closeCode; + String? _closeReason; + bool _isOpen = false; + bool _isInitialized = false; + + WebSocketIsolateNostrTransport(this.url, this.onReconnect) { + _initialize(); + } + + Future _initialize() async { + if (_isInitialized) return; + _isInitialized = true; + + try { + await _manager.ready; + + _connectionId = _manager._registerConnection(_messageController); + + // Listen to messages for connection state changes + _messageController.stream.listen( + (message) { + // Check for ready/reconnecting messages + if (message.type == NostrMessageRawType.unknown && + message.otherData is Map) { + final data = message.otherData as Map; + if (data['_state'] == 'ready') { + _isOpen = true; + if (!_readyCompleter.isCompleted) { + _readyCompleter.complete(); + } + } else if (data['_state'] == 'reconnecting') { + Logger.log.i("WebSocket reconnecting: $url"); + if (onReconnect != null) { + onReconnect!(); + } + } + } + }, + onDone: () { + _isOpen = false; + }, + ); + + _manager.sendCommand(_connectionId, { + 'command': 'connect', + 'url': url, + }); + } catch (e) { + Logger.log.e("Failed to initialize WebSocket for $url: $e"); + if (!_readyCompleter.isCompleted) { + _readyCompleter.completeError(e); + } + } + } + @override Future get ready => _readyCompleter.future; @@ -160,8 +240,8 @@ class WebSocketIsolateNostrTransport implements NostrTransport { @override void send(dynamic data) { - if (_isolateSendPort != null && _isOpen) { - _isolateSendPort!.send({ + if (_isOpen) { + _manager.sendCommand(_connectionId, { 'command': 'send', 'data': data, }); @@ -170,23 +250,19 @@ class WebSocketIsolateNostrTransport implements NostrTransport { } } - @override - Stream get stream => _messageController.stream; - @override Future close() async { - if (_isolateSendPort != null) { - _isolateSendPort!.send({'command': 'close'}); - } + _manager.sendCommand(_connectionId, { + 'command': 'close', + }); await Future.delayed(Duration(milliseconds: 100)); + _manager._unregisterConnection(_connectionId); + if (!_messageController.isClosed) { await _messageController.close(); } - _receivePort.close(); - _isolate?.kill(priority: Isolate.immediate); - _isolate = null; } @override @@ -205,43 +281,78 @@ class WebSocketIsolateNostrTransport implements NostrTransport { } } -class _IsolateStartupData { - final SendPort sendPort; - final String url; - - _IsolateStartupData({ - required this.sendPort, - required this.url, - }); -} - -void _isolateEntry(_IsolateStartupData startupData) { - _WebSocketIsolateWorker(startupData.sendPort, startupData.url); +void _isolateEntry(SendPort mainSendPort) { + _WebSocketIsolateWorker(mainSendPort); } class _WebSocketIsolateWorker { - final SendPort _sendPort; - final String _url; - late final ReceivePort _receivePort; - - _WebSocketIsolateWorker(this._sendPort, this._url) { - _receivePort = ReceivePort(); - _sendPort.send(_receivePort.sendPort); - _connect(); + final SendPort _mainSendPort; + final ReceivePort _receivePort = ReceivePort(); + final Map _connections = {}; + + _WebSocketIsolateWorker(this._mainSendPort) { + _mainSendPort.send(_receivePort.sendPort); + _receivePort.listen(_handleCommand); } - void _connect() async { - // Import web_socket_client in the isolate context - final webSocket = await _createWebSocket(); + void _handleCommand(dynamic message) { + if (message is! Map) return; + + final connectionId = message['connectionId'] as String?; + if (connectionId == null) return; + + final command = message['command'] as String?; + + switch (command) { + case 'connect': + final url = message['url'] as String?; + if (url != null) { + _connect(connectionId, url); + } + break; + case 'send': + final data = message['data']; + _connections[connectionId]?.send(data); + break; + case 'close': + _connections[connectionId]?.close(); + _connections.remove(connectionId); + break; + } + } + + void _connect(String connectionId, String url) async { + final backoff = BinaryExponentialBackoff( + initial: Duration(seconds: 1), + maximumStep: 10, + ); + + final webSocket = WebSocket(Uri.parse(url), backoff: backoff); + _connections[connectionId] = webSocket; webSocket.connection.listen( (state) { if (state is Connected) { - _sendPort.send({'type': 'ready'}); + _mainSendPort.send({ + 'connectionId': connectionId, + 'type': 'message', + 'data': NostrMessageRaw( + type: NostrMessageRawType.unknown, + otherData: {'_state': 'ready'}, + ), + }); } else if (state is Reconnecting) { - _sendPort.send({'type': 'reconnecting'}); + _mainSendPort.send({ + 'connectionId': connectionId, + 'type': 'message', + 'data': NostrMessageRaw( + type: NostrMessageRawType.unknown, + otherData: {'_state': 'reconnecting'}, + ), + }); } else if (state is Disconnected) { - _sendPort.send({ + _mainSendPort.send({ + 'connectionId': connectionId, 'type': 'done', 'closeCode': null, 'closeReason': 'Disconnected', @@ -249,7 +360,8 @@ class _WebSocketIsolateWorker { } }, onError: (error) { - _sendPort.send({ + _mainSendPort.send({ + 'connectionId': connectionId, 'type': 'error', 'error': error.toString(), }); @@ -327,49 +439,28 @@ class _WebSocketIsolateWorker { break; } - // Send raw message to main isolate - _sendPort.send({ + _mainSendPort.send({ + 'connectionId': connectionId, 'type': 'message', 'data': data, }); }, onError: (error) { - _sendPort.send({ + _mainSendPort.send({ + 'connectionId': connectionId, 'type': 'error', 'error': error.toString(), }); }, onDone: () { - _sendPort.send({ + _mainSendPort.send({ + 'connectionId': connectionId, 'type': 'done', 'closeCode': null, 'closeReason': "Done", }); + _connections.remove(connectionId); }, ); - - // Listen for commands from main isolate - _receivePort.listen((message) { - if (message is Map) { - switch (message['command']) { - case 'send': - webSocket.send(message['data']); - break; - case 'close': - webSocket.close(); - _receivePort.close(); - break; - } - } - }); - } - - Future _createWebSocket() async { - final backoff = BinaryExponentialBackoff( - initial: Duration(seconds: 1), - maximumStep: 10, - ); - - return WebSocket(Uri.parse(_url), backoff: backoff); } } From f7634419dd790593995de75fe3e013d63bde1686 Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Tue, 25 Nov 2025 18:05:23 +0100 Subject: [PATCH 08/13] enable calculateId --- packages/ndk/lib/domain_layer/entities/nip_01_event.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/ndk/lib/domain_layer/entities/nip_01_event.dart b/packages/ndk/lib/domain_layer/entities/nip_01_event.dart index 3215f2980..dd6814689 100644 --- a/packages/ndk/lib/domain_layer/entities/nip_01_event.dart +++ b/packages/ndk/lib/domain_layer/entities/nip_01_event.dart @@ -30,7 +30,7 @@ class Nip01Event { this.createdAt = (createdAt == 0) ? DateTime.now().millisecondsSinceEpoch ~/ 1000 : createdAt; - //id = _calculateId(pubKey, this.createdAt, kind, tags, content); + id = _calculateId(pubKey, this.createdAt, kind, tags, content); } Nip01Event._( From d02b4bb9f8e1811e535872fd2116061183155486 Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Tue, 25 Nov 2025 18:25:02 +0100 Subject: [PATCH 09/13] refactor: typesafe isolate --- ...socket_client_nostr_transport_factory.dart | 2 +- .../websocket_isolate_nostr_transport.dart | 304 +++++++++++------- .../domain_layer/usecases/relay_manager.dart | 2 +- 3 files changed, 185 insertions(+), 123 deletions(-) rename packages/ndk/lib/data_layer/repositories/nostr_transport/{ => websocket_isolate}/websocket_isolate_nostr_transport.dart (60%) diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart index 92d96ae68..9ee4cfdc4 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart @@ -1,5 +1,5 @@ import 'package:ndk/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart'; -import 'package:ndk/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart'; +import 'package:ndk/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_client/web_socket_client.dart'; diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart similarity index 60% rename from packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart rename to packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart index 30716aafc..a27cf2626 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart @@ -16,6 +16,7 @@ enum NostrMessageRawType { unknown, } +// needed until Nip01Event is refactored to be immutable class Nip01EventRaw { final String id; @@ -56,6 +57,57 @@ class NostrMessageRaw { }); } +/// Message types for isolate communication +enum _IsolateMessageType { + ready, + reconnecting, + message, + error, + done, +} + +/// Internal message class for communication between main isolate and worker isolate +class _IsolateMessage { + final int connectionId; + final _IsolateMessageType type; + final NostrMessageRaw? data; + final String? error; + final int? closeCode; + final String? closeReason; + + _IsolateMessage({ + required this.connectionId, + required this.type, + this.data, + this.error, + this.closeCode, + this.closeReason, + }); +} + +/// Base class for commands sent from main isolate to worker isolate +abstract class _IsolateCommand { + final int connectionId; + + _IsolateCommand({required this.connectionId}); +} + +class _ConnectCommand extends _IsolateCommand { + final String url; + + _ConnectCommand({required super.connectionId, required this.url}); +} + +class _SendCommand extends _IsolateCommand { + final dynamic data; + + _SendCommand({required super.connectionId, required this.data}); +} + +class _CloseCommand extends _IsolateCommand { + _CloseCommand({required super.connectionId}); +} + /// Singleton manager for the shared WebSocket isolate class _WebSocketIsolateManager { static _WebSocketIsolateManager? _instance; @@ -68,8 +120,8 @@ class _WebSocketIsolateManager { SendPort? _isolateSendPort; final ReceivePort _receivePort = ReceivePort(); final Completer _readyCompleter = Completer(); - final Map> _connectionControllers = - {}; + final Map> _connectionControllers = {}; + final Map _stateCallbacks = {}; int _nextConnectionId = 0; _WebSocketIsolateManager._() { @@ -95,55 +147,68 @@ class _WebSocketIsolateManager { } void _handleIsolateMessage(dynamic message) { - if (message is SendPort) { - _isolateSendPort = message; - if (!_readyCompleter.isCompleted) { - _readyCompleter.complete(); - } - return; - } - - if (message is Map) { - final connectionId = message['connectionId'] as String?; - if (connectionId == null) return; - - final controller = _connectionControllers[connectionId]; + if (message is _IsolateMessage) { + final isolateMsg = message; + final controller = _connectionControllers[isolateMsg.connectionId]; if (controller == null) return; - switch (message['type']) { - case 'message': - controller.add(message['data']); + switch (isolateMsg.type) { + case _IsolateMessageType.message: + if (isolateMsg.data != null) { + controller.add(isolateMsg.data!); + } break; - case 'error': - controller.addError(message['error']); + case _IsolateMessageType.error: + if (isolateMsg.error != null) { + controller.addError(isolateMsg.error!); + } break; - case 'done': + case _IsolateMessageType.done: if (!controller.isClosed) { controller.close(); } break; + case _IsolateMessageType.ready: + case _IsolateMessageType.reconnecting: + // Notify state change via callback + final stateCallback = _stateCallbacks[isolateMsg.connectionId]; + if (stateCallback != null) { + stateCallback(isolateMsg.type); + } + break; + } + return; + } + + if (message is SendPort) { + _isolateSendPort = message; + if (!_readyCompleter.isCompleted) { + _readyCompleter.complete(); } + return; } } - String _registerConnection(StreamController controller) { - final id = 'conn_${_nextConnectionId++}'; + int _registerConnection( + StreamController controller, + void Function(_IsolateMessageType) onStateChange, + ) { + final id = _nextConnectionId++; _connectionControllers[id] = controller; + _stateCallbacks[id] = onStateChange; return id; } - void _unregisterConnection(String connectionId) { + void _unregisterConnection(int connectionId) { _connectionControllers.remove(connectionId); + _stateCallbacks.remove(connectionId); } Future get ready => _readyCompleter.future; - void sendCommand(String connectionId, Map command) { + void sendCommand(_IsolateCommand command) { if (_isolateSendPort != null) { - _isolateSendPort!.send({ - 'connectionId': connectionId, - ...command, - }); + _isolateSendPort!.send(command); } } @@ -168,7 +233,7 @@ class WebSocketIsolateNostrTransport implements NostrTransport { final StreamController _messageController = StreamController.broadcast(); - late final String _connectionId; + late final int _connectionId; final _WebSocketIsolateManager _manager = _WebSocketIsolateManager.instance; int? _closeCode; @@ -187,37 +252,39 @@ class WebSocketIsolateNostrTransport implements NostrTransport { try { await _manager.ready; - _connectionId = _manager._registerConnection(_messageController); - - // Listen to messages for connection state changes - _messageController.stream.listen( - (message) { - // Check for ready/reconnecting messages - if (message.type == NostrMessageRawType.unknown && - message.otherData is Map) { - final data = message.otherData as Map; - if (data['_state'] == 'ready') { + _connectionId = _manager._registerConnection( + _messageController, + (state) { + // Handle state changes from isolate + switch (state) { + case _IsolateMessageType.ready: _isOpen = true; if (!_readyCompleter.isCompleted) { _readyCompleter.complete(); } - } else if (data['_state'] == 'reconnecting') { + break; + case _IsolateMessageType.reconnecting: Logger.log.i("WebSocket reconnecting: $url"); if (onReconnect != null) { onReconnect!(); } - } + break; + case _IsolateMessageType.done: + _isOpen = false; + break; + case _IsolateMessageType.message: + case _IsolateMessageType.error: + break; } }, - onDone: () { - _isOpen = false; - }, ); - _manager.sendCommand(_connectionId, { - 'command': 'connect', - 'url': url, - }); + _manager.sendCommand( + _ConnectCommand( + connectionId: _connectionId, + url: url, + ), + ); } catch (e) { Logger.log.e("Failed to initialize WebSocket for $url: $e"); if (!_readyCompleter.isCompleted) { @@ -241,10 +308,12 @@ class WebSocketIsolateNostrTransport implements NostrTransport { @override void send(dynamic data) { if (_isOpen) { - _manager.sendCommand(_connectionId, { - 'command': 'send', - 'data': data, - }); + _manager.sendCommand( + _SendCommand( + connectionId: _connectionId, + data: data, + ), + ); } else { Logger.log.w("Attempted to send on closed/unready WebSocket: $url"); } @@ -252,9 +321,11 @@ class WebSocketIsolateNostrTransport implements NostrTransport { @override Future close() async { - _manager.sendCommand(_connectionId, { - 'command': 'close', - }); + _manager.sendCommand( + _CloseCommand( + connectionId: _connectionId, + ), + ); await Future.delayed(Duration(milliseconds: 100)); @@ -288,7 +359,7 @@ void _isolateEntry(SendPort mainSendPort) { class _WebSocketIsolateWorker { final SendPort _mainSendPort; final ReceivePort _receivePort = ReceivePort(); - final Map _connections = {}; + final Map _connections = {}; _WebSocketIsolateWorker(this._mainSendPort) { _mainSendPort.send(_receivePort.sendPort); @@ -296,32 +367,17 @@ class _WebSocketIsolateWorker { } void _handleCommand(dynamic message) { - if (message is! Map) return; - - final connectionId = message['connectionId'] as String?; - if (connectionId == null) return; - - final command = message['command'] as String?; - - switch (command) { - case 'connect': - final url = message['url'] as String?; - if (url != null) { - _connect(connectionId, url); - } - break; - case 'send': - final data = message['data']; - _connections[connectionId]?.send(data); - break; - case 'close': - _connections[connectionId]?.close(); - _connections.remove(connectionId); - break; + if (message is _ConnectCommand) { + _connect(message.connectionId, message.url); + } else if (message is _SendCommand) { + _connections[message.connectionId]?.send(message.data); + } else if (message is _CloseCommand) { + _connections[message.connectionId]?.close(); + _connections.remove(message.connectionId); } } - void _connect(String connectionId, String url) async { + void _connect(int connectionId, String url) async { final backoff = BinaryExponentialBackoff( initial: Duration(seconds: 1), maximumStep: 10, @@ -333,38 +389,38 @@ class _WebSocketIsolateWorker { webSocket.connection.listen( (state) { if (state is Connected) { - _mainSendPort.send({ - 'connectionId': connectionId, - 'type': 'message', - 'data': NostrMessageRaw( - type: NostrMessageRawType.unknown, - otherData: {'_state': 'ready'}, + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.ready, ), - }); + ); } else if (state is Reconnecting) { - _mainSendPort.send({ - 'connectionId': connectionId, - 'type': 'message', - 'data': NostrMessageRaw( - type: NostrMessageRawType.unknown, - otherData: {'_state': 'reconnecting'}, + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.reconnecting, ), - }); + ); } else if (state is Disconnected) { - _mainSendPort.send({ - 'connectionId': connectionId, - 'type': 'done', - 'closeCode': null, - 'closeReason': 'Disconnected', - }); + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.done, + closeCode: null, + closeReason: 'Disconnected', + ), + ); } }, onError: (error) { - _mainSendPort.send({ - 'connectionId': connectionId, - 'type': 'error', - 'error': error.toString(), - }); + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.error, + error: error.toString(), + ), + ); }, ); @@ -439,26 +495,32 @@ class _WebSocketIsolateWorker { break; } - _mainSendPort.send({ - 'connectionId': connectionId, - 'type': 'message', - 'data': data, - }); + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.message, + data: data, + ), + ); }, onError: (error) { - _mainSendPort.send({ - 'connectionId': connectionId, - 'type': 'error', - 'error': error.toString(), - }); + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.error, + error: error.toString(), + ), + ); }, onDone: () { - _mainSendPort.send({ - 'connectionId': connectionId, - 'type': 'done', - 'closeCode': null, - 'closeReason': "Done", - }); + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.done, + closeCode: null, + closeReason: "Done", + ), + ); _connections.remove(connectionId); }, ); diff --git a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart index 2f97cdbd2..38446a138 100644 --- a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart +++ b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart @@ -6,7 +6,7 @@ import 'package:rxdart/rxdart.dart'; import '../../config/bootstrap_relays.dart'; import '../../config/relay_defaults.dart'; -import '../../data_layer/repositories/nostr_transport/websocket_isolate_nostr_transport.dart'; +import '../../data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart'; import '../../shared/helpers/relay_helper.dart'; import '../../shared/logger/logger.dart'; import '../../shared/logger/log_level.dart'; From 57258ddbb104249eadecc5f1fa26c070b76971d9 Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Tue, 25 Nov 2025 18:34:19 +0100 Subject: [PATCH 10/13] chore: organize isolate into multiple files --- .../websocket_isolate_entities.dart | 103 +++++ .../websocket_isolate_nostr_transport.dart | 396 +----------------- ...ocket_isolate_nostr_transport_manager.dart | 119 ++++++ ...socket_isolate_nostr_transport_worker.dart | 172 ++++++++ 4 files changed, 399 insertions(+), 391 deletions(-) create mode 100644 packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_entities.dart create mode 100644 packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart create mode 100644 packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_entities.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_entities.dart new file mode 100644 index 000000000..206ef0a2c --- /dev/null +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_entities.dart @@ -0,0 +1,103 @@ +part of 'websocket_isolate_nostr_transport.dart'; + +/// Message types for isolate communication +enum _IsolateMessageType { + ready, + reconnecting, + message, + error, + done, +} + +/// Internal message class for communication between main isolate and worker isolate +class _IsolateMessage { + final int connectionId; + final _IsolateMessageType type; + final NostrMessageRaw? data; + final String? error; + final int? closeCode; + final String? closeReason; + + _IsolateMessage({ + required this.connectionId, + required this.type, + this.data, + this.error, + this.closeCode, + this.closeReason, + }); +} + +/// Base class for commands sent from main isolate to worker isolate +abstract class _IsolateCommand { + final int connectionId; + + _IsolateCommand({required this.connectionId}); +} + +class _ConnectCommand extends _IsolateCommand { + final String url; + + _ConnectCommand({required super.connectionId, required this.url}); +} + +class _SendCommand extends _IsolateCommand { + final dynamic data; + + _SendCommand({required super.connectionId, required this.data}); +} + +class _CloseCommand extends _IsolateCommand { + _CloseCommand({required super.connectionId}); +} + +enum NostrMessageRawType { + notice, + event, + eose, + ok, + closed, + auth, + unknown, +} + +//? needed until Nip01Event is refactored to be immutable +class Nip01EventRaw { + final String id; + + final String pubKey; + + final int createdAt; + + final int kind; + + final List> tags; + + final String content; + + final String sig; + + Nip01EventRaw({ + required this.id, + required this.pubKey, + required this.createdAt, + required this.kind, + required this.tags, + required this.content, + required this.sig, + }); +} + +class NostrMessageRaw { + final NostrMessageRawType type; + final Nip01EventRaw? nip01Event; + final String? requestId; + final dynamic otherData; + + NostrMessageRaw({ + required this.type, + this.nip01Event, + this.requestId, + this.otherData, + }); +} diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart index a27cf2626..9743e7194 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart @@ -1,230 +1,15 @@ import 'dart:async'; import 'dart:convert'; import 'dart:isolate'; -import 'package:ndk/domain_layer/repositories/nostr_transport.dart'; -import 'package:ndk/shared/logger/logger.dart'; import 'package:web_socket_client/web_socket_client.dart'; -enum NostrMessageRawType { - notice, - event, - eose, - ok, - closed, - auth, - unknown, -} - -// needed until Nip01Event is refactored to be immutable -class Nip01EventRaw { - final String id; - - final String pubKey; - - final int createdAt; - - final int kind; - - final List> tags; - - final String content; - - final String sig; - - Nip01EventRaw({ - required this.id, - required this.pubKey, - required this.createdAt, - required this.kind, - required this.tags, - required this.content, - required this.sig, - }); -} - -class NostrMessageRaw { - final NostrMessageRawType type; - final Nip01EventRaw? nip01Event; - final String? requestId; - final dynamic otherData; - - NostrMessageRaw({ - required this.type, - this.nip01Event, - this.requestId, - this.otherData, - }); -} - -/// Message types for isolate communication -enum _IsolateMessageType { - ready, - reconnecting, - message, - error, - done, -} - -/// Internal message class for communication between main isolate and worker isolate -class _IsolateMessage { - final int connectionId; - final _IsolateMessageType type; - final NostrMessageRaw? data; - final String? error; - final int? closeCode; - final String? closeReason; - - _IsolateMessage({ - required this.connectionId, - required this.type, - this.data, - this.error, - this.closeCode, - this.closeReason, - }); -} +import '../../../../domain_layer/repositories/nostr_transport.dart'; +import '../../../../shared/logger/logger.dart'; -/// Base class for commands sent from main isolate to worker isolate -abstract class _IsolateCommand { - final int connectionId; - - _IsolateCommand({required this.connectionId}); -} - -class _ConnectCommand extends _IsolateCommand { - final String url; - - _ConnectCommand({required super.connectionId, required this.url}); -} - -class _SendCommand extends _IsolateCommand { - final dynamic data; - - _SendCommand({required super.connectionId, required this.data}); -} - -class _CloseCommand extends _IsolateCommand { - _CloseCommand({required super.connectionId}); -} - -/// Singleton manager for the shared WebSocket isolate -class _WebSocketIsolateManager { - static _WebSocketIsolateManager? _instance; - static _WebSocketIsolateManager get instance { - _instance ??= _WebSocketIsolateManager._(); - return _instance!; - } - - Isolate? _isolate; - SendPort? _isolateSendPort; - final ReceivePort _receivePort = ReceivePort(); - final Completer _readyCompleter = Completer(); - final Map> _connectionControllers = {}; - final Map _stateCallbacks = {}; - int _nextConnectionId = 0; - - _WebSocketIsolateManager._() { - _initialize(); - } - - Future _initialize() async { - try { - _isolate = await Isolate.spawn( - _isolateEntry, - _receivePort.sendPort, - ); - - _receivePort.listen((message) { - _handleIsolateMessage(message); - }); - } catch (e) { - Logger.log.e("Failed to spawn shared WebSocket isolate: $e"); - if (!_readyCompleter.isCompleted) { - _readyCompleter.completeError(e); - } - } - } - - void _handleIsolateMessage(dynamic message) { - if (message is _IsolateMessage) { - final isolateMsg = message; - final controller = _connectionControllers[isolateMsg.connectionId]; - if (controller == null) return; - - switch (isolateMsg.type) { - case _IsolateMessageType.message: - if (isolateMsg.data != null) { - controller.add(isolateMsg.data!); - } - break; - case _IsolateMessageType.error: - if (isolateMsg.error != null) { - controller.addError(isolateMsg.error!); - } - break; - case _IsolateMessageType.done: - if (!controller.isClosed) { - controller.close(); - } - break; - case _IsolateMessageType.ready: - case _IsolateMessageType.reconnecting: - // Notify state change via callback - final stateCallback = _stateCallbacks[isolateMsg.connectionId]; - if (stateCallback != null) { - stateCallback(isolateMsg.type); - } - break; - } - return; - } - - if (message is SendPort) { - _isolateSendPort = message; - if (!_readyCompleter.isCompleted) { - _readyCompleter.complete(); - } - return; - } - } - - int _registerConnection( - StreamController controller, - void Function(_IsolateMessageType) onStateChange, - ) { - final id = _nextConnectionId++; - _connectionControllers[id] = controller; - _stateCallbacks[id] = onStateChange; - return id; - } - - void _unregisterConnection(int connectionId) { - _connectionControllers.remove(connectionId); - _stateCallbacks.remove(connectionId); - } - - Future get ready => _readyCompleter.future; - - void sendCommand(_IsolateCommand command) { - if (_isolateSendPort != null) { - _isolateSendPort!.send(command); - } - } - - Future dispose() async { - for (final controller in _connectionControllers.values) { - if (!controller.isClosed) { - await controller.close(); - } - } - _connectionControllers.clear(); - _receivePort.close(); - _isolate?.kill(priority: Isolate.immediate); - _isolate = null; - _instance = null; - } -} +part 'websocket_isolate_entities.dart'; +part 'websocket_isolate_nostr_transport_worker.dart'; +part 'websocket_isolate_nostr_transport_manager.dart'; class WebSocketIsolateNostrTransport implements NostrTransport { final String url; @@ -355,174 +140,3 @@ class WebSocketIsolateNostrTransport implements NostrTransport { void _isolateEntry(SendPort mainSendPort) { _WebSocketIsolateWorker(mainSendPort); } - -class _WebSocketIsolateWorker { - final SendPort _mainSendPort; - final ReceivePort _receivePort = ReceivePort(); - final Map _connections = {}; - - _WebSocketIsolateWorker(this._mainSendPort) { - _mainSendPort.send(_receivePort.sendPort); - _receivePort.listen(_handleCommand); - } - - void _handleCommand(dynamic message) { - if (message is _ConnectCommand) { - _connect(message.connectionId, message.url); - } else if (message is _SendCommand) { - _connections[message.connectionId]?.send(message.data); - } else if (message is _CloseCommand) { - _connections[message.connectionId]?.close(); - _connections.remove(message.connectionId); - } - } - - void _connect(int connectionId, String url) async { - final backoff = BinaryExponentialBackoff( - initial: Duration(seconds: 1), - maximumStep: 10, - ); - - final webSocket = WebSocket(Uri.parse(url), backoff: backoff); - _connections[connectionId] = webSocket; - - webSocket.connection.listen( - (state) { - if (state is Connected) { - _mainSendPort.send( - _IsolateMessage( - connectionId: connectionId, - type: _IsolateMessageType.ready, - ), - ); - } else if (state is Reconnecting) { - _mainSendPort.send( - _IsolateMessage( - connectionId: connectionId, - type: _IsolateMessageType.reconnecting, - ), - ); - } else if (state is Disconnected) { - _mainSendPort.send( - _IsolateMessage( - connectionId: connectionId, - type: _IsolateMessageType.done, - closeCode: null, - closeReason: 'Disconnected', - ), - ); - } - }, - onError: (error) { - _mainSendPort.send( - _IsolateMessage( - connectionId: connectionId, - type: _IsolateMessageType.error, - error: error.toString(), - ), - ); - }, - ); - - webSocket.messages.listen( - (message) { - final eventJson = json.decode(message); - final NostrMessageRaw data; - - switch (eventJson[0]) { - case 'NOTICE': - data = NostrMessageRaw( - type: NostrMessageRawType.notice, - otherData: eventJson, - ); - break; - case 'EVENT': - Nip01EventRaw? nip01Event; - try { - final eventData = eventJson[2]; - nip01Event = Nip01EventRaw( - id: eventData['id'], - pubKey: eventData['pubkey'], - createdAt: eventData['created_at'], - kind: eventData['kind'], - tags: List>.from( - (eventData['tags'] as List).map( - (tag) => List.from(tag), - ), - ), - content: eventData['content'], - sig: eventData['sig'], - ); - } catch (e) { - nip01Event = null; - } - - data = NostrMessageRaw( - type: NostrMessageRawType.event, - requestId: eventJson[1], - nip01Event: nip01Event, - otherData: nip01Event == null ? eventJson : null, - ); - - break; - case 'EOSE': - data = NostrMessageRaw( - type: NostrMessageRawType.eose, otherData: eventJson); - break; - case 'OK': - data = NostrMessageRaw( - type: NostrMessageRawType.ok, - otherData: eventJson, - ); - break; - case 'CLOSED': - data = NostrMessageRaw( - type: NostrMessageRawType.closed, - otherData: eventJson, - ); - break; - case 'AUTH': - data = NostrMessageRaw( - type: NostrMessageRawType.auth, - otherData: eventJson, - ); - break; - default: - data = NostrMessageRaw( - type: NostrMessageRawType.unknown, - otherData: eventJson, - ); - break; - } - - _mainSendPort.send( - _IsolateMessage( - connectionId: connectionId, - type: _IsolateMessageType.message, - data: data, - ), - ); - }, - onError: (error) { - _mainSendPort.send( - _IsolateMessage( - connectionId: connectionId, - type: _IsolateMessageType.error, - error: error.toString(), - ), - ); - }, - onDone: () { - _mainSendPort.send( - _IsolateMessage( - connectionId: connectionId, - type: _IsolateMessageType.done, - closeCode: null, - closeReason: "Done", - ), - ); - _connections.remove(connectionId); - }, - ); - } -} diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart new file mode 100644 index 000000000..5a6a5dcea --- /dev/null +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart @@ -0,0 +1,119 @@ +part of 'websocket_isolate_nostr_transport.dart'; + +/// Singleton manager for the shared WebSocket isolate +class _WebSocketIsolateManager { + static _WebSocketIsolateManager? _instance; + static _WebSocketIsolateManager get instance { + _instance ??= _WebSocketIsolateManager._(); + return _instance!; + } + + Isolate? _isolate; + SendPort? _isolateSendPort; + final ReceivePort _receivePort = ReceivePort(); + final Completer _readyCompleter = Completer(); + final Map> _connectionControllers = {}; + final Map _stateCallbacks = {}; + int _nextConnectionId = 0; + + _WebSocketIsolateManager._() { + _initialize(); + } + + Future _initialize() async { + try { + _isolate = await Isolate.spawn( + _isolateEntry, + _receivePort.sendPort, + ); + + _receivePort.listen((message) { + _handleIsolateMessage(message); + }); + } catch (e) { + Logger.log.e("Failed to spawn shared WebSocket isolate: $e"); + if (!_readyCompleter.isCompleted) { + _readyCompleter.completeError(e); + } + } + } + + void _handleIsolateMessage(dynamic message) { + if (message is _IsolateMessage) { + final isolateMsg = message; + final controller = _connectionControllers[isolateMsg.connectionId]; + if (controller == null) return; + + switch (isolateMsg.type) { + case _IsolateMessageType.message: + if (isolateMsg.data != null) { + controller.add(isolateMsg.data!); + } + break; + case _IsolateMessageType.error: + if (isolateMsg.error != null) { + controller.addError(isolateMsg.error!); + } + break; + case _IsolateMessageType.done: + if (!controller.isClosed) { + controller.close(); + } + break; + case _IsolateMessageType.ready: + case _IsolateMessageType.reconnecting: + // Notify state change via callback + final stateCallback = _stateCallbacks[isolateMsg.connectionId]; + if (stateCallback != null) { + stateCallback(isolateMsg.type); + } + break; + } + return; + } + + if (message is SendPort) { + _isolateSendPort = message; + if (!_readyCompleter.isCompleted) { + _readyCompleter.complete(); + } + return; + } + } + + int _registerConnection( + StreamController controller, + void Function(_IsolateMessageType) onStateChange, + ) { + final id = _nextConnectionId++; + _connectionControllers[id] = controller; + _stateCallbacks[id] = onStateChange; + return id; + } + + void _unregisterConnection(int connectionId) { + _connectionControllers.remove(connectionId); + _stateCallbacks.remove(connectionId); + } + + Future get ready => _readyCompleter.future; + + void sendCommand(_IsolateCommand command) { + if (_isolateSendPort != null) { + _isolateSendPort!.send(command); + } + } + + Future dispose() async { + for (final controller in _connectionControllers.values) { + if (!controller.isClosed) { + await controller.close(); + } + } + _connectionControllers.clear(); + _receivePort.close(); + _isolate?.kill(priority: Isolate.immediate); + _isolate = null; + _instance = null; + } +} diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart new file mode 100644 index 000000000..eb683fc22 --- /dev/null +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart @@ -0,0 +1,172 @@ +part of 'websocket_isolate_nostr_transport.dart'; + +class _WebSocketIsolateWorker { + final SendPort _mainSendPort; + final ReceivePort _receivePort = ReceivePort(); + final Map _connections = {}; + + _WebSocketIsolateWorker(this._mainSendPort) { + _mainSendPort.send(_receivePort.sendPort); + _receivePort.listen(_handleCommand); + } + + void _handleCommand(dynamic message) { + if (message is _ConnectCommand) { + _connect(message.connectionId, message.url); + } else if (message is _SendCommand) { + _connections[message.connectionId]?.send(message.data); + } else if (message is _CloseCommand) { + _connections[message.connectionId]?.close(); + _connections.remove(message.connectionId); + } + } + + void _connect(int connectionId, String url) async { + final backoff = BinaryExponentialBackoff( + initial: Duration(seconds: 1), + maximumStep: 10, + ); + + final webSocket = WebSocket(Uri.parse(url), backoff: backoff); + _connections[connectionId] = webSocket; + + webSocket.connection.listen( + (state) { + if (state is Connected) { + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.ready, + ), + ); + } else if (state is Reconnecting) { + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.reconnecting, + ), + ); + } else if (state is Disconnected) { + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.done, + closeCode: null, + closeReason: 'Disconnected', + ), + ); + } + }, + onError: (error) { + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.error, + error: error.toString(), + ), + ); + }, + ); + + webSocket.messages.listen( + (message) { + final eventJson = json.decode(message); + final NostrMessageRaw data; + + switch (eventJson[0]) { + case 'NOTICE': + data = NostrMessageRaw( + type: NostrMessageRawType.notice, + otherData: eventJson, + ); + break; + case 'EVENT': + Nip01EventRaw? nip01Event; + try { + final eventData = eventJson[2]; + nip01Event = Nip01EventRaw( + id: eventData['id'], + pubKey: eventData['pubkey'], + createdAt: eventData['created_at'], + kind: eventData['kind'], + tags: List>.from( + (eventData['tags'] as List).map( + (tag) => List.from(tag), + ), + ), + content: eventData['content'], + sig: eventData['sig'], + ); + } catch (e) { + nip01Event = null; + } + + data = NostrMessageRaw( + type: NostrMessageRawType.event, + requestId: eventJson[1], + nip01Event: nip01Event, + otherData: nip01Event == null ? eventJson : null, + ); + + break; + case 'EOSE': + data = NostrMessageRaw( + type: NostrMessageRawType.eose, otherData: eventJson); + break; + case 'OK': + data = NostrMessageRaw( + type: NostrMessageRawType.ok, + otherData: eventJson, + ); + break; + case 'CLOSED': + data = NostrMessageRaw( + type: NostrMessageRawType.closed, + otherData: eventJson, + ); + break; + case 'AUTH': + data = NostrMessageRaw( + type: NostrMessageRawType.auth, + otherData: eventJson, + ); + break; + default: + data = NostrMessageRaw( + type: NostrMessageRawType.unknown, + otherData: eventJson, + ); + break; + } + + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.message, + data: data, + ), + ); + }, + onError: (error) { + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.error, + error: error.toString(), + ), + ); + }, + onDone: () { + _mainSendPort.send( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.done, + closeCode: null, + closeReason: "Done", + ), + ); + _connections.remove(connectionId); + }, + ); + } +} From 9df49d1f85601d40889e5587dc853dca86695752 Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Tue, 25 Nov 2025 18:44:23 +0100 Subject: [PATCH 11/13] fix: memory leaks --- ...ocket_isolate_nostr_transport_manager.dart | 6 +++- ...socket_isolate_nostr_transport_worker.dart | 30 +++++++++++++++---- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart index 5a6a5dcea..6e833f4f1 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart @@ -11,6 +11,7 @@ class _WebSocketIsolateManager { Isolate? _isolate; SendPort? _isolateSendPort; final ReceivePort _receivePort = ReceivePort(); + StreamSubscription? _receivePortSubscription; final Completer _readyCompleter = Completer(); final Map> _connectionControllers = {}; final Map _stateCallbacks = {}; @@ -27,7 +28,7 @@ class _WebSocketIsolateManager { _receivePort.sendPort, ); - _receivePort.listen((message) { + _receivePortSubscription = _receivePort.listen((message) { _handleIsolateMessage(message); }); } catch (e) { @@ -111,6 +112,9 @@ class _WebSocketIsolateManager { } } _connectionControllers.clear(); + _stateCallbacks.clear(); + + await _receivePortSubscription?.cancel(); _receivePort.close(); _isolate?.kill(priority: Isolate.immediate); _isolate = null; diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart index eb683fc22..8796c4705 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart @@ -4,6 +4,7 @@ class _WebSocketIsolateWorker { final SendPort _mainSendPort; final ReceivePort _receivePort = ReceivePort(); final Map _connections = {}; + final Map> _subscriptions = {}; _WebSocketIsolateWorker(this._mainSendPort) { _mainSendPort.send(_receivePort.sendPort); @@ -16,8 +17,20 @@ class _WebSocketIsolateWorker { } else if (message is _SendCommand) { _connections[message.connectionId]?.send(message.data); } else if (message is _CloseCommand) { - _connections[message.connectionId]?.close(); - _connections.remove(message.connectionId); + _closeConnection(message.connectionId); + } + } + + Future _closeConnection(int connectionId) async { + _connections[connectionId]?.close(); + _connections.remove(connectionId); + + // Cancel all subscriptions for this connection + final subs = _subscriptions.remove(connectionId); + if (subs != null) { + for (final sub in subs) { + await sub.cancel(); + } } } @@ -30,7 +43,9 @@ class _WebSocketIsolateWorker { final webSocket = WebSocket(Uri.parse(url), backoff: backoff); _connections[connectionId] = webSocket; - webSocket.connection.listen( + final subscriptions = []; + + final connectionSub = webSocket.connection.listen( (state) { if (state is Connected) { _mainSendPort.send( @@ -67,9 +82,11 @@ class _WebSocketIsolateWorker { ); }, ); + subscriptions.add(connectionSub); - webSocket.messages.listen( + final messagesSub = webSocket.messages.listen( (message) { + //? this is an expensive operation final eventJson = json.decode(message); final NostrMessageRaw data; @@ -165,8 +182,11 @@ class _WebSocketIsolateWorker { closeReason: "Done", ), ); - _connections.remove(connectionId); + _closeConnection(connectionId); }, ); + subscriptions.add(messagesSub); + + _subscriptions[connectionId] = subscriptions; } } From 5b444577c3c787d9f38fb3073e749dc6aa7c8093 Mon Sep 17 00:00:00 2001 From: 1leo <58687994+1-leo@users.noreply.github.com> Date: Tue, 25 Nov 2025 19:08:39 +0100 Subject: [PATCH 12/13] feat: isolate batch send --- ...ocket_isolate_nostr_transport_manager.dart | 69 +++++++++++-------- ...socket_isolate_nostr_transport_worker.dart | 51 ++++++++++++-- 2 files changed, 84 insertions(+), 36 deletions(-) diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart index 6e833f4f1..736e6bffc 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart @@ -40,39 +40,19 @@ class _WebSocketIsolateManager { } void _handleIsolateMessage(dynamic message) { - if (message is _IsolateMessage) { - final isolateMsg = message; - final controller = _connectionControllers[isolateMsg.connectionId]; - if (controller == null) return; - - switch (isolateMsg.type) { - case _IsolateMessageType.message: - if (isolateMsg.data != null) { - controller.add(isolateMsg.data!); - } - break; - case _IsolateMessageType.error: - if (isolateMsg.error != null) { - controller.addError(isolateMsg.error!); - } - break; - case _IsolateMessageType.done: - if (!controller.isClosed) { - controller.close(); - } - break; - case _IsolateMessageType.ready: - case _IsolateMessageType.reconnecting: - // Notify state change via callback - final stateCallback = _stateCallbacks[isolateMsg.connectionId]; - if (stateCallback != null) { - stateCallback(isolateMsg.type); - } - break; + // Handle batched messages + if (message is List<_IsolateMessage>) { + for (final msg in message) { + _processIsolateMessage(msg); } return; } + if (message is _IsolateMessage) { + _processIsolateMessage(message); + return; + } + if (message is SendPort) { _isolateSendPort = message; if (!_readyCompleter.isCompleted) { @@ -82,6 +62,37 @@ class _WebSocketIsolateManager { } } + void _processIsolateMessage(_IsolateMessage isolateMsg) { + final controller = _connectionControllers[isolateMsg.connectionId]; + if (controller == null) return; + + switch (isolateMsg.type) { + case _IsolateMessageType.message: + if (isolateMsg.data != null) { + controller.add(isolateMsg.data!); + } + break; + case _IsolateMessageType.error: + if (isolateMsg.error != null) { + controller.addError(isolateMsg.error!); + } + break; + case _IsolateMessageType.done: + if (!controller.isClosed) { + controller.close(); + } + break; + case _IsolateMessageType.ready: + case _IsolateMessageType.reconnecting: + // Notify state change via callback + final stateCallback = _stateCallbacks[isolateMsg.connectionId]; + if (stateCallback != null) { + stateCallback(isolateMsg.type); + } + break; + } + } + int _registerConnection( StreamController controller, void Function(_IsolateMessageType) onStateChange, diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart index 8796c4705..67f000d47 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart @@ -6,9 +6,40 @@ class _WebSocketIsolateWorker { final Map _connections = {}; final Map> _subscriptions = {}; + // Message batching + final List<_IsolateMessage> _messageQueue = []; + Timer? _batchTimer; + static const Duration _batchInterval = Duration(milliseconds: 10); + static const int _maxBatchSize = 100; + _WebSocketIsolateWorker(this._mainSendPort) { _mainSendPort.send(_receivePort.sendPort); _receivePort.listen(_handleCommand); + _startBatchTimer(); + } + + void _startBatchTimer() { + _batchTimer = Timer.periodic(_batchInterval, (_) { + _flushMessageQueue(); + }); + } + + void _queueMessage(_IsolateMessage message) { + _messageQueue.add(message); + + // Flush if batch is full + if (_messageQueue.length >= _maxBatchSize) { + _flushMessageQueue(); + } + } + + void _flushMessageQueue() { + if (_messageQueue.isEmpty) return; + + // Send all messages in one batch + final batch = List<_IsolateMessage>.from(_messageQueue); + _messageQueue.clear(); + _mainSendPort.send(batch); } void _handleCommand(dynamic message) { @@ -32,6 +63,12 @@ class _WebSocketIsolateWorker { await sub.cancel(); } } + + // Clean up batch timer if no connections remain + if (_connections.isEmpty) { + _batchTimer?.cancel(); + _flushMessageQueue(); // Flush remaining messages + } } void _connect(int connectionId, String url) async { @@ -48,21 +85,21 @@ class _WebSocketIsolateWorker { final connectionSub = webSocket.connection.listen( (state) { if (state is Connected) { - _mainSendPort.send( + _queueMessage( _IsolateMessage( connectionId: connectionId, type: _IsolateMessageType.ready, ), ); } else if (state is Reconnecting) { - _mainSendPort.send( + _queueMessage( _IsolateMessage( connectionId: connectionId, type: _IsolateMessageType.reconnecting, ), ); } else if (state is Disconnected) { - _mainSendPort.send( + _queueMessage( _IsolateMessage( connectionId: connectionId, type: _IsolateMessageType.done, @@ -73,7 +110,7 @@ class _WebSocketIsolateWorker { } }, onError: (error) { - _mainSendPort.send( + _queueMessage( _IsolateMessage( connectionId: connectionId, type: _IsolateMessageType.error, @@ -156,7 +193,7 @@ class _WebSocketIsolateWorker { break; } - _mainSendPort.send( + _queueMessage( _IsolateMessage( connectionId: connectionId, type: _IsolateMessageType.message, @@ -165,7 +202,7 @@ class _WebSocketIsolateWorker { ); }, onError: (error) { - _mainSendPort.send( + _queueMessage( _IsolateMessage( connectionId: connectionId, type: _IsolateMessageType.error, @@ -174,7 +211,7 @@ class _WebSocketIsolateWorker { ); }, onDone: () { - _mainSendPort.send( + _queueMessage( _IsolateMessage( connectionId: connectionId, type: _IsolateMessageType.done, From 9bb5059f5b2863649272abf617f3ee32c6ceb92a Mon Sep 17 00:00:00 2001 From: LeoLox <58687994+leo-lox@users.noreply.github.com> Date: Sun, 30 Nov 2025 21:58:19 +0100 Subject: [PATCH 13/13] refactor: use url as connection id --- .../websocket_isolate_entities.dart | 5 +++-- .../websocket_isolate_nostr_transport.dart | 8 +++++--- ...ocket_isolate_nostr_transport_manager.dart | 20 ++++++++++--------- ...socket_isolate_nostr_transport_worker.dart | 14 +++++++++---- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_entities.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_entities.dart index 206ef0a2c..47486bc4f 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_entities.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_entities.dart @@ -11,7 +11,8 @@ enum _IsolateMessageType { /// Internal message class for communication between main isolate and worker isolate class _IsolateMessage { - final int connectionId; + /// connection id is the cleaned relay url, (needed so reconnect, restore state works) + final String connectionId; final _IsolateMessageType type; final NostrMessageRaw? data; final String? error; @@ -30,7 +31,7 @@ class _IsolateMessage { /// Base class for commands sent from main isolate to worker isolate abstract class _IsolateCommand { - final int connectionId; + final String connectionId; _IsolateCommand({required this.connectionId}); } diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart index 9743e7194..8c6f7f12c 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart @@ -5,6 +5,7 @@ import 'dart:isolate'; import 'package:web_socket_client/web_socket_client.dart'; import '../../../../domain_layer/repositories/nostr_transport.dart'; +import '../../../../shared/helpers/relay_helper.dart'; import '../../../../shared/logger/logger.dart'; part 'websocket_isolate_entities.dart'; @@ -18,7 +19,7 @@ class WebSocketIsolateNostrTransport implements NostrTransport { final StreamController _messageController = StreamController.broadcast(); - late final int _connectionId; + late final String _connectionId; final _WebSocketIsolateManager _manager = _WebSocketIsolateManager.instance; int? _closeCode; @@ -38,8 +39,9 @@ class WebSocketIsolateNostrTransport implements NostrTransport { await _manager.ready; _connectionId = _manager._registerConnection( - _messageController, - (state) { + controller: _messageController, + connectionId: url, + onStateChange: (state) { // Handle state changes from isolate switch (state) { case _IsolateMessageType.ready: diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart index 736e6bffc..f37979dca 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart @@ -13,9 +13,9 @@ class _WebSocketIsolateManager { final ReceivePort _receivePort = ReceivePort(); StreamSubscription? _receivePortSubscription; final Completer _readyCompleter = Completer(); - final Map> _connectionControllers = {}; - final Map _stateCallbacks = {}; - int _nextConnectionId = 0; + final Map> _connectionControllers = + {}; + final Map _stateCallbacks = {}; _WebSocketIsolateManager._() { _initialize(); @@ -26,6 +26,7 @@ class _WebSocketIsolateManager { _isolate = await Isolate.spawn( _isolateEntry, _receivePort.sendPort, + debugName: "WebSocketIsolateNostrTransportWorker", ); _receivePortSubscription = _receivePort.listen((message) { @@ -93,17 +94,18 @@ class _WebSocketIsolateManager { } } - int _registerConnection( - StreamController controller, - void Function(_IsolateMessageType) onStateChange, - ) { - final id = _nextConnectionId++; + String _registerConnection({ + required StreamController controller, + required void Function(_IsolateMessageType) onStateChange, + required String connectionId, + }) { + final id = connectionId; _connectionControllers[id] = controller; _stateCallbacks[id] = onStateChange; return id; } - void _unregisterConnection(int connectionId) { + void _unregisterConnection(String connectionId) { _connectionControllers.remove(connectionId); _stateCallbacks.remove(connectionId); } diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart index 67f000d47..46866d63d 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart @@ -3,8 +3,8 @@ part of 'websocket_isolate_nostr_transport.dart'; class _WebSocketIsolateWorker { final SendPort _mainSendPort; final ReceivePort _receivePort = ReceivePort(); - final Map _connections = {}; - final Map> _subscriptions = {}; + final Map _connections = {}; + final Map> _subscriptions = {}; // Message batching final List<_IsolateMessage> _messageQueue = []; @@ -52,7 +52,7 @@ class _WebSocketIsolateWorker { } } - Future _closeConnection(int connectionId) async { + Future _closeConnection(String connectionId) async { _connections[connectionId]?.close(); _connections.remove(connectionId); @@ -71,7 +71,13 @@ class _WebSocketIsolateWorker { } } - void _connect(int connectionId, String url) async { + void _connect(String connectionId, String url) async { + if (_connections.containsKey(connectionId)) { + // Already connected + print("connection with id $connectionId already exists"); + throw Exception("Connection with id $connectionId already exists"); + return; + } final backoff = BinaryExponentialBackoff( initial: Duration(seconds: 1), maximumStep: 10,