Skip to content

Optimize event batch import with parallel verification and transaction batching#123

Merged
greenart7c3 merged 3 commits intomasterfrom
claude/optimize-download-events-Jt7E2
May 4, 2026
Merged

Optimize event batch import with parallel verification and transaction batching#123
greenart7c3 merged 3 commits intomasterfrom
claude/optimize-download-events-Jt7E2

Conversation

@greenart7c3
Copy link
Copy Markdown
Owner

Summary

Refactored the event download and import pipeline to significantly improve performance and memory efficiency when importing large event archives. The changes introduce parallel signature verification, batch database operations, and a new streaming sink pattern for bounded memory usage.

Key Changes

EventDownloader.kt

  • Streaming sink pattern: Modified fetchAllEventsByUserPaginated() to accept a sink callback for processing completed batches, eliminating the need to accumulate all events in memory before writing
  • Parallel relay fetching: Refactored downloadAllEventsByUser() to fetch from multiple relays concurrently using coroutine async/awaitAll() with a semaphore limiting concurrency to 3 relays
  • Simplified subscription state: Replaced ConcurrentSet<Event> with a simple ArrayList buffer and changed event counting from storing full events to tracking received count
  • Improved error handling: Added try-finally block to ensure proper cleanup of subscriptions and connection listeners
  • Removed unnecessary delays: Eliminated the 2-second disconnect/reconnect delay that was blocking the import flow
  • Code cleanup: Removed verbose logging and unused finishedRelays tracking maps

CustomWebSocketServer.kt

  • New batch import method: Added innerProcessEventBatch() optimized for bulk imports with:
    • Parallel signature verification on Dispatchers.Default with semaphore-controlled concurrency
    • Batch duplicate detection using chunked IN (...) queries
    • Collapsing of replaceable events to newest per (pubkey, kind, d-tag) within the batch
    • Single Room transaction for all inserts/deletes
    • Skipping subscription fanout and offline-broadcast (irrelevant for self-archive imports)
  • Policy filtering: Extracted policyAllows() helper for consistent event filtering
  • Delete handling: Refactored deleteEvent() to use new applyDeleteTagsOwnedBy() helper for reuse in batch processing

EventDao.kt

  • Batch operations: Added insertEvents() and new batch helper insertEventsWithTagsBatch() for efficient multi-event inserts
  • Duplicate detection: Added existingIds() query to check which events already exist
  • Deletion tracking: Added getDeletedEventsByIds() to identify events deleted by kind-5 deletion events
  • Atomic batch writes: New insertEventsWithTagsBatch() transaction handles tag cleanup and event/tag insertion atomically

Notable Implementation Details

  • Batch size defaults to 500 events per relay request
  • Parallel verification uses a semaphore based on available CPU cores
  • Replaceable event deduplication happens in-memory before database writes to minimize queries
  • Database operations use chunked queries (500 items per chunk) to avoid SQLite parameter limits
  • The new batch import path is used exclusively by the download-your-events flow, leaving the real-time event processing path (innerProcessEvent) unchanged

https://claude.ai/code/session_01VP6tzr2EBN7dY4R9BTwLXr

claude and others added 3 commits April 23, 2026 20:13
Route the download-your-events flow through a new bulk-import path that
streams events directly into Room instead of accumulating them twice in
memory and paying per-event overhead designed for live WebSocket clients.

- EventDownloader: drop the in-memory receivedEvents list and
  ConcurrentSet buffer, stream each EOSE batch into a suspend sink, fan
  out relays concurrently under a Semaphore(3), ensure
  isImportingEvents is reset in a finally block, and remove hot-path
  per-frame logging.
- CustomWebSocketServer: add innerProcessEventBatch that verifies
  signatures in parallel on Dispatchers.Default, batches duplicate and
  kind-5 deletion checks with chunked IN (...) queries, collapses
  in-batch replaceables to the newest per (pubkey, kind[, d-tag]), and
  writes everything in one Room transaction. Extract
  applyDeleteTagsOwnedBy so both paths share the same delete-tag logic.
- EventDao: add existingIds, getDeletedEventsByIds, deleteTagsForIds,
  multi-row insertEvents, and insertEventsWithTagsBatch.

https://claude.ai/code/session_01VP6tzr2EBN7dY4R9BTwLXr
The relay's WebSocket thread kept appending to the batch buffer while
innerProcessEventBatch was iterating it (distinctBy/toList), producing
a ConcurrentModificationException. Guard the buffer with a lock and
atomically swap it for a fresh one before handing the previous batch
to the sink.

https://claude.ai/code/session_01VP6tzr2EBN7dY4R9BTwLXr
@greenart7c3 greenart7c3 merged commit ea578c0 into master May 4, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants