diff --git a/packages/ndk/lib/domain_layer/usecases/requests/requests.dart b/packages/ndk/lib/domain_layer/usecases/requests/requests.dart index f52a96d05..946eaed06 100644 --- a/packages/ndk/lib/domain_layer/usecases/requests/requests.dart +++ b/packages/ndk/lib/domain_layer/usecases/requests/requests.dart @@ -1,5 +1,7 @@ import 'dart:async'; +import 'package:rxdart/rxdart.dart'; + import '../../../config/request_defaults.dart'; import '../../../shared/logger/logger.dart'; import '../../../shared/nips/nip01/helpers.dart'; @@ -23,6 +25,13 @@ import '../stream_response_cleaner/stream_response_cleaner.dart'; import 'concurrency_check.dart'; import 'verify_event_stream.dart'; +/// Internal state for tracking pagination progress on a single relay +class _RelayPaginationState { + int? oldestTimestamp; + int? currentUntil; + bool exhausted = false; +} + /// A class that handles low-level Nostr network requests and subscriptions. class Requests { final GlobalState _globalState; @@ -78,6 +87,7 @@ class Requests { /// [timeoutCallbackUserFacing] A user facing timeout callback, this callback should be given to the lib user \ /// [timeoutCallback] An internal timeout callback, this callback should be used for internal error handling \ /// [authenticateAs] List of accounts to authenticate with on relays (NIP-42) \ + /// [paginate] If true, automatically paginates backwards through time to fetch all events in the range \ /// /// Returns an [NdkResponse] containing the query result stream, future NdkResponse query({ @@ -95,6 +105,7 @@ class Requests { Iterable? explicitRelays, int? desiredCoverage, List? authenticateAs, + bool paginate = false, }) { if (filter == null && (filters == null || filters.isEmpty)) { throw ArgumentError('Either filter or filters must be provided'); @@ -102,6 +113,22 @@ class Requests { final effectiveFilters = filter != null ? [filter] : filters!; timeout ??= _defaultQueryTimeout; + if (paginate) { + return _paginatedQuery( + filter: effectiveFilters.first, + name: name, + relaySet: relaySet, + cacheRead: cacheRead, + cacheWrite: cacheWrite, + timeout: timeout, + timeoutCallbackUserFacing: timeoutCallbackUserFacing, + timeoutCallback: timeoutCallback, + explicitRelays: explicitRelays, + desiredCoverage: desiredCoverage, + authenticateAs: authenticateAs, + ); + } + return requestNostrEvent(NdkRequest.query( '$name-${Helpers.getRandomString(10)}', name: name, @@ -299,6 +326,164 @@ class Requests { return response; } + /// Performs a paginated query that fetches all events in a time range + /// by making multiple requests per relay, each time adjusting the `until` parameter + /// to fetch older events until `since` is reached or no more events are returned. + /// Pagination is done independently per relay to avoid skipping events. + NdkResponse _paginatedQuery({ + required Filter filter, + String name = '', + RelaySet? relaySet, + bool cacheRead = true, + bool cacheWrite = true, + Duration? timeout, + Function()? timeoutCallbackUserFacing, + Function()? timeoutCallback, + Iterable? explicitRelays, + int? desiredCoverage, + List? authenticateAs, + }) { + final requestId = '$name-paginated-${Helpers.getRandomString(10)}'; + final aggregatedController = ReplaySubject(); + final seenEventIds = {}; + + Future paginate() async { + final since = filter.since; + + // First request to discover relays and get initial events + final initialResponse = requestNostrEvent(NdkRequest.query( + '$name-page-initial-${Helpers.getRandomString(5)}', + name: name, + filters: [filter.clone()], + relaySet: relaySet, + cacheRead: cacheRead, + cacheWrite: cacheWrite, + timeoutDuration: timeout, + timeoutCallbackUserFacing: timeoutCallbackUserFacing, + timeoutCallback: timeoutCallback, + explicitRelays: explicitRelays, + desiredCoverage: + desiredCoverage ?? RequestDefaults.DEFAULT_BEST_RELAYS_MIN_COUNT, + authenticateAs: authenticateAs, + )); + + final initialEvents = await initialResponse.future; + + // Emit initial events and discover relays + final relayState = {}; + + for (final event in initialEvents) { + if (!seenEventIds.contains(event.id)) { + seenEventIds.add(event.id); + aggregatedController.add(event); + } + + // Track oldest timestamp per relay + for (final relay in event.sources) { + final state = relayState.putIfAbsent( + relay, + () => _RelayPaginationState(), + ); + if (state.oldestTimestamp == null || + event.createdAt < state.oldestTimestamp!) { + state.oldestTimestamp = event.createdAt; + } + } + } + + // If no events or no relays discovered, we're done + if (initialEvents.isEmpty || relayState.isEmpty) { + await aggregatedController.close(); + return; + } + + // Initialize relay states + for (final entry in relayState.entries) { + final state = entry.value; + if (state.oldestTimestamp != null) { + state.currentUntil = state.oldestTimestamp! - 1; + // Check if already reached since + if (since != null && state.oldestTimestamp! <= since) { + state.exhausted = true; + } + } else { + state.exhausted = true; + } + } + + // Paginate each relay independently + while (relayState.values.any((s) => !s.exhausted)) { + // Get active relays + final activeRelays = relayState.entries + .where((e) => !e.value.exhausted) + .map((e) => e.key) + .toList(); + + // Make parallel requests to all active relays + final futures = activeRelays.map((relay) async { + final state = relayState[relay]!; + final pageFilter = filter.clone(); + pageFilter.until = state.currentUntil; + + final response = requestNostrEvent(NdkRequest.query( + '$name-page-${Helpers.getRandomString(5)}', + name: name, + filters: [pageFilter], + relaySet: relaySet, + cacheRead: false, // Don't read from cache for subsequent pages + cacheWrite: cacheWrite, + timeoutDuration: timeout, + timeoutCallbackUserFacing: timeoutCallbackUserFacing, + timeoutCallback: timeoutCallback, + explicitRelays: [relay], + desiredCoverage: 1, + authenticateAs: authenticateAs, + )); + + return MapEntry(relay, await response.future); + }); + + final results = await Future.wait(futures); + + // Process results + for (final result in results) { + final relay = result.key; + final pageEvents = result.value; + final state = relayState[relay]!; + + int? oldestTimestamp; + for (final event in pageEvents) { + if (!seenEventIds.contains(event.id)) { + seenEventIds.add(event.id); + aggregatedController.add(event); + } + // Track oldest timestamp for this relay + if (oldestTimestamp == null || event.createdAt < oldestTimestamp) { + oldestTimestamp = event.createdAt; + } + } + + if (pageEvents.isEmpty || oldestTimestamp == null) { + state.exhausted = true; + } else { + state.currentUntil = oldestTimestamp - 1; + // Check if reached since + if (since != null && oldestTimestamp <= since) { + state.exhausted = true; + } + } + } + } + + await aggregatedController.close(); + } + + // Start pagination asynchronously + paginate(); + + return NdkResponse(requestId, aggregatedController.stream); + } + /// Records fetched ranges for each relay that received EOSE /// - If events received: use min/max of event timestamps /// - If no events + filter has since/until: use filter bounds diff --git a/packages/ndk/test/mocks/mock_relay.dart b/packages/ndk/test/mocks/mock_relay.dart index f91f3b96b..5bc37d565 100644 --- a/packages/ndk/test/mocks/mock_relay.dart +++ b/packages/ndk/test/mocks/mock_relay.dart @@ -32,6 +32,7 @@ class MockRelay { bool allwaysSendBadJson; bool sendMalformedEvents; String? customWelcomeMessage; + int? maxEventsPerRequest; // NIP-46 Remote Signer Support static const int kNip46Kind = BunkerRequest.kKind; @@ -56,6 +57,7 @@ class MockRelay { this.allwaysSendBadJson = false, this.sendMalformedEvents = false, this.customWelcomeMessage, + this.maxEventsPerRequest, int? explicitPort, }) : _nip65s = nip65s { if (explicitPort != null) { @@ -277,7 +279,8 @@ class MockRelay { filter.authors != null && filter.authors!.isNotEmpty) { eventsForThisFilter.addAll(_contactLists.values - .where((e) => filter.authors!.contains(e.pubKey)) + .where((e) => filter.authors!.contains(e.pubKey) && + _matchesTimeFilter(e, filter)) .toList()); } // Match against metadatas @@ -286,7 +289,8 @@ class MockRelay { filter.authors != null && filter.authors!.isNotEmpty) { eventsForThisFilter.addAll(_metadatas.values - .where((e) => filter.authors!.contains(e.pubKey)) + .where((e) => filter.authors!.contains(e.pubKey) && + _matchesTimeFilter(e, filter)) .toList()); } // General event matching (storedEvents and textNotes) @@ -298,8 +302,8 @@ class MockRelay { filter.authors == null || filter.authors!.contains(event.pubKey); bool idsMatches = filter.ids == null || filter.ids!.contains(event.id); - // Add other tag-based filtering if necessary, e.g., #e, #p tags - return kindMatches && authorMatches && idsMatches; + bool timeMatches = _matchesTimeFilter(event, filter); + return kindMatches && authorMatches && idsMatches && timeMatches; }).toList()); if (textNotes != null) { @@ -310,8 +314,8 @@ class MockRelay { filter.authors!.contains(event.pubKey); bool idsMatches = filter.ids == null || filter.ids!.contains(event.id); - // Add other tag-based filtering if necessary - return kindMatches && authorMatches && idsMatches; + bool timeMatches = _matchesTimeFilter(event, filter); + return kindMatches && authorMatches && idsMatches && timeMatches; }).toList()); } } @@ -324,6 +328,7 @@ class MockRelay { (filter.kinds == null || filter.kinds!.contains(Nip65.kKind))) { Nip01Event eventToAdd = entry.value.toEvent(); // Creates a new event instance + if (!_matchesTimeFilter(eventToAdd, filter)) continue; final Nip01Event? eventToAddSigned; if (signEvents && entry.key.privateKey != null) { // Sign the new instance, not the one in _nip65s @@ -354,8 +359,9 @@ class MockRelay { (filter.kinds!.any((k) => Nip51List.kPossibleKinds.contains(k) && Nip51List.kPossibleKinds.contains(entry.value.kind))); + bool timeMatches = _matchesTimeFilter(entry.value, filter); - if (authorsMatch && kindsMatch) { + if (authorsMatch && kindsMatch && timeMatches) { // Clone the event from the map before signing to avoid mutating the stored original Nip01Event eventToAdd = entry.value.copyWith(); Nip01Event? eventToAddSigned; @@ -369,10 +375,24 @@ class MockRelay { } } } + + // Apply limit per filter - sort by created_at desc and take limit + if (filter.limit != null && eventsForThisFilter.length > filter.limit!) { + eventsForThisFilter.sort((a, b) => b.createdAt.compareTo(a.createdAt)); + eventsForThisFilter = eventsForThisFilter.take(filter.limit!).toList(); + } + allMatchingEvents.addAll(eventsForThisFilter); } - for (final event in allMatchingEvents) { + // Apply global relay limit if configured + List eventsToSend = allMatchingEvents.toList(); + if (maxEventsPerRequest != null && eventsToSend.length > maxEventsPerRequest!) { + eventsToSend.sort((a, b) => b.createdAt.compareTo(a.createdAt)); + eventsToSend = eventsToSend.take(maxEventsPerRequest!).toList(); + } + + for (final event in eventsToSend) { webSocket.add(jsonEncode( ["EVENT", requestId, Nip01EventModel.fromEntity(event).toJson()])); } @@ -380,6 +400,17 @@ class MockRelay { webSocket.add(jsonEncode(["EOSE", requestId])); } + /// Check if event matches since/until time filters + bool _matchesTimeFilter(Nip01Event event, Filter filter) { + if (filter.since != null && event.createdAt < filter.since!) { + return false; + } + if (filter.until != null && event.createdAt > filter.until!) { + return false; + } + return true; + } + /// Sends event to all connected clients /// If key pair is provided, it will sign the event void sendEvent({ @@ -460,6 +491,16 @@ class MockRelay { return false; } + // Check since filter + if (filter.since != null && event.createdAt < filter.since!) { + return false; + } + + // Check until filter + if (filter.until != null && event.createdAt > filter.until!) { + return false; + } + // Check #p tag filter if (filter.pTags != null) { List eventPTags = event.tags diff --git a/packages/ndk/test/usecases/paginated_query_test.dart b/packages/ndk/test/usecases/paginated_query_test.dart new file mode 100644 index 000000000..ef5aa6f2b --- /dev/null +++ b/packages/ndk/test/usecases/paginated_query_test.dart @@ -0,0 +1,338 @@ +// ignore_for_file: avoid_print + +import 'package:test/test.dart'; +import 'package:ndk/ndk.dart'; +import 'package:ndk/shared/nips/nip01/bip340.dart'; +import 'package:ndk/shared/nips/nip01/key_pair.dart'; + +import '../mocks/mock_event_verifier.dart'; +import '../mocks/mock_relay.dart'; + +void main() async { + late KeyPair key1; + late MockRelay relay1; + late Ndk ndk; + late Bip340EventSigner signer; + + setUp(() async { + key1 = Bip340.generatePrivateKey(); + + signer = Bip340EventSigner( + privateKey: key1.privateKey, + publicKey: key1.publicKey, + ); + + relay1 = MockRelay( + name: "relay 1", + explicitPort: 6070, + maxEventsPerRequest: 2, + ); + + await relay1.startServer(); + + ndk = Ndk(NdkConfig( + eventVerifier: MockEventVerifier(), + cache: MemCacheManager(), + bootstrapRelays: [relay1.url], + logLevel: LogLevel.off, + )); + + ndk.accounts.loginExternalSigner(signer: signer); + }); + + tearDown(() async { + await ndk.destroy(); + await relay1.stopServer(); + }); + + group('Paginated Query', () { + test('fetches all events across multiple pages', () async { + final now = DateTime.now().millisecondsSinceEpoch ~/ 1000; + // Create 5 events spread across time + final timestamps = [ + now - 4000, + now - 3000, + now - 2000, + now - 1000, + now, + ]; + + // Publish events to the relay + for (final ts in timestamps) { + final event = Nip01Event( + kind: Nip01Event.kTextNodeKind, + pubKey: key1.publicKey, + content: "Event at $ts", + tags: [], + createdAt: ts, + ); + final response = ndk.broadcast.broadcast( + nostrEvent: event, + ); + await response.broadcastDoneFuture; + } + + await ndk.config.cache.clearAll(); + + final query = ndk.requests.query( + filter: Filter( + kinds: [Nip01Event.kTextNodeKind], + authors: [key1.publicKey], + ), + paginate: true, + ); + + final events = await query.future; + + expect(events.length, equals(5)); + }); + + test('respects since parameter', () async { + final now = DateTime.now().millisecondsSinceEpoch ~/ 1000; + final timestamps = [ + now - 5000, + now - 4000, + now - 3000, + now - 2000, + now - 1000, + ]; + + // Publish events + for (final ts in timestamps) { + final event = Nip01Event( + kind: Nip01Event.kTextNodeKind, + pubKey: key1.publicKey, + content: "Event at $ts", + tags: [], + createdAt: ts, + ); + final response = ndk.broadcast.broadcast( + nostrEvent: event, + ); + await response.broadcastDoneFuture; + } + + await ndk.config.cache.clearAll(); + + // Query with since that excludes older events + final query = ndk.requests.query( + filter: Filter( + kinds: [Nip01Event.kTextNodeKind], + authors: [key1.publicKey], + since: now - 3500, // Should only get events from -3000, -2000, -1000 + ), + paginate: true, + ); + + final events = await query.future; + // Should return only 3 events (those after since) + expect(events.length, equals(3)); + + // Verify all events are after 'since' + for (final event in events) { + expect(event.createdAt, greaterThanOrEqualTo(now - 3500)); + } + }); + + test('respects until parameter', () async { + final now = DateTime.now().millisecondsSinceEpoch ~/ 1000; + final timestamps = [ + now - 5000, + now - 4000, + now - 3000, + now - 2000, + now - 1000, + ]; + + // Publish events + for (final ts in timestamps) { + final event = Nip01Event( + kind: Nip01Event.kTextNodeKind, + pubKey: key1.publicKey, + content: "Event at $ts", + tags: [], + createdAt: ts, + ); + final response = ndk.broadcast.broadcast( + nostrEvent: event, + ); + await response.broadcastDoneFuture; + } + + await ndk.config.cache.clearAll(); + + // Query with until that excludes recent events + final query = ndk.requests.query( + filter: Filter( + kinds: [Nip01Event.kTextNodeKind], + authors: [key1.publicKey], + until: now - 2500, // Should only get events from -5000, -4000, -3000 + ), + paginate: true, + ); + + final events = await query.future; + // Should return only 3 events (those before until) + expect(events.length, equals(3)); + + // Verify all events are before 'until' + for (final event in events) { + expect(event.createdAt, lessThanOrEqualTo(now - 2500)); + } + }); + + test('handles events with same timestamp', () async { + final now = DateTime.now().millisecondsSinceEpoch ~/ 1000; + final sameTimestamp = now - 2000; + + final timestamps = [ + now - 4000, + sameTimestamp, // same + sameTimestamp, // same + sameTimestamp, // same + now - 1000, + ]; + + // Publish events + for (int i = 0; i < timestamps.length; i++) { + final event = Nip01Event( + kind: Nip01Event.kTextNodeKind, + pubKey: key1.publicKey, + content: "Event $i at ${timestamps[i]}", + tags: [], + createdAt: timestamps[i], + ); + final response = ndk.broadcast.broadcast( + nostrEvent: event, + ); + await response.broadcastDoneFuture; + } + + await ndk.config.cache.clearAll(); + + final query = ndk.requests.query( + filter: Filter( + kinds: [Nip01Event.kTextNodeKind], + authors: [key1.publicKey], + ), + paginate: true, + ); + + final events = await query.future; + + expect(events.length, equals(3)); + + // Verify no duplicates + final ids = events.map((e) => e.id).toSet(); + expect(ids.length, equals(3)); + }); + + test('paginates correctly across multiple relays', () async { + final now = DateTime.now().millisecondsSinceEpoch ~/ 1000; + + final firstEvent = Nip01Event( + kind: Nip01Event.kTextNodeKind, + pubKey: key1.publicKey, + content: "Event at $now", + tags: [], + createdAt: now, + ); + final firstEventSigned = await signer.sign(firstEvent); + + final relay1EventTimeStamp = now - 3000; + final relay1Event = Nip01Event( + kind: Nip01Event.kTextNodeKind, + pubKey: key1.publicKey, + content: "Event at $relay1EventTimeStamp", + tags: [], + createdAt: relay1EventTimeStamp, + ); + final relay1EventSigned = await signer.sign(relay1Event); + + final middleEventTimeStamp = now - 5000; + final middleEvent = Nip01Event( + kind: Nip01Event.kTextNodeKind, + pubKey: key1.publicKey, + content: "Event at $middleEventTimeStamp", + tags: [], + createdAt: middleEventTimeStamp, + ); + final middleEventSigned = await signer.sign(middleEvent); + + final relay2EventTimeStamp = now - 7000; + final relay2Event = Nip01Event( + kind: Nip01Event.kTextNodeKind, + pubKey: key1.publicKey, + content: "Event at $relay2EventTimeStamp", + tags: [], + createdAt: relay2EventTimeStamp, + ); + final relay2EventSigned = await signer.sign(relay2Event); + + final lastEventTimeStamp = now - 10000; + final lastEvent = Nip01Event( + kind: Nip01Event.kTextNodeKind, + pubKey: key1.publicKey, + content: "Event at $lastEventTimeStamp", + tags: [], + createdAt: lastEventTimeStamp, + ); + final lastEventSigned = await signer.sign(lastEvent); + + final relay1Events = [ + firstEventSigned, + relay1EventSigned, + middleEventSigned, + lastEventSigned + ]; + final relay2Events = [ + firstEventSigned, + relay2EventSigned, + lastEventSigned + ]; + + // Create a second relay + final relay2 = MockRelay( + name: "relay 2", + explicitPort: 6071, + ); + await relay2.startServer(); + + try { + for (final event in relay1Events) { + final response = ndk.broadcast.broadcast( + nostrEvent: event, + specificRelays: [relay1.url], + ); + await response.broadcastDoneFuture; + } + + for (final event in relay2Events) { + final response = ndk.broadcast.broadcast( + nostrEvent: event, + specificRelays: [relay2.url], + ); + await response.broadcastDoneFuture; + } + } finally {} + + await ndk.config.cache.clearAll(); + + // Query both relays with pagination + final query = ndk.requests.query( + filter: Filter( + kinds: [Nip01Event.kTextNodeKind], + authors: [key1.publicKey], + ), + explicitRelays: [relay1.url, relay2.url], + paginate: true, + ); + + final events = await query.future; + + expect(events.length, equals(5)); + + await relay2.stopServer(); + }); + }); +}