diff --git a/internal/importer/scanner/directory.go b/internal/importer/scanner/directory.go index 50b5a1e4..892c50d1 100644 --- a/internal/importer/scanner/directory.go +++ b/internal/importer/scanner/directory.go @@ -14,13 +14,28 @@ import ( // QueueAdder defines the interface for adding items to the queue type QueueAdder interface { AddToQueue(ctx context.Context, filePath string, relativePath *string, metadata *string) error + // AddBatchToQueue inserts multiple pending items in a single DB transaction. + // Used by the directory scanner to amortise per-file insert overhead. + AddBatchToQueue(ctx context.Context, items []QueueBatchItem) error IsFileInQueue(ctx context.Context, filePath string) bool IsFileProcessed(filePath string, scanRoot string) bool } +// QueueBatchItem describes a pending queue insertion discovered during a scan. +type QueueBatchItem struct { + FilePath string + RelativePath *string + Metadata *string +} + // defaultMaxScanDepth prevents runaway traversal of deep or cyclically-linked trees. const defaultMaxScanDepth = 10 +// scanBatchSize is the number of discovered files accumulated before flushing +// to the database as a single batch insert. Bigger batches improve throughput +// but hold the transaction longer and delay visibility to queue workers. +const scanBatchSize = 100 + // DirectoryScanner handles manual directory scanning for NZB/STRM files type DirectoryScanner struct { queueAdder QueueAdder @@ -136,6 +151,22 @@ func (d *DirectoryScanner) performScan(ctx context.Context, scanPath string) { d.log.DebugContext(ctx, "Scanning directory for NZB files", "dir", scanPath) + pending := make([]QueueBatchItem, 0, scanBatchSize) + flush := func() { + if len(pending) == 0 { + return + } + if err := d.queueAdder.AddBatchToQueue(ctx, pending); err != nil { + d.log.ErrorContext(ctx, "Failed to add batch to queue during scan", + "batch_size", len(pending), "error", err) + } else { + d.mu.Lock() + d.info.FilesAdded += len(pending) + d.mu.Unlock() + } + pending = pending[:0] + } + err := filepath.WalkDir(scanPath, func(path string, entry fs.DirEntry, err error) error { // Check for cancellation select { @@ -186,21 +217,23 @@ func (d *DirectoryScanner) performScan(ctx context.Context, scanPath string) { } if d.queueAdder.IsFileProcessed(path, scanPath) { - d.log.DebugContext(ctx, "Skipping file - already processed", "file", path) return nil } - if err := d.queueAdder.AddToQueue(ctx, path, &scanPath, nil); err != nil { - d.log.ErrorContext(ctx, "Failed to add file to queue during scan", "file", path, "error", err) + pending = append(pending, QueueBatchItem{ + FilePath: path, + RelativePath: &scanPath, + }) + if len(pending) >= scanBatchSize { + flush() } - d.mu.Lock() - d.info.FilesAdded++ - d.mu.Unlock() - return nil }) + // Flush any remaining items accumulated before walk end or early return. + flush() + if err != nil && !strings.Contains(err.Error(), "scan cancelled") { d.log.ErrorContext(ctx, "Failed to scan directory", "dir", scanPath, "error", err) d.mu.Lock() diff --git a/internal/importer/service.go b/internal/importer/service.go index 63020f41..906961e8 100644 --- a/internal/importer/service.go +++ b/internal/importer/service.go @@ -63,13 +63,29 @@ type queueAdapterForScanner struct { } func (a *queueAdapterForScanner) AddToQueue(ctx context.Context, filePath string, relativePath *string, metadata *string) error { + item := a.buildQueueItem(filePath, relativePath, metadata) + return a.repo.AddToQueue(ctx, item) +} + +func (a *queueAdapterForScanner) AddBatchToQueue(ctx context.Context, items []scanner.QueueBatchItem) error { + if len(items) == 0 { + return nil + } + queueItems := make([]*database.ImportQueueItem, 0, len(items)) + for _, it := range items { + queueItems = append(queueItems, a.buildQueueItem(it.FilePath, it.RelativePath, it.Metadata)) + } + return a.repo.AddBatchToQueue(ctx, queueItems) +} + +func (a *queueAdapterForScanner) buildQueueItem(filePath string, relativePath *string, metadata *string) *database.ImportQueueItem { // Calculate file size before adding to queue var fileSize *int64 if size, err := a.calcFileSize(filePath); err == nil { fileSize = &size } - item := &database.ImportQueueItem{ + return &database.ImportQueueItem{ DownloadID: nil, // Generated later in service if needed NzbPath: filePath, RelativePath: relativePath, @@ -81,8 +97,6 @@ func (a *queueAdapterForScanner) AddToQueue(ctx context.Context, filePath string Metadata: metadata, CreatedAt: time.Now(), } - - return a.repo.AddToQueue(ctx, item) } func (a *queueAdapterForScanner) IsFileInQueue(ctx context.Context, filePath string) bool { @@ -205,8 +219,20 @@ type Service struct { // HandleFailure can clean them up without changing the ItemProcessor interface. // Keys are item.ID (int64), values are []string. writtenPathsCache sync.Map + + // postProcessWG tracks in-flight async post-processing goroutines so Stop() + // can wait for them to complete gracefully. + postProcessWG sync.WaitGroup + + // postProcessSem bounds the number of concurrent post-processing goroutines + // to avoid runaway goroutine creation when many items complete in a burst. + postProcessSem chan struct{} } +// maxConcurrentPostProcess caps concurrent post-processing goroutines to prevent +// unbounded growth during large import bursts (NzbDav migrations, bulk scans). +const maxConcurrentPostProcess = 32 + // NewService creates a new NZB import service with manual scanning and queue processing capabilities func NewService(config ServiceConfig, metadataService *metadata.MetadataService, database *database.DB, poolManager pool.Manager, rcloneClient rclonecli.RcloneRcClient, configGetter config.ConfigGetter, healthRepo *database.HealthRepository, broadcaster *progress.ProgressBroadcaster, userRepo *database.UserRepository) (*Service, error) { // Set defaults @@ -245,6 +271,7 @@ func NewService(config ServiceConfig, metadataService *metadata.MetadataService, cancel: cancel, cancelFuncs: make(map[int64]context.CancelFunc), paused: false, + postProcessSem: make(chan struct{}, maxConcurrentPostProcess), } // Set recorder for processor @@ -427,6 +454,21 @@ func (s *Service) Stop(ctx context.Context) error { // Stop directory watcher s.watcher.Stop() + // Wait for in-flight async post-processing to finish with a bounded timeout + // so shutdown doesn't hang on stuck external calls (ARR, rclone). + postProcDone := make(chan struct{}) + go func() { + s.postProcessWG.Wait() + close(postProcDone) + }() + select { + case <-postProcDone: + case <-time.After(15 * time.Second): + s.log.WarnContext(ctx, "Timeout waiting for post-processing to finish") + case <-ctx.Done(): + s.log.WarnContext(ctx, "Context cancelled while waiting for post-processing") + } + // Cancel service context s.cancel() @@ -974,7 +1016,11 @@ func (s *Service) resolveCategoryPath(category string) string { return category } -// handleProcessingSuccess handles all steps after successful NZB processing +// handleProcessingSuccess persists the successful import result and dispatches +// post-processing (VFS notify, symlinks, STRM, health checks, ARR notifications, +// NZB cleanup) to a background goroutine. Returning quickly lets the worker +// claim the next queue item instead of blocking on the ~1s VFS propagation +// delay and downstream network I/O. func (s *Service) handleProcessingSuccess(ctx context.Context, item *database.ImportQueueItem, resultingPath string) error { // Add storage path to database if err := s.database.Repository.AddStoragePath(ctx, item.ID, resultingPath); err != nil { @@ -982,27 +1028,9 @@ func (s *Service) handleProcessingSuccess(ctx context.Context, item *database.Im return err } - // Refresh mount path if needed before post-processing - s.postProcessor.RefreshMountPathIfNeeded(ctx, resultingPath, item.ID) - - // Delegate all post-processing to the coordinator - // This handles: VFS notification, symlinks, ID links, STRM files, health checks, ARR notifications - result, err := s.postProcessor.HandleSuccess(ctx, item, resultingPath) - if err != nil { - s.log.ErrorContext(ctx, "Post-processing failed", "queue_id", item.ID, "error", err) - return err - } - - // Log any non-fatal errors from post-processing - if len(result.Errors) > 0 { - for _, postErr := range result.Errors { - s.log.WarnContext(ctx, "Post-processing warning", - "queue_id", item.ID, - "error", postErr) - } - } - - // Mark as completed in queue database + // Mark as completed in queue database before post-processing. The import + // itself has succeeded — downstream link/notification failures are logged + // as warnings and should not block completion or the next worker pickup. if err := s.database.Repository.UpdateQueueItemStatus(ctx, item.ID, database.QueueStatusCompleted, nil); err != nil { s.log.ErrorContext(ctx, "Failed to mark item as completed", "queue_id", item.ID, "error", err) return err @@ -1025,18 +1053,73 @@ func (s *Service) handleProcessingSuccess(ctx context.Context, item *database.Im s.log.InfoContext(ctx, "Successfully processed queue item", "queue_id", item.ID, "file", item.NzbPath) - // Handle cleanup of completed NZB if configured - cfg := s.configGetter() - if cfg.Metadata.DeleteCompletedNzb != nil && *cfg.Metadata.DeleteCompletedNzb { - s.log.InfoContext(ctx, "Deleting completed NZB (per config)", "file", item.NzbPath) - if err := os.Remove(item.NzbPath); err != nil { - s.log.WarnContext(ctx, "Failed to delete completed NZB", "file", item.NzbPath, "error", err) - } - } + s.dispatchPostProcessing(item, resultingPath) return nil } +// dispatchPostProcessing runs all post-import work (VFS notify + propagation +// delay, symlinks, STRM, health checks, ARR notifications, NZB cleanup) in a +// background goroutine so the queue worker is freed immediately. Concurrency +// is bounded by postProcessSem and Stop() waits for in-flight work via +// postProcessWG. +func (s *Service) dispatchPostProcessing(item *database.ImportQueueItem, resultingPath string) { + s.postProcessWG.Add(1) + + // Acquire semaphore slot. This is best-effort: if the service context is + // cancelled (shutdown), skip post-processing entirely. + select { + case s.postProcessSem <- struct{}{}: + case <-s.ctx.Done(): + s.postProcessWG.Done() + return + } + + go func() { + defer func() { + <-s.postProcessSem + s.postProcessWG.Done() + }() + + // Use the service-level context so post-processing survives per-worker + // context cancellation (e.g. worker resize) and is only cancelled on + // full service shutdown. + ctx := s.ctx + + // Refresh mount path if needed before post-processing + s.postProcessor.RefreshMountPathIfNeeded(ctx, resultingPath, item.ID) + + // Delegate all post-processing to the coordinator + // This handles: VFS notification, symlinks, ID links, STRM files, health checks, ARR notifications + result, err := s.postProcessor.HandleSuccess(ctx, item, resultingPath) + if err != nil { + if ctx.Err() != nil { + // Shutdown — not a real failure + return + } + s.log.ErrorContext(ctx, "Post-processing failed", "queue_id", item.ID, "error", err) + return + } + + if result != nil && len(result.Errors) > 0 { + for _, postErr := range result.Errors { + s.log.WarnContext(ctx, "Post-processing warning", + "queue_id", item.ID, + "error", postErr) + } + } + + // Handle cleanup of completed NZB if configured + cfg := s.configGetter() + if cfg != nil && cfg.Metadata.DeleteCompletedNzb != nil && *cfg.Metadata.DeleteCompletedNzb { + s.log.InfoContext(ctx, "Deleting completed NZB (per config)", "file", item.NzbPath) + if err := os.Remove(item.NzbPath); err != nil { + s.log.WarnContext(ctx, "Failed to delete completed NZB", "file", item.NzbPath, "error", err) + } + } + }() +} + // OnItemClaimed implements queue.QueueEventListener. It broadcasts a queue-changed // notification whenever a worker claims a pending item (pending → processing transition). func (s *Service) OnItemClaimed(ctx context.Context, item *database.ImportQueueItem) {