Async post-processing and batch queue inserts for import service#529
Closed
Async post-processing and batch queue inserts for import service#529
Conversation
…erts Queue workers previously blocked on post-processing (VFS notify + 1s FUSE propagation sleep, symlink/STRM creation, health check scheduling, ARR notifications) before picking up the next item. Dispatch that work to a bounded background pool and mark the queue item completed synchronously so workers return immediately after a successful import. Directory scans also inserted one queue row per file. Accumulate discovered files and flush every 100 into a single transaction via the existing AddBatchToQueue path, cutting per-file DB overhead on large libraries.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This PR refactors the NZB import service to improve throughput and responsiveness by:
Key Changes
Post-Processing Async Dispatch
handleProcessingSuccess()into a newdispatchPostProcessing()method that runs in a background goroutinepostProcessWG(WaitGroup) to track in-flight post-processing goroutines for graceful shutdownpostProcessSem(buffered channel) to bound concurrent post-processing to 32 goroutines, preventing runaway growth during bulk importsStop()to wait for in-flight post-processing with a 15-second timeout before shutdownBatch Queue Inserts
AddBatchToQueue()method toqueueAdapterForScannerto insert multiple items in a single database transactionQueueBatchItemstruct in the scanner package to describe pending queue insertionsDirectoryScanner.performScan()to accumulate discovered files in a pending batch (up to 100 items) before flushing to the databaseCode Organization
buildQueueItem()helper method to reduce duplication between single and batch insert pathsImplementation Details
s.ctx) rather than per-worker context, ensuring work survives worker pool resizing but is cancelled on full service shutdownhttps://claude.ai/code/session_01RK87AYKMQKBxk1k9cWafDC