Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions internal/importer/scanner/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,28 @@ import (
// QueueAdder defines the interface for adding items to the queue.
// It is consumed by the directory scanner to enqueue discovered NZB/STRM
// files and to skip files that are already queued or imported.
type QueueAdder interface {
	// AddToQueue inserts a single pending item for the given file path.
	// relativePath and metadata are optional (nil when absent).
	AddToQueue(ctx context.Context, filePath string, relativePath *string, metadata *string) error
	// AddBatchToQueue inserts multiple pending items in a single DB transaction.
	// Used by the directory scanner to amortise per-file insert overhead.
	AddBatchToQueue(ctx context.Context, items []QueueBatchItem) error
	// IsFileInQueue reports whether filePath is already present in the queue.
	IsFileInQueue(ctx context.Context, filePath string) bool
	// IsFileProcessed reports whether the file at filePath (found under
	// scanRoot) has already been processed; the scanner skips such files.
	IsFileProcessed(filePath string, scanRoot string) bool
}

// QueueBatchItem describes a pending queue insertion discovered during a scan.
type QueueBatchItem struct {
	// FilePath is the path of the discovered file, as produced by the walk.
	FilePath string
	// RelativePath is optional; the directory scanner sets it to the scan
	// root path. NOTE(review): the name suggests a path relative to the
	// root rather than the root itself — confirm against the repository's
	// interpretation.
	RelativePath *string
	// Metadata is optional serialized metadata for the queue item (nil when
	// the scanner has none).
	Metadata *string
}

// defaultMaxScanDepth prevents runaway traversal of deep or cyclically-linked trees.
// Presumably enforced during the directory walk in performScan — confirm where the
// depth comparison happens, as it is not visible in this chunk.
const defaultMaxScanDepth = 10

// scanBatchSize is the number of discovered files accumulated before flushing
// to the database as a single batch insert. Bigger batches improve throughput
// but hold the transaction longer and delay visibility to queue workers.
const scanBatchSize = 100

// DirectoryScanner handles manual directory scanning for NZB/STRM files
type DirectoryScanner struct {
queueAdder QueueAdder
Expand Down Expand Up @@ -136,6 +151,22 @@ func (d *DirectoryScanner) performScan(ctx context.Context, scanPath string) {

d.log.DebugContext(ctx, "Scanning directory for NZB files", "dir", scanPath)

pending := make([]QueueBatchItem, 0, scanBatchSize)
flush := func() {
if len(pending) == 0 {
return
}
if err := d.queueAdder.AddBatchToQueue(ctx, pending); err != nil {
d.log.ErrorContext(ctx, "Failed to add batch to queue during scan",
"batch_size", len(pending), "error", err)
} else {
d.mu.Lock()
d.info.FilesAdded += len(pending)
d.mu.Unlock()
}
pending = pending[:0]
}

err := filepath.WalkDir(scanPath, func(path string, entry fs.DirEntry, err error) error {
// Check for cancellation
select {
Expand Down Expand Up @@ -186,21 +217,23 @@ func (d *DirectoryScanner) performScan(ctx context.Context, scanPath string) {
}

if d.queueAdder.IsFileProcessed(path, scanPath) {
d.log.DebugContext(ctx, "Skipping file - already processed", "file", path)
return nil
}

if err := d.queueAdder.AddToQueue(ctx, path, &scanPath, nil); err != nil {
d.log.ErrorContext(ctx, "Failed to add file to queue during scan", "file", path, "error", err)
pending = append(pending, QueueBatchItem{
FilePath: path,
RelativePath: &scanPath,
})
if len(pending) >= scanBatchSize {
flush()
}

d.mu.Lock()
d.info.FilesAdded++
d.mu.Unlock()

return nil
})

// Flush any remaining items accumulated before walk end or early return.
flush()

if err != nil && !strings.Contains(err.Error(), "scan cancelled") {
d.log.ErrorContext(ctx, "Failed to scan directory", "dir", scanPath, "error", err)
d.mu.Lock()
Expand Down
149 changes: 116 additions & 33 deletions internal/importer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,29 @@ type queueAdapterForScanner struct {
}

// AddToQueue builds a queue item for filePath and hands it straight to the
// repository for insertion.
func (a *queueAdapterForScanner) AddToQueue(ctx context.Context, filePath string, relativePath *string, metadata *string) error {
	return a.repo.AddToQueue(ctx, a.buildQueueItem(filePath, relativePath, metadata))
}

// AddBatchToQueue converts the scanner's batch entries into repository queue
// items and inserts them in a single repository call. An empty batch is a
// no-op and returns nil without touching the database.
func (a *queueAdapterForScanner) AddBatchToQueue(ctx context.Context, items []scanner.QueueBatchItem) error {
	if len(items) == 0 {
		return nil
	}
	rows := make([]*database.ImportQueueItem, len(items))
	for i := range items {
		rows[i] = a.buildQueueItem(items[i].FilePath, items[i].RelativePath, items[i].Metadata)
	}
	return a.repo.AddBatchToQueue(ctx, rows)
}

func (a *queueAdapterForScanner) buildQueueItem(filePath string, relativePath *string, metadata *string) *database.ImportQueueItem {
// Calculate file size before adding to queue
var fileSize *int64
if size, err := a.calcFileSize(filePath); err == nil {
fileSize = &size
}

item := &database.ImportQueueItem{
return &database.ImportQueueItem{
DownloadID: nil, // Generated later in service if needed
NzbPath: filePath,
RelativePath: relativePath,
Expand All @@ -81,8 +97,6 @@ func (a *queueAdapterForScanner) AddToQueue(ctx context.Context, filePath string
Metadata: metadata,
CreatedAt: time.Now(),
}

return a.repo.AddToQueue(ctx, item)
}

func (a *queueAdapterForScanner) IsFileInQueue(ctx context.Context, filePath string) bool {
Expand Down Expand Up @@ -205,8 +219,20 @@ type Service struct {
// HandleFailure can clean them up without changing the ItemProcessor interface.
// Keys are item.ID (int64), values are []string.
writtenPathsCache sync.Map

// postProcessWG tracks in-flight async post-processing goroutines so Stop()
// can wait for them to complete gracefully.
postProcessWG sync.WaitGroup

// postProcessSem bounds the number of concurrent post-processing goroutines
// to avoid runaway goroutine creation when many items complete in a burst.
postProcessSem chan struct{}
}

// maxConcurrentPostProcess caps concurrent post-processing goroutines to prevent
// unbounded growth during large import bursts (NzbDav migrations, bulk scans).
// When every slot is occupied, dispatchPostProcessing blocks the calling queue
// worker until a slot frees up or the service context is cancelled.
const maxConcurrentPostProcess = 32

// NewService creates a new NZB import service with manual scanning and queue processing capabilities
func NewService(config ServiceConfig, metadataService *metadata.MetadataService, database *database.DB, poolManager pool.Manager, rcloneClient rclonecli.RcloneRcClient, configGetter config.ConfigGetter, healthRepo *database.HealthRepository, broadcaster *progress.ProgressBroadcaster, userRepo *database.UserRepository) (*Service, error) {
// Set defaults
Expand Down Expand Up @@ -245,6 +271,7 @@ func NewService(config ServiceConfig, metadataService *metadata.MetadataService,
cancel: cancel,
cancelFuncs: make(map[int64]context.CancelFunc),
paused: false,
postProcessSem: make(chan struct{}, maxConcurrentPostProcess),
}

// Set recorder for processor
Expand Down Expand Up @@ -427,6 +454,21 @@ func (s *Service) Stop(ctx context.Context) error {
// Stop directory watcher
s.watcher.Stop()

// Wait for in-flight async post-processing to finish with a bounded timeout
// so shutdown doesn't hang on stuck external calls (ARR, rclone).
postProcDone := make(chan struct{})
go func() {
s.postProcessWG.Wait()
close(postProcDone)
}()
select {
case <-postProcDone:
case <-time.After(15 * time.Second):
s.log.WarnContext(ctx, "Timeout waiting for post-processing to finish")
case <-ctx.Done():
s.log.WarnContext(ctx, "Context cancelled while waiting for post-processing")
}

// Cancel service context
s.cancel()

Expand Down Expand Up @@ -974,35 +1016,21 @@ func (s *Service) resolveCategoryPath(category string) string {
return category
}

// handleProcessingSuccess handles all steps after successful NZB processing
// handleProcessingSuccess persists the successful import result and dispatches
// post-processing (VFS notify, symlinks, STRM, health checks, ARR notifications,
// NZB cleanup) to a background goroutine. Returning quickly lets the worker
// claim the next queue item instead of blocking on the ~1s VFS propagation
// delay and downstream network I/O.
func (s *Service) handleProcessingSuccess(ctx context.Context, item *database.ImportQueueItem, resultingPath string) error {
// Add storage path to database
if err := s.database.Repository.AddStoragePath(ctx, item.ID, resultingPath); err != nil {
s.log.ErrorContext(ctx, "Failed to add storage path", "queue_id", item.ID, "error", err)
return err
}

// Refresh mount path if needed before post-processing
s.postProcessor.RefreshMountPathIfNeeded(ctx, resultingPath, item.ID)

// Delegate all post-processing to the coordinator
// This handles: VFS notification, symlinks, ID links, STRM files, health checks, ARR notifications
result, err := s.postProcessor.HandleSuccess(ctx, item, resultingPath)
if err != nil {
s.log.ErrorContext(ctx, "Post-processing failed", "queue_id", item.ID, "error", err)
return err
}

// Log any non-fatal errors from post-processing
if len(result.Errors) > 0 {
for _, postErr := range result.Errors {
s.log.WarnContext(ctx, "Post-processing warning",
"queue_id", item.ID,
"error", postErr)
}
}

// Mark as completed in queue database
// Mark as completed in queue database before post-processing. The import
// itself has succeeded — downstream link/notification failures are logged
// as warnings and should not block completion or the next worker pickup.
if err := s.database.Repository.UpdateQueueItemStatus(ctx, item.ID, database.QueueStatusCompleted, nil); err != nil {
s.log.ErrorContext(ctx, "Failed to mark item as completed", "queue_id", item.ID, "error", err)
return err
Expand All @@ -1025,18 +1053,73 @@ func (s *Service) handleProcessingSuccess(ctx context.Context, item *database.Im

s.log.InfoContext(ctx, "Successfully processed queue item", "queue_id", item.ID, "file", item.NzbPath)

// Handle cleanup of completed NZB if configured
cfg := s.configGetter()
if cfg.Metadata.DeleteCompletedNzb != nil && *cfg.Metadata.DeleteCompletedNzb {
s.log.InfoContext(ctx, "Deleting completed NZB (per config)", "file", item.NzbPath)
if err := os.Remove(item.NzbPath); err != nil {
s.log.WarnContext(ctx, "Failed to delete completed NZB", "file", item.NzbPath, "error", err)
}
}
s.dispatchPostProcessing(item, resultingPath)

return nil
}

// dispatchPostProcessing hands all post-import work (VFS notify + propagation
// delay, symlinks, STRM, health checks, ARR notifications, NZB cleanup) to a
// background goroutine so the queue worker is freed immediately. Concurrency
// is bounded by postProcessSem, and Stop() waits on postProcessWG for
// in-flight work to drain.
func (s *Service) dispatchPostProcessing(item *database.ImportQueueItem, resultingPath string) {
	s.postProcessWG.Add(1)

	// Reserve a concurrency slot before spawning the goroutine. This is
	// best-effort: when the service context is already cancelled (shutdown),
	// post-processing for this item is skipped entirely.
	select {
	case <-s.ctx.Done():
		s.postProcessWG.Done()
		return
	case s.postProcessSem <- struct{}{}:
	}

	go func() {
		// LIFO defers: the slot is released first, then the WaitGroup is
		// signalled — same order as a single combined defer would give.
		defer s.postProcessWG.Done()
		defer func() { <-s.postProcessSem }()

		// Run against the service-level context so the work survives
		// per-worker context cancellation (e.g. worker resize) and is only
		// interrupted by a full service shutdown.
		ctx := s.ctx

		// Refresh mount path if needed before post-processing.
		s.postProcessor.RefreshMountPathIfNeeded(ctx, resultingPath, item.ID)

		// The coordinator handles VFS notification, symlinks, ID links,
		// STRM files, health checks and ARR notifications.
		res, err := s.postProcessor.HandleSuccess(ctx, item, resultingPath)
		switch {
		case err != nil && ctx.Err() != nil:
			// Shutdown — not a real failure.
			return
		case err != nil:
			s.log.ErrorContext(ctx, "Post-processing failed", "queue_id", item.ID, "error", err)
			return
		}

		// Surface non-fatal post-processing problems as warnings.
		if res != nil {
			for _, postErr := range res.Errors {
				s.log.WarnContext(ctx, "Post-processing warning",
					"queue_id", item.ID,
					"error", postErr)
			}
		}

		// Optionally remove the source NZB once the import is complete.
		cfg := s.configGetter()
		if cfg != nil && cfg.Metadata.DeleteCompletedNzb != nil && *cfg.Metadata.DeleteCompletedNzb {
			s.log.InfoContext(ctx, "Deleting completed NZB (per config)", "file", item.NzbPath)
			if err := os.Remove(item.NzbPath); err != nil {
				s.log.WarnContext(ctx, "Failed to delete completed NZB", "file", item.NzbPath, "error", err)
			}
		}
	}()
}

// OnItemClaimed implements queue.QueueEventListener. It broadcasts a queue-changed
// notification whenever a worker claims a pending item (pending → processing transition).
func (s *Service) OnItemClaimed(ctx context.Context, item *database.ImportQueueItem) {
Expand Down
Loading