Add relay aggregator service for mirroring events from follows#122
Open
greenart7c3 wants to merge 20 commits intomasterfrom
Open
Add relay aggregator service for mirroring events from follows#122greenart7c3 wants to merge 20 commits intomasterfrom
greenart7c3 wants to merge 20 commits intomasterfrom
Conversation
Adds a live outbox-style aggregator that pulls events for a configured pubkey's follows from each follow's NIP-65 write relays, plus any events p-tagging the aggregator on its read relays. Events are written through the normal innerProcessEvent pipeline so local clients connected to ws://127.0.0.1:4869 see them immediately. - RelayAggregator: supervises subscriptions, refreshes every N minutes, reconnects on disconnect, respects isImportingEvents - Settings: relayAggregatorEnabled, aggregatorPubkey, kinds set (defaults 0,1,3,6,7,10002,30023), refresh minutes, includeTagged, lastSync - SettingsScreen: new "Relay Aggregator" section - WebSocketServerService: starts/stops aggregator with the server
Bootstrap contact-list and NIP-65 lookups (in RelayAggregator and EventDownloader) now query purplepag.es, user.kindpag.es, profiles.nostr1.com and directory.yabu.me instead of purplepag.es + relay.nostr.band.
Adds NIP-09 deletion requests (5) and NIP-22 comments (1111) to the default relay aggregator kinds so delete events and threaded comments are mirrored alongside the other baseline social kinds.
Exposes an AggregatorStatus StateFlow from RelayAggregator with phase (idle / bootstrapping / refreshing / listening), author count, configured and currently-connected relay counts, lifetime events received, and last-refresh timestamp. Connection liveness is tracked from onIncomingMessage/onDisconnected/onCannotConnect callbacks on subscribed relays. HomeScreen renders an AggregatorStatusCard below the relay start/stop controls whenever relayAggregatorEnabled is true, so users can see at a glance what the aggregator is doing and how many of its outbox relays are alive.
Extends the WebSocket service's persistent notification with an aggregator subtitle (phase + relays connected) and a BigTextStyle expansion containing full details (phase description, author and relay counts, lifetime events received, last refresh age). A scope coroutine collects RelayAggregator.status and re-issues notify(id=1, ...) on every change so users see real-time aggregator progress without opening the app.
The previous BOOTSTRAP_RELAYS list (purplepag.es, user.kindpag.es, profiles.nostr1.com, directory.yabu.me) was also serving as the outbox fallback for follows without a published NIP-65 and for the tagged-events subscription. Those four relays are profile / relay-list indexers and do not carry kind-1 notes, reposts or reactions, so any follow routed to that fallback produced ~zero content events. Split the constant in two: - INDEXER_RELAYS keeps the four indexers for kind-3 and kind-10002 bootstrap lookups (their original purpose). - FALLBACK_OUTBOX_RELAYS (relay.damus.io, nos.lol, relay.primal.net) is used when a follow lacks NIP-65 and is unioned into the tagged-events subscription so it works even when the aggregator pubkey has no read relays advertised.
Two problems were causing the aggregator to receive a trickle of events and then stop: 1. Bootstrapping NIP-65 ran one fetchFirst per follow, sequentially. With a few hundred follows that could take many minutes before the outbox subscriptions were even opened. 2. When a tracked outbox relay dropped, Quartz would reconnect the socket but did not re-send our REQ. The old refresh-on-disconnect kick was debounced to 30s and gated by a single-flight lock, so any churn could leave an outbox relay silently unsubscribed until the next hourly refresh. Changes: - Batch NIP-65 lookups for all uncached follows into a single subscription against the indexer relays, collected until EOSE from every relay or a 15 s timeout. - Keep a per-relay map of active (subId, filters) pairs; re-push each one immediately on any onDisconnected / onCannotConnect for a tracked relay (debounced per relay so a reconnect storm doesn't spam). - Add a withTimeoutOrNull around the contact-list bootstrap so a dead indexer can't stall start-up. - More informative logs during bootstrap (follow count, cache hit/miss, batch fetch size).
- batchFetchRelayLists reused the same subId across author chunks, so Quartz's addOrUpdate() replaced the filter set on every iteration and only the last chunk was actually queried at the indexers. Pack every chunk as multiple filters in a single subscribe call so every follow's outbox is resolved. - The p-tagged subscription iterated taggedSubRelays calling subscribe(id, mapOf(relay to filters)) per relay, with the same id. Each call sent a CLOSE to the previous relay and a REQ to the new one, leaving the sub active on just the last relay. Issue one subscribe with the full relay set instead. - Lower MAX_AUTHORS_PER_SUB from 500 to 100 so stricter relays don't reject the filter; chunks now ride in a single REQ anyway. - Install a shared OkHttp Dispatcher (maxRequests=1024, maxRequestsPerHost=32) so hundreds of relay WebSocket upgrades can happen in parallel instead of queuing behind the default 64-slot pool. - Drop the manual scheduleResub / resendSubsForRelay machinery. Quartz's NostrClient.onConnected() already calls syncFilters() which re-sends every desired REQ on reconnect. The manual path just turned flapping or unreachable relays into a resub storm. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- onConfigChanged now detects identity changes by comparing the new Settings.aggregatorPubkey against currentPubkey (the pubkey the running instance was started for). On change, stop() drops every subscription and zeroes eventsReceived / authorCount, lastSync is reset so the new user gets the full cold-start window, and start() kicks off a refresh with forceFreshBootstrap set. - loadOrBootstrapContactList gains a forceRefresh flag. When true it bypasses the DB cache and fetches from the indexer set; if the fetched event is older than what we had cached, the cache wins, so a failed/partial fetch never downgrades the follow list. - Added logs throughout the refresh lifecycle: config changes, refresh start / sleep, contact-list resolution timing, NIP-65 cache hit/miss + fetch timing, outbox fan-out summary (including how many authors fell back to FALLBACK_OUTBOX_RELAYS), per-refresh main-sub counts (new / reused / shrunk / dropped), tagged-sub targets, and a final "Refresh complete in Xms" line. - Listener logs a one-line "Relay live" on first message per relay, "Disconnected" / "Cannot connect" with a running counted ratio, and a heartbeat every 100 received events so the stream is visible without per-event spam. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds Settings.relayAggregatorExtraRelays (Set<String>) persisted via LocalPreferences. When the aggregator is enabled with no pubkey but a non-empty relay list, it subscribes to each listed relay with a plain kinds + since filter (no authors, no NIP-65 lookup, no contact list bootstrap, no tagged sub) and reuses the existing diff-vs-activeRelaySubs pipeline so add/remove of relays takes effect on the next config change. - Settings / LocalPreferences: new field + pref key, round-tripped as a StringSet. - RelayAggregator: enable gate accepts pubkey OR extra relays; refresh has a no-pubkey branch that seeds relayToAuthors with empty author sets. The subscribe loop interprets an empty set as a single "unfiltered" chunk and omits the authors field from the Filter. Tagged sub is skipped in no-pubkey mode. - SettingsScreen: new relay list editor under the kinds list, backed by the existing PubkeyInputRow/PubkeyListItem components. Input accepts wss://, ws://, or a bare host (auto-prefixed to wss://); anything with a different scheme or no host is rejected with a toast. The aggregator enable switch now accepts either a pubkey or at least one relay and toasts otherwise. - strings.xml: labels, hint, validation error, combined pubkey-or-relays-required message. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Removes the null check for `connection` when calling `EventSubscription.executeAll` in `EventDao`. Adds debug logging to track when events are sent to specific subscriptions and removes a redundant log entry during notification creation in `WebSocketServerService`.
Removes the `Log.d` call in `EventSubscription` that logged event and subscription IDs during distribution to reduce log noise.
Simplifies `insertEventWithTags` by removing the optional boolean toggle used to suppress subscription notifications. The method now always calls `EventSubscription.executeAll` upon a successful insertion. Updates call sites in `CitrineContentProvider` and `CustomWebSocketServer` to reflect the changed signature, removing instances where notifications were previously disabled.
- Track lastSeen per pubkey, seeded from the local DB at refresh start and updated as events arrive, so each author chunk's since is min(lastSeen)-overlap instead of a single global cursor. Authors are sorted by lastSeen DESC before chunking so caught-up pubkeys group together and a new follow no longer inherits the cohort's recent cursor. - Add an opt-in "Pause on mobile data" setting that suspends aggregator subscriptions whenever the active network is metered (mobile data or metered Wi-Fi) and resumes on unmetered Wi-Fi. Surfaces a new "Paused (metered network)" phase in the status card and the foreground-service notification. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The aggregator status flow fires on every received event and every relay connect/disconnect, which rewrote the foreground-service notification dozens of times per second and racked up wakelocks. Sample the flow so the notification is rebuilt at most once every 5 seconds. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Without a kinds filter the tagged subscription pulled every event p-tagging the aggregator pubkey, including kinds the user explicitly opted out of via relayAggregatorKinds. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The tagged subscription used to send a single REQ on the aggregator pubkey's own inbox with `#p = [aggPubkey]`, which only caught events addressed to the aggregator user. Replace it with a per-inbox-relay subscription map: for each follow (and self), look up their NIP-65 read relays, group authors by inbox relay, and send a `#p = chunk` filter on each relay (chunked at MAX_AUTHORS_PER_SUB). FALLBACK_OUTBOX_RELAYS are always included so we still catch taggers publishing to popular general-purpose relays. Tracks these in a parallel activeTaggedRelaySubs map so toggling relayAggregatorIncludeTagged off only tears down the tagged subs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…relays" This reverts commit 60002a5.
Without this, the aggregator listener could ingest new events into tables we're clearing, leaving stragglers behind. Stop the aggregator before clearAllTables() and restart it afterwards if the setting is still enabled. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Triggers a subscription rebuild when the pubkey changes regardless of the current `running` state. Also simplifies author list initialization in `Filter` using the `ifEmpty` extension function.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implements a relay aggregator service that continuously mirrors events from a configured aggregator pubkey's social graph into the local relay. This allows the relay to serve as a personal event cache for a user's network.
Key Changes
New RelayAggregator service (
RelayAggregator.kt):CustomWebSocketService.server?.innerProcessEvent()Settings integration:
Settings.ktandLocalPreferences.ktUI additions (
SettingsScreen.kt):Service lifecycle (
WebSocketServerService.kt):String resources:
Implementation Details
sincefilter to catch any missed eventshttps://claude.ai/code/session_013E5Csdsg25KiCpqwvcT1y2