Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions packages/ndk/lib/domain_layer/usecases/requests/requests.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../../config/request_defaults.dart';
import '../../../shared/logger/logger.dart';
import '../../../shared/nips/nip01/helpers.dart';
Expand All @@ -23,6 +25,13 @@ import '../stream_response_cleaner/stream_response_cleaner.dart';
import 'concurrency_check.dart';
import 'verify_event_stream.dart';

/// Internal state for tracking pagination progress on a single relay
class _RelayPaginationState {
  // Smallest `created_at` seen from this relay so far; null until the relay
  // has returned at least one event.
  int? oldestTimestamp;
  // `until` value to use for this relay's next page request
  // (oldestTimestamp - 1, so already-seen events are not re-fetched).
  int? currentUntil;
  // True once this relay returned no further events or pagination reached
  // the filter's `since` bound — no more requests will be made to it.
  bool exhausted = false;
}

/// A class that handles low-level Nostr network requests and subscriptions.
class Requests {
final GlobalState _globalState;
Expand Down Expand Up @@ -78,6 +87,7 @@ class Requests {
/// [timeoutCallbackUserFacing] A user facing timeout callback, this callback should be given to the lib user \
/// [timeoutCallback] An internal timeout callback, this callback should be used for internal error handling \
/// [authenticateAs] List of accounts to authenticate with on relays (NIP-42) \
/// [paginate] If true, automatically paginates backwards through time to fetch all events in the range \
///
/// Returns an [NdkResponse] containing the query result stream, future
NdkResponse query({
Expand All @@ -95,13 +105,30 @@ class Requests {
Iterable<String>? explicitRelays,
int? desiredCoverage,
List<Account>? authenticateAs,
bool paginate = false,
}) {
if (filter == null && (filters == null || filters.isEmpty)) {
throw ArgumentError('Either filter or filters must be provided');
}
final effectiveFilters = filter != null ? [filter] : filters!;
timeout ??= _defaultQueryTimeout;

if (paginate) {
return _paginatedQuery(
filter: effectiveFilters.first,
name: name,
relaySet: relaySet,
cacheRead: cacheRead,
cacheWrite: cacheWrite,
timeout: timeout,
timeoutCallbackUserFacing: timeoutCallbackUserFacing,
timeoutCallback: timeoutCallback,
explicitRelays: explicitRelays,
desiredCoverage: desiredCoverage,
authenticateAs: authenticateAs,
);
}

return requestNostrEvent(NdkRequest.query(
'$name-${Helpers.getRandomString(10)}',
name: name,
Expand Down Expand Up @@ -299,6 +326,164 @@ class Requests {
return response;
}

/// Performs a paginated query that fetches all events in a time range
/// by making multiple requests per relay, each time adjusting the `until` parameter
/// to fetch older events until `since` is reached or no more events are returned.
/// Pagination is done independently per relay to avoid skipping events.
///
/// Any error raised while paginating is forwarded to the returned stream and
/// the stream is always closed, so consumers never hang on a failed page.
///
/// Returns an [NdkResponse] whose stream replays all deduplicated events.
NdkResponse _paginatedQuery({
  required Filter filter,
  String name = '',
  RelaySet? relaySet,
  bool cacheRead = true,
  bool cacheWrite = true,
  Duration? timeout,
  Function()? timeoutCallbackUserFacing,
  Function()? timeoutCallback,
  Iterable<String>? explicitRelays,
  int? desiredCoverage,
  List<Account>? authenticateAs,
}) {
  final requestId = '$name-paginated-${Helpers.getRandomString(10)}';
  // ReplaySubject so subscribers that attach after pagination started still
  // receive every event emitted so far.
  final aggregatedController = ReplaySubject<Nip01Event>();
  // Deduplicates events that arrive from multiple relays or multiple pages.
  final seenEventIds = <String>{};

  Future<void> paginate() async {
    try {
      final since = filter.since;

      // First request to discover relays and get initial events
      final initialResponse = requestNostrEvent(NdkRequest.query(
        '$name-page-initial-${Helpers.getRandomString(5)}',
        name: name,
        filters: [filter.clone()],
        relaySet: relaySet,
        cacheRead: cacheRead,
        cacheWrite: cacheWrite,
        timeoutDuration: timeout,
        timeoutCallbackUserFacing: timeoutCallbackUserFacing,
        timeoutCallback: timeoutCallback,
        explicitRelays: explicitRelays,
        desiredCoverage:
            desiredCoverage ?? RequestDefaults.DEFAULT_BEST_RELAYS_MIN_COUNT,
        authenticateAs: authenticateAs,
      ));

      final initialEvents = await initialResponse.future;

      // Emit initial events and discover relays
      final relayState = <String, _RelayPaginationState>{};

      for (final event in initialEvents) {
        if (seenEventIds.add(event.id)) {
          aggregatedController.add(event);
        }

        // Track oldest timestamp per relay
        for (final relay in event.sources) {
          final state = relayState.putIfAbsent(
            relay,
            () => _RelayPaginationState(),
          );
          if (state.oldestTimestamp == null ||
              event.createdAt < state.oldestTimestamp!) {
            state.oldestTimestamp = event.createdAt;
          }
        }
      }

      // If no events or no relays discovered, we're done
      // (the `finally` below closes the stream).
      if (initialEvents.isEmpty || relayState.isEmpty) {
        return;
      }

      // Initialize relay states
      for (final entry in relayState.entries) {
        final state = entry.value;
        if (state.oldestTimestamp != null) {
          // -1 so the next page excludes the oldest event already emitted.
          state.currentUntil = state.oldestTimestamp! - 1;
          // Check if already reached since
          if (since != null && state.oldestTimestamp! <= since) {
            state.exhausted = true;
          }
        } else {
          state.exhausted = true;
        }
      }

      // Paginate each relay independently
      while (relayState.values.any((s) => !s.exhausted)) {
        // Get active relays
        final activeRelays = relayState.entries
            .where((e) => !e.value.exhausted)
            .map((e) => e.key)
            .toList();

        // Make parallel requests to all active relays
        final futures = activeRelays.map((relay) async {
          final state = relayState[relay]!;
          final pageFilter = filter.clone();
          pageFilter.until = state.currentUntil;

          final response = requestNostrEvent(NdkRequest.query(
            '$name-page-${Helpers.getRandomString(5)}',
            name: name,
            filters: [pageFilter],
            relaySet: relaySet,
            cacheRead: false, // Don't read from cache for subsequent pages
            cacheWrite: cacheWrite,
            timeoutDuration: timeout,
            timeoutCallbackUserFacing: timeoutCallbackUserFacing,
            timeoutCallback: timeoutCallback,
            explicitRelays: [relay],
            desiredCoverage: 1,
            authenticateAs: authenticateAs,
          ));

          return MapEntry(relay, await response.future);
        });

        final results = await Future.wait(futures);

        // Process results
        for (final result in results) {
          final relay = result.key;
          final pageEvents = result.value;
          final state = relayState[relay]!;

          int? oldestTimestamp;
          for (final event in pageEvents) {
            if (seenEventIds.add(event.id)) {
              aggregatedController.add(event);
            }
            // Track oldest timestamp for this relay
            if (oldestTimestamp == null || event.createdAt < oldestTimestamp) {
              oldestTimestamp = event.createdAt;
            }
          }

          if (pageEvents.isEmpty || oldestTimestamp == null) {
            state.exhausted = true;
          } else {
            state.currentUntil = oldestTimestamp - 1;
            // Check if reached since
            if (since != null && oldestTimestamp <= since) {
              state.exhausted = true;
            }
          }
        }
      }
    } catch (e, st) {
      // Surface failures to stream consumers instead of letting them become
      // unhandled async errors (which would also leave the stream open).
      aggregatedController.addError(e, st);
    } finally {
      // Always close exactly once, on success, early return, or error.
      await aggregatedController.close();
    }
  }

  // Start pagination asynchronously; errors are handled inside [paginate].
  unawaited(paginate());

  return NdkResponse(requestId, aggregatedController.stream);
}

/// Records fetched ranges for each relay that received EOSE
/// - If events received: use min/max of event timestamps
/// - If no events + filter has since/until: use filter bounds
Expand Down
57 changes: 49 additions & 8 deletions packages/ndk/test/mocks/mock_relay.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class MockRelay {
bool allwaysSendBadJson;
bool sendMalformedEvents;
String? customWelcomeMessage;
int? maxEventsPerRequest;

// NIP-46 Remote Signer Support
static const int kNip46Kind = BunkerRequest.kKind;
Expand All @@ -56,6 +57,7 @@ class MockRelay {
this.allwaysSendBadJson = false,
this.sendMalformedEvents = false,
this.customWelcomeMessage,
this.maxEventsPerRequest,
int? explicitPort,
}) : _nip65s = nip65s {
if (explicitPort != null) {
Expand Down Expand Up @@ -277,7 +279,8 @@ class MockRelay {
filter.authors != null &&
filter.authors!.isNotEmpty) {
eventsForThisFilter.addAll(_contactLists.values
.where((e) => filter.authors!.contains(e.pubKey))
.where((e) => filter.authors!.contains(e.pubKey) &&
_matchesTimeFilter(e, filter))
.toList());
}
// Match against metadatas
Expand All @@ -286,7 +289,8 @@ class MockRelay {
filter.authors != null &&
filter.authors!.isNotEmpty) {
eventsForThisFilter.addAll(_metadatas.values
.where((e) => filter.authors!.contains(e.pubKey))
.where((e) => filter.authors!.contains(e.pubKey) &&
_matchesTimeFilter(e, filter))
.toList());
}
// General event matching (storedEvents and textNotes)
Expand All @@ -298,8 +302,8 @@ class MockRelay {
filter.authors == null || filter.authors!.contains(event.pubKey);
bool idsMatches =
filter.ids == null || filter.ids!.contains(event.id);
// Add other tag-based filtering if necessary, e.g., #e, #p tags
return kindMatches && authorMatches && idsMatches;
bool timeMatches = _matchesTimeFilter(event, filter);
return kindMatches && authorMatches && idsMatches && timeMatches;
}).toList());

if (textNotes != null) {
Expand All @@ -310,8 +314,8 @@ class MockRelay {
filter.authors!.contains(event.pubKey);
bool idsMatches =
filter.ids == null || filter.ids!.contains(event.id);
// Add other tag-based filtering if necessary
return kindMatches && authorMatches && idsMatches;
bool timeMatches = _matchesTimeFilter(event, filter);
return kindMatches && authorMatches && idsMatches && timeMatches;
}).toList());
}
}
Expand All @@ -324,6 +328,7 @@ class MockRelay {
(filter.kinds == null || filter.kinds!.contains(Nip65.kKind))) {
Nip01Event eventToAdd =
entry.value.toEvent(); // Creates a new event instance
if (!_matchesTimeFilter(eventToAdd, filter)) continue;
final Nip01Event? eventToAddSigned;
if (signEvents && entry.key.privateKey != null) {
// Sign the new instance, not the one in _nip65s
Expand Down Expand Up @@ -354,8 +359,9 @@ class MockRelay {
(filter.kinds!.any((k) =>
Nip51List.kPossibleKinds.contains(k) &&
Nip51List.kPossibleKinds.contains(entry.value.kind)));
bool timeMatches = _matchesTimeFilter(entry.value, filter);

if (authorsMatch && kindsMatch) {
if (authorsMatch && kindsMatch && timeMatches) {
// Clone the event from the map before signing to avoid mutating the stored original
Nip01Event eventToAdd = entry.value.copyWith();
Nip01Event? eventToAddSigned;
Expand All @@ -369,17 +375,42 @@ class MockRelay {
}
}
}

// Apply limit per filter - sort by created_at desc and take limit
if (filter.limit != null && eventsForThisFilter.length > filter.limit!) {
eventsForThisFilter.sort((a, b) => b.createdAt.compareTo(a.createdAt));
eventsForThisFilter = eventsForThisFilter.take(filter.limit!).toList();
}

allMatchingEvents.addAll(eventsForThisFilter);
}

for (final event in allMatchingEvents) {
// Apply global relay limit if configured
List<Nip01Event> eventsToSend = allMatchingEvents.toList();
if (maxEventsPerRequest != null && eventsToSend.length > maxEventsPerRequest!) {
eventsToSend.sort((a, b) => b.createdAt.compareTo(a.createdAt));
eventsToSend = eventsToSend.take(maxEventsPerRequest!).toList();
}

for (final event in eventsToSend) {
webSocket.add(jsonEncode(
["EVENT", requestId, Nip01EventModel.fromEntity(event).toJson()]));
}

webSocket.add(jsonEncode(["EOSE", requestId]));
}

/// Whether [event] falls within the optional `since`/`until` bounds of
/// [filter] (both bounds are inclusive; a null bound matches everything).
bool _matchesTimeFilter(Nip01Event event, Filter filter) {
  final createdAt = event.createdAt;
  final afterSince = filter.since == null || createdAt >= filter.since!;
  final beforeUntil = filter.until == null || createdAt <= filter.until!;
  return afterSince && beforeUntil;
}

/// Sends event to all connected clients
/// If key pair is provided, it will sign the event
void sendEvent({
Expand Down Expand Up @@ -460,6 +491,16 @@ class MockRelay {
return false;
}

// Check since filter
if (filter.since != null && event.createdAt < filter.since!) {
return false;
}

// Check until filter
if (filter.until != null && event.createdAt > filter.until!) {
return false;
}

// Check #p tag filter
if (filter.pTags != null) {
List<String> eventPTags = event.tags
Expand Down
Loading