Skip to content
Open
7 changes: 7 additions & 0 deletions packages/ndk/lib/domain_layer/entities/broadcast_state.dart
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class BroadcastState {

late final StreamSubscription _networkSubscription;

bool _timeoutStarted = false;

/// creates a new [BroadcastState] instance
BroadcastState({
required this.timeout,
Expand All @@ -91,7 +93,12 @@ class BroadcastState {
// check if all relays responded
_checkBroadcastDone();
});
}

/// Starts the timeout timer. Call this after signing is complete.
void startTimeout() {
if (_timeoutStarted) return;
_timeoutStarted = true;
Future.delayed(timeout, () {
if (!publishDone) {
_stateUpdatesController.add(this);
Expand Down
36 changes: 31 additions & 5 deletions packages/ndk/lib/domain_layer/entities/request_state.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class RequestState {
Set<String> returnedIds = {};

Timer? _timeout;
DateTime? _timeoutStartedAt;
Duration? _remainingTimeout;

Stream<Nip01Event> get stream => controller.stream;

Expand Down Expand Up @@ -76,11 +78,7 @@ class RequestState {
// if we have a timeout set, we start it
if (request.timeoutDuration != null) {
timeoutDuration = request.timeoutDuration;
_timeout = Timer(timeoutDuration!, () {
onTimeout?.call(this);
// call close on all controllers
close();
});
_startTimeout(timeoutDuration!);
}
_streamSubscription = controller.listen((e) {}, onDone: () {
if (_timeout != null) {
Expand All @@ -90,6 +88,34 @@ class RequestState {
});
}

void _startTimeout(Duration duration) {
_timeoutStartedAt = DateTime.now();
_timeout = Timer(duration, () {
onTimeout?.call(this);
close();
});
}

/// Pauses the timeout timer. Call this before signing starts.
void pauseTimeout() {
if (_timeout == null || timeoutDuration == null) return;

final elapsed = DateTime.now().difference(_timeoutStartedAt!);
_remainingTimeout = timeoutDuration! - elapsed;
if (_remainingTimeout!.isNegative) {
_remainingTimeout = Duration.zero;
}
_timeout!.cancel();
_timeout = null;
}

/// Resumes the timeout timer. Call this after signing completes.
void resumeTimeout() {
if (_remainingTimeout == null) return;
_startTimeout(_remainingTimeout!);
_remainingTimeout = null;
}

/// checks if all requests received EOSE
bool get didAllRequestsReceivedEOSE =>
!requests.values.any((element) => !element.receivedEOSE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ class Broadcast {
nostrEvent: nostrEvent,
signer: signer,
specificRelays: cleanedSpecificRelays,
doneStream: broadcastState.stateUpdates
.map((state) => state.broadcasts.values.toList()),
broadcastState: broadcastState,
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ abstract class NetworkEngine {
NdkBroadcastResponse handleEventBroadcast({
required Nip01Event nostrEvent,
required EventSigner? signer,
required Stream<List<RelayBroadcastResponse>> doneStream,
required BroadcastState broadcastState,
Iterable<String>? specificRelays,
});
}
Expand Down
19 changes: 13 additions & 6 deletions packages/ndk/lib/domain_layer/usecases/jit_engine/jit_engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,24 @@ class JitEngine with Logger implements NetworkEngine {
NdkBroadcastResponse handleEventBroadcast({
required Nip01Event nostrEvent,
required EventSigner? signer,
required Stream<List<RelayBroadcastResponse>> doneStream,
required BroadcastState broadcastState,
Iterable<String>? specificRelays,
}) {
Future<void> asyncStuff() async {
await relayManagerLight.seedRelaysConnected;

final Nip01Event workingNostrEvent;
if (signer != null) {
workingNostrEvent = await signer.sign(nostrEvent);
} else {
workingNostrEvent = nostrEvent;
try {
if (signer != null) {
workingNostrEvent = await signer.sign(nostrEvent);
} else {
workingNostrEvent = nostrEvent;
}
} catch (e) {
broadcastState.startTimeout();
rethrow;
}
broadcastState.startTimeout();

if (specificRelays != null) {
final cleanedSpecificRelays = cleanRelayUrls(specificRelays.toList());
Expand Down Expand Up @@ -216,7 +222,8 @@ class JitEngine with Logger implements NetworkEngine {
asyncStuff();
return NdkBroadcastResponse(
publishEvent: nostrEvent,
broadcastDoneStream: doneStream,
broadcastDoneStream: broadcastState.stateUpdates
.map((state) => state.broadcasts.values.toList()),
);
}

Expand Down
26 changes: 26 additions & 0 deletions packages/ndk/lib/domain_layer/usecases/relay_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,14 @@ class RelayManager<T> {
String challenge,
Set<Account> accounts,
) {
// Pause timeout for all requests on this relay during AUTH signing
final requestsOnRelay = globalState.inFlightRequests.values
.where((state) => state.requests.keys.contains(relayConnectivity.url))
.toList();
for (final state in requestsOnRelay) {
state.pauseTimeout();
}

int authCount = 0;
for (final account in accounts) {
if (account.signer.canSign()) {
Expand All @@ -597,6 +605,10 @@ class RelayManager<T> {
["challenge", challenge]
]);
account.signer.sign(auth).then((signedAuth) {
// Resume timeout for requests after signing completes
for (final state in requestsOnRelay) {
state.resumeTimeout();
}
send(relayConnectivity, ClientMsg(ClientMsgType.kAuth, event: signedAuth));
Logger.log.d(
"AUTH sent for ${account.pubkey.substring(0, 8)} to ${relayConnectivity.url}");
Expand All @@ -606,6 +618,10 @@ class RelayManager<T> {
}

if (authCount == 0) {
// No signing will happen, resume timeouts immediately
for (final state in requestsOnRelay) {
state.resumeTimeout();
}
Logger.log.w("Received an AUTH challenge but no account can sign");
}
}
Expand Down Expand Up @@ -764,8 +780,12 @@ class RelayManager<T> {
Logger.log.d(
"AUTH required for REQ $reqId on ${relayConnectivity.url}, authenticating ${signableAccounts.length} account(s)...");

// Pause timeout during AUTH signing
state.pauseTimeout();

// Track how many AUTH OKs we need before re-sending REQ
int pendingAuthCount = signableAccounts.length;
int pendingSignCount = signableAccounts.length;

for (final account in signableAccounts) {
final auth = AuthEvent(pubKey: account.pubkey, tags: [
Expand All @@ -774,6 +794,12 @@ class RelayManager<T> {
]);

account.signer.sign(auth).then((signedAuth) {
// Resume timeout after all signings complete
pendingSignCount--;
if (pendingSignCount == 0) {
state.resumeTimeout();
}

// Store callback - only re-send REQ after last AUTH OK
_pendingAuthCallbacks[signedAuth.id] = () {
pendingAuthCount--;
Expand Down
19 changes: 13 additions & 6 deletions packages/ndk/lib/domain_layer/usecases/relay_sets_engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,23 @@ class RelaySetsEngine implements NetworkEngine {
NdkBroadcastResponse handleEventBroadcast({
required Nip01Event nostrEvent,
required EventSigner? signer,
required Stream<List<RelayBroadcastResponse>> doneStream,
required BroadcastState broadcastState,
Iterable<String>? specificRelays,
}) {
Future<void> asyncStuff() async {
final Nip01Event workingEvent;

if (signer != null) {
workingEvent = await signer.sign(nostrEvent);
} else {
workingEvent = nostrEvent;
try {
if (signer != null) {
workingEvent = await signer.sign(nostrEvent);
} else {
workingEvent = nostrEvent;
}
} catch (e) {
broadcastState.startTimeout();
rethrow;
}
broadcastState.startTimeout();

// =====================================================================================
// specific relays
Expand Down Expand Up @@ -403,7 +409,8 @@ class RelaySetsEngine implements NetworkEngine {

return NdkBroadcastResponse(
publishEvent: nostrEvent,
broadcastDoneStream: doneStream,
broadcastDoneStream: broadcastState.stateUpdates
.map((state) => state.broadcasts.values.toList()),
);
}
}
4 changes: 2 additions & 2 deletions packages/ndk/test/mocks/mock_relay.dart
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ class MockRelay {
this.server = server;
var stream = server.transform(WebSocketTransformer());

String challenge = '';
// Generate challenge once for the entire server lifetime (fixes race condition on reconnect)
final String challenge = Helpers.getRandomString(10);
Set<String> authenticatedPubkeys = {};

stream.listen((webSocket) {
Expand All @@ -104,7 +105,6 @@ class MockRelay {
webSocket.add(customWelcomeMessage!);
}
if ((requireAuthForRequests || requireAuthForEvents) && sendAuthChallenge) {
challenge = Helpers.getRandomString(10);
webSocket.add(jsonEncode(["AUTH", challenge]));
}
webSocket.listen((message) async {
Expand Down
78 changes: 78 additions & 0 deletions packages/ndk/test/mocks/mock_slow_signer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import 'package:ndk/domain_layer/entities/nip_01_event.dart';
import 'package:ndk/domain_layer/entities/pending_signer_request.dart';
import 'package:ndk/domain_layer/repositories/event_signer.dart';

/// A wrapper signer that adds a delay to simulate slow signing
/// (like NIP-46, Amber, etc.) where user interaction is required.
class MockSlowSigner implements EventSigner {
final EventSigner _innerSigner;
final Duration _delay;

MockSlowSigner({
required EventSigner innerSigner,
required Duration delay,
}) : _innerSigner = innerSigner,
_delay = delay;

@override
Future<Nip01Event> sign(Nip01Event event) async {
await Future.delayed(_delay);
return _innerSigner.sign(event);
}

@override
String getPublicKey() => _innerSigner.getPublicKey();

@override
bool canSign() => _innerSigner.canSign();

@override
Future<String?> decrypt(String msg, String destPubKey, {String? id}) async {
await Future.delayed(_delay);
return _innerSigner.decrypt(msg, destPubKey, id: id);
}

@override
Future<String?> encrypt(String msg, String destPubKey, {String? id}) async {
await Future.delayed(_delay);
return _innerSigner.encrypt(msg, destPubKey, id: id);
}

@override
Future<String?> encryptNip44({
required String plaintext,
required String recipientPubKey,
}) async {
await Future.delayed(_delay);
return _innerSigner.encryptNip44(
plaintext: plaintext,
recipientPubKey: recipientPubKey,
);
}

@override
Future<String?> decryptNip44({
required String ciphertext,
required String senderPubKey,
}) async {
await Future.delayed(_delay);
return _innerSigner.decryptNip44(
ciphertext: ciphertext,
senderPubKey: senderPubKey,
);
}

@override
Stream<List<PendingSignerRequest>> get pendingRequestsStream =>
_innerSigner.pendingRequestsStream;

@override
List<PendingSignerRequest> get pendingRequests =>
_innerSigner.pendingRequests;

@override
bool cancelRequest(String requestId) => _innerSigner.cancelRequest(requestId);

@override
Future<void> dispose() => _innerSigner.dispose();
}
8 changes: 4 additions & 4 deletions packages/ndk/test/usecases/broadcast_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ void main() async {
late Ndk ndk;

setUp(() async {
relay0 = MockRelay(name: "relay 0", explicitPort: 5098);
relay0 = MockRelay(name: "relay 0", explicitPort: 5102);
await relay0.startServer(nip65s: {
key0: Nip65(
pubKey: key0.publicKey,
Expand Down Expand Up @@ -252,7 +252,7 @@ void main() async {
.loginPrivateKey(pubkey: key0.publicKey, privkey: key0.privateKey!);

// Create a slow relay that won't respond in time
MockRelay slowRelay = MockRelay(name: "slow relay", explicitPort: 5099);
MockRelay slowRelay = MockRelay(name: "slow relay", explicitPort: 5103);
await slowRelay.startServer(
nip65s: {
key0: Nip65(
Expand Down Expand Up @@ -316,8 +316,8 @@ void main() async {
ndk.accounts
.loginPrivateKey(pubkey: key0.publicKey, privkey: key0.privateKey!);

MockRelay relay1 = MockRelay(name: "relay 1", explicitPort: 5099);
MockRelay relay2 = MockRelay(name: "relay 2", explicitPort: 5100);
MockRelay relay1 = MockRelay(name: "relay 1", explicitPort: 5104);
MockRelay relay2 = MockRelay(name: "relay 2", explicitPort: 5106);

await relay1.startServer(nip65s: {
key0: Nip65(
Expand Down
Loading