diff --git a/packages/ndk/lib/domain_layer/entities/broadcast_state.dart b/packages/ndk/lib/domain_layer/entities/broadcast_state.dart index 83b449482..256db3ae3 100644 --- a/packages/ndk/lib/domain_layer/entities/broadcast_state.dart +++ b/packages/ndk/lib/domain_layer/entities/broadcast_state.dart @@ -78,6 +78,8 @@ class BroadcastState { late final StreamSubscription _networkSubscription; + bool _timeoutStarted = false; + /// creates a new [BroadcastState] instance BroadcastState({ required this.timeout, @@ -91,7 +93,12 @@ class BroadcastState { // check if all relays responded _checkBroadcastDone(); }); + } + /// Starts the timeout timer. Call this after signing is complete. + void startTimeout() { + if (_timeoutStarted) return; + _timeoutStarted = true; Future.delayed(timeout, () { if (!publishDone) { _stateUpdatesController.add(this); diff --git a/packages/ndk/lib/domain_layer/entities/request_state.dart b/packages/ndk/lib/domain_layer/entities/request_state.dart index 10a13e300..605182a5a 100644 --- a/packages/ndk/lib/domain_layer/entities/request_state.dart +++ b/packages/ndk/lib/domain_layer/entities/request_state.dart @@ -35,6 +35,8 @@ class RequestState { Set returnedIds = {}; Timer? _timeout; + DateTime? _timeoutStartedAt; + Duration? _remainingTimeout; Stream get stream => controller.stream; @@ -76,11 +78,7 @@ class RequestState { // if we have a timeout set, we start it if (request.timeoutDuration != null) { timeoutDuration = request.timeoutDuration; - _timeout = Timer(timeoutDuration!, () { - onTimeout?.call(this); - // call close on all controllers - close(); - }); + _startTimeout(timeoutDuration!); } _streamSubscription = controller.listen((e) {}, onDone: () { if (_timeout != null) { @@ -90,6 +88,34 @@ class RequestState { }); } + void _startTimeout(Duration duration) { + _timeoutStartedAt = DateTime.now(); + _timeout = Timer(duration, () { + onTimeout?.call(this); + close(); + }); + } + + /// Pauses the timeout timer. Call this before signing starts. + void pauseTimeout() { + if (_timeout == null || timeoutDuration == null) return; + + final elapsed = DateTime.now().difference(_timeoutStartedAt!); + _remainingTimeout = timeoutDuration! - elapsed; + if (_remainingTimeout!.isNegative) { + _remainingTimeout = Duration.zero; + } + _timeout!.cancel(); + _timeout = null; + } + + /// Resumes the timeout timer. Call this after signing completes. + void resumeTimeout() { + if (_remainingTimeout == null) return; + _startTimeout(_remainingTimeout!); + _remainingTimeout = null; + } + /// checks if all requests received EOSE bool get didAllRequestsReceivedEOSE => !requests.values.any((element) => !element.receivedEOSE); diff --git a/packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart b/packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart index 76eb76cb3..067275520 100644 --- a/packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart +++ b/packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart @@ -87,8 +87,7 @@ class Broadcast { nostrEvent: nostrEvent, signer: signer, specificRelays: cleanedSpecificRelays, - doneStream: broadcastState.stateUpdates - .map((state) => state.broadcasts.values.toList()), + broadcastState: broadcastState, ); } diff --git a/packages/ndk/lib/domain_layer/usecases/engines/network_engine.dart b/packages/ndk/lib/domain_layer/usecases/engines/network_engine.dart index d6d2f3e64..840e3992c 100644 --- a/packages/ndk/lib/domain_layer/usecases/engines/network_engine.dart +++ b/packages/ndk/lib/domain_layer/usecases/engines/network_engine.dart @@ -10,7 +10,7 @@ abstract class NetworkEngine { NdkBroadcastResponse handleEventBroadcast({ required Nip01Event nostrEvent, required EventSigner? signer, - required Stream> doneStream, + required BroadcastState broadcastState, Iterable? specificRelays, }); } diff --git a/packages/ndk/lib/domain_layer/usecases/jit_engine/jit_engine.dart b/packages/ndk/lib/domain_layer/usecases/jit_engine/jit_engine.dart index bca61a91f..69523e457 100644 --- a/packages/ndk/lib/domain_layer/usecases/jit_engine/jit_engine.dart +++ b/packages/ndk/lib/domain_layer/usecases/jit_engine/jit_engine.dart @@ -162,18 +162,24 @@ class JitEngine with Logger implements NetworkEngine { NdkBroadcastResponse handleEventBroadcast({ required Nip01Event nostrEvent, required EventSigner? signer, - required Stream> doneStream, + required BroadcastState broadcastState, Iterable? specificRelays, }) { Future asyncStuff() async { await relayManagerLight.seedRelaysConnected; final Nip01Event workingNostrEvent; - if (signer != null) { - workingNostrEvent = await signer.sign(nostrEvent); - } else { - workingNostrEvent = nostrEvent; + try { + if (signer != null) { + workingNostrEvent = await signer.sign(nostrEvent); + } else { + workingNostrEvent = nostrEvent; + } + } catch (e) { + broadcastState.startTimeout(); + rethrow; } + broadcastState.startTimeout(); if (specificRelays != null) { final cleanedSpecificRelays = cleanRelayUrls(specificRelays.toList()); @@ -216,7 +222,8 @@ class JitEngine with Logger implements NetworkEngine { asyncStuff(); return NdkBroadcastResponse( publishEvent: nostrEvent, - broadcastDoneStream: doneStream, + broadcastDoneStream: broadcastState.stateUpdates + .map((state) => state.broadcasts.values.toList()), ); } diff --git a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart index dcbc3a519..ba7fe5001 100644 --- a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart +++ b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart @@ -589,6 +589,14 @@ class RelayManager { String challenge, Set accounts, ) { + // Pause timeout for all requests on this relay during AUTH signing + final requestsOnRelay = globalState.inFlightRequests.values + .where((state) => state.requests.keys.contains(relayConnectivity.url)) + .toList(); + for (final state in requestsOnRelay) { + state.pauseTimeout(); + } + int authCount = 0; for (final account in accounts) { if (account.signer.canSign()) { @@ -597,6 +605,10 @@ class RelayManager { ["challenge", challenge] ]); account.signer.sign(auth).then((signedAuth) { + // Resume timeout for requests after signing completes + for (final state in requestsOnRelay) { + state.resumeTimeout(); + } send(relayConnectivity, ClientMsg(ClientMsgType.kAuth, event: signedAuth)); Logger.log.d( "AUTH sent for ${account.pubkey.substring(0, 8)} to ${relayConnectivity.url}"); @@ -606,6 +618,10 @@ class RelayManager { } if (authCount == 0) { + // No signing will happen, resume timeouts immediately + for (final state in requestsOnRelay) { + state.resumeTimeout(); + } Logger.log.w("Received an AUTH challenge but no account can sign"); } } @@ -764,8 +780,12 @@ class RelayManager { Logger.log.d( "AUTH required for REQ $reqId on ${relayConnectivity.url}, authenticating ${signableAccounts.length} account(s)..."); + // Pause timeout during AUTH signing + state.pauseTimeout(); + // Track how many AUTH OKs we need before re-sending REQ int pendingAuthCount = signableAccounts.length; + int pendingSignCount = signableAccounts.length; for (final account in signableAccounts) { final auth = AuthEvent(pubKey: account.pubkey, tags: [ @@ -774,6 +794,12 @@ class RelayManager { ]); account.signer.sign(auth).then((signedAuth) { + // Resume timeout after all signings complete + pendingSignCount--; + if (pendingSignCount == 0) { + state.resumeTimeout(); + } + // Store callback - only re-send REQ after last AUTH OK _pendingAuthCallbacks[signedAuth.id] = () { pendingAuthCount--; diff --git a/packages/ndk/lib/domain_layer/usecases/relay_sets_engine.dart b/packages/ndk/lib/domain_layer/usecases/relay_sets_engine.dart index bdd5519c1..e05c2cfcf 100644 --- a/packages/ndk/lib/domain_layer/usecases/relay_sets_engine.dart +++ b/packages/ndk/lib/domain_layer/usecases/relay_sets_engine.dart @@ -265,17 +265,23 @@ class RelaySetsEngine implements NetworkEngine { NdkBroadcastResponse handleEventBroadcast({ required Nip01Event nostrEvent, required EventSigner? signer, - required Stream> doneStream, + required BroadcastState broadcastState, Iterable? specificRelays, }) { Future asyncStuff() async { final Nip01Event workingEvent; - if (signer != null) { - workingEvent = await signer.sign(nostrEvent); - } else { - workingEvent = nostrEvent; + try { + if (signer != null) { + workingEvent = await signer.sign(nostrEvent); + } else { + workingEvent = nostrEvent; + } + } catch (e) { + broadcastState.startTimeout(); + rethrow; } + broadcastState.startTimeout(); // ===================================================================================== // specific relays @@ -403,7 +409,8 @@ class RelaySetsEngine implements NetworkEngine { return NdkBroadcastResponse( publishEvent: nostrEvent, - broadcastDoneStream: doneStream, + broadcastDoneStream: broadcastState.stateUpdates + .map((state) => state.broadcasts.values.toList()), ); } } diff --git a/packages/ndk/test/mocks/mock_relay.dart b/packages/ndk/test/mocks/mock_relay.dart index 792127dfe..c8891a6c3 100644 --- a/packages/ndk/test/mocks/mock_relay.dart +++ b/packages/ndk/test/mocks/mock_relay.dart @@ -93,7 +93,8 @@ class MockRelay { this.server = server; var stream = server.transform(WebSocketTransformer()); - String challenge = ''; + // Generate challenge once for the entire server lifetime (fixes race condition on reconnect) + final String challenge = Helpers.getRandomString(10); Set authenticatedPubkeys = {}; stream.listen((webSocket) { @@ -104,7 +105,6 @@ class MockRelay { webSocket.add(customWelcomeMessage!); } if ((requireAuthForRequests || requireAuthForEvents) && sendAuthChallenge) { - challenge = Helpers.getRandomString(10); webSocket.add(jsonEncode(["AUTH", challenge])); } webSocket.listen((message) async { diff --git a/packages/ndk/test/mocks/mock_slow_signer.dart b/packages/ndk/test/mocks/mock_slow_signer.dart new file mode 100644 index 000000000..c964f388d --- /dev/null +++ b/packages/ndk/test/mocks/mock_slow_signer.dart @@ -0,0 +1,78 @@ +import 'package:ndk/domain_layer/entities/nip_01_event.dart'; +import 'package:ndk/domain_layer/entities/pending_signer_request.dart'; +import 'package:ndk/domain_layer/repositories/event_signer.dart'; + +/// A wrapper signer that adds a delay to simulate slow signing +/// (like NIP-46, Amber, etc.) where user interaction is required. +class MockSlowSigner implements EventSigner { + final EventSigner _innerSigner; + final Duration _delay; + + MockSlowSigner({ + required EventSigner innerSigner, + required Duration delay, + }) : _innerSigner = innerSigner, + _delay = delay; + + @override + Future sign(Nip01Event event) async { + await Future.delayed(_delay); + return _innerSigner.sign(event); + } + + @override + String getPublicKey() => _innerSigner.getPublicKey(); + + @override + bool canSign() => _innerSigner.canSign(); + + @override + Future decrypt(String msg, String destPubKey, {String? id}) async { + await Future.delayed(_delay); + return _innerSigner.decrypt(msg, destPubKey, id: id); + } + + @override + Future encrypt(String msg, String destPubKey, {String? id}) async { + await Future.delayed(_delay); + return _innerSigner.encrypt(msg, destPubKey, id: id); + } + + @override + Future encryptNip44({ + required String plaintext, + required String recipientPubKey, + }) async { + await Future.delayed(_delay); + return _innerSigner.encryptNip44( + plaintext: plaintext, + recipientPubKey: recipientPubKey, + ); + } + + @override + Future decryptNip44({ + required String ciphertext, + required String senderPubKey, + }) async { + await Future.delayed(_delay); + return _innerSigner.decryptNip44( + ciphertext: ciphertext, + senderPubKey: senderPubKey, + ); + } + + @override + Stream> get pendingRequestsStream => + _innerSigner.pendingRequestsStream; + + @override + List get pendingRequests => + _innerSigner.pendingRequests; + + @override + bool cancelRequest(String requestId) => _innerSigner.cancelRequest(requestId); + + @override + Future dispose() => _innerSigner.dispose(); +} diff --git a/packages/ndk/test/usecases/broadcast_test.dart b/packages/ndk/test/usecases/broadcast_test.dart index 5cdbdab5d..f71baae0a 100644 --- a/packages/ndk/test/usecases/broadcast_test.dart +++ b/packages/ndk/test/usecases/broadcast_test.dart @@ -16,7 +16,7 @@ void main() async { late Ndk ndk; setUp(() async { - relay0 = MockRelay(name: "relay 0", explicitPort: 5098); + relay0 = MockRelay(name: "relay 0", explicitPort: 5102); await relay0.startServer(nip65s: { key0: Nip65( pubKey: key0.publicKey, @@ -252,7 +252,7 @@ void main() async { .loginPrivateKey(pubkey: key0.publicKey, privkey: key0.privateKey!); // Create a slow relay that won't respond in time - MockRelay slowRelay = MockRelay(name: "slow relay", explicitPort: 5099); + MockRelay slowRelay = MockRelay(name: "slow relay", explicitPort: 5103); await slowRelay.startServer( nip65s: { key0: Nip65( @@ -316,8 +316,8 @@ void main() async { ndk.accounts .loginPrivateKey(pubkey: key0.publicKey, privkey: key0.privateKey!); - MockRelay relay1 = MockRelay(name: "relay 1", explicitPort: 5099); - MockRelay relay2 = MockRelay(name: "relay 2", explicitPort: 5100); + MockRelay relay1 = MockRelay(name: "relay 1", explicitPort: 5104); + MockRelay relay2 = MockRelay(name: "relay 2", explicitPort: 5106); await relay1.startServer(nip65s: { key0: Nip65( diff --git a/packages/ndk/test/usecases/slow_signer_timeout_test.dart b/packages/ndk/test/usecases/slow_signer_timeout_test.dart new file mode 100644 index 000000000..fb245674a --- /dev/null +++ b/packages/ndk/test/usecases/slow_signer_timeout_test.dart @@ -0,0 +1,107 @@ +import 'package:ndk/ndk.dart'; +import 'package:ndk/shared/nips/nip01/bip340.dart'; +import 'package:test/test.dart'; + +import '../mocks/mock_event_verifier.dart'; +import '../mocks/mock_relay.dart'; +import '../mocks/mock_slow_signer.dart'; + +void main() { + test('broadcast with slow signer should not timeout during signing', + timeout: Timeout(Duration(seconds: 60)), () async { + final key = Bip340.generatePrivateKey(); + final relay = MockRelay(name: "relay", explicitPort: 5097); + await relay.startServer(); + + final ndk = Ndk(NdkConfig( + eventVerifier: MockEventVerifier(), + cache: MemCacheManager(), + bootstrapRelays: [relay.url], + )); + + ndk.accounts.loginExternalSigner( + signer: MockSlowSigner( + innerSigner: Bip340EventSigner( + privateKey: key.privateKey!, + publicKey: key.publicKey, + ), + delay: const Duration(seconds: 12), + ), + ); + + final event = Nip01Event( + pubKey: key.publicKey, + kind: Nip01Event.kTextNodeKind, + tags: [], + content: "test", + ); + + // timeout (10s) < signing (12s) → should fail without fix + final result = await ndk.broadcast + .broadcast( + nostrEvent: event, + specificRelays: [relay.url], + timeout: const Duration(seconds: 10), + ) + .broadcastDoneFuture; + + expect(result.any((r) => r.broadcastSuccessful), isTrue); + + await ndk.destroy(); + await relay.stopServer(); + }); + + test('request with AUTH should not timeout during signing', + timeout: Timeout(Duration(seconds: 60)), () async { + final key = Bip340.generatePrivateKey(); + final relay = MockRelay( + name: "relay", + explicitPort: 5098, + requireAuthForRequests: true, + ); + + final testEvent = await Bip340EventSigner( + privateKey: key.privateKey!, + publicKey: key.publicKey, + ).sign(Nip01Event( + pubKey: key.publicKey, + kind: Nip01Event.kTextNodeKind, + tags: [], + content: "test event", + )); + + await relay.startServer(textNotes: {key: testEvent}); + + final ndk = Ndk(NdkConfig( + eventVerifier: MockEventVerifier(), + cache: MemCacheManager(), + bootstrapRelays: [relay.url], + )); + + ndk.accounts.loginExternalSigner( + signer: MockSlowSigner( + innerSigner: Bip340EventSigner( + privateKey: key.privateKey!, + publicKey: key.publicKey, + ), + delay: const Duration(seconds: 12), + ), + ); + + // timeout (10s) < signing (12s) → should fail without fix + final result = await ndk.requests + .query( + filter: Filter( + kinds: [Nip01Event.kTextNodeKind], + ), + explicitRelays: [relay.url], + timeout: const Duration(seconds: 10), + ) + .future; + + expect(result, isNotEmpty); + + await ndk.destroy(); + await relay.stopServer(); + }); +}