14 changes: 14 additions & 0 deletions frontend/src/components/queue/ImportMethods.tsx
@@ -8,6 +8,7 @@ import {
FileIcon,
FileText,
FolderOpen,
+ Info,
Link,
Play,
Search,
@@ -777,6 +778,19 @@ function NzbDavImportSection()

return (
<div className="space-y-8">
<div className="rounded-xl border border-info/30 bg-info/5 p-4">
<div className="mb-3 flex items-center gap-2">
<Info className="h-4 w-4 text-info" />
<h4 className="font-bold text-sm">Migration Steps</h4>
</div>
<ol className="list-decimal space-y-1.5 pl-5 text-sm">
<li>Import the files</li>
<li className="font-semibold text-warning">Backup library symlinks (very important)</li>
<li>Make sure AltMount mount is there</li>
<li>Run the symlink migration</li>
</ol>
</div>

<div role="tablist" className="tabs tabs-boxed mb-2 max-w-md">
<button
type="button"
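Step 2 in the Migration Steps box above is the one with no undo: once the symlink migration rewrites library links, the original targets are gone unless they were recorded first. Below is a minimal sketch of one way to snapshot them before migrating; the helper is hypothetical (not part of this PR), and the library root and output filename are placeholder choices.

```go
// snapshot_symlinks.go — hypothetical pre-migration helper, not part of this PR.
package main

import (
	"encoding/json"
	"io/fs"
	"log"
	"os"
	"path/filepath"
)

func main() {
	root := "/data/library" // placeholder: root of the *arr-managed library
	backup := map[string]string{}

	err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.Type()&fs.ModeSymlink != 0 {
			target, rerr := os.Readlink(path)
			if rerr != nil {
				return rerr
			}
			backup[path] = target // record link -> target for later restore
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}

	out, _ := json.MarshalIndent(backup, "", "  ")
	if err := os.WriteFile("symlink-backup.json", out, 0o644); err != nil {
		log.Fatal(err)
	}
}
```

Restoring is then just a matter of iterating the saved map and recreating each link with os.Symlink.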
@@ -0,0 +1,9 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE import_queue ADD COLUMN skip_post_import_links BOOLEAN NOT NULL DEFAULT FALSE;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
ALTER TABLE import_queue DROP COLUMN IF EXISTS skip_post_import_links;
-- +goose StatementEnd
@@ -0,0 +1,9 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE import_queue ADD COLUMN skip_post_import_links BOOLEAN NOT NULL DEFAULT FALSE;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
-- SQLite does not support DROP COLUMN in older versions; intentional no-op
-- +goose StatementEnd
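Both blocks above follow the standard goose layout: the Up block adds the column on each dialect, while only Postgres gets a real Down, since DROP COLUMN only reached SQLite in 3.35 and older embedded builds lack it. One side effect of the SQLite no-op Down is that a down followed by up will fail with a duplicate-column error, because the column survives the rollback while goose's recorded version does not. A hedged sketch of how migrations like these are typically applied through goose's Go API; the migrations directory and DB path are assumptions, not taken from this PR.

```go
// Hypothetical bootstrap for applying the goose migrations above.
package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3" // driver choice is an assumption
	"github.com/pressly/goose/v3"
)

func main() {
	db, err := sql.Open("sqlite3", "altmount.db") // placeholder DB path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := goose.SetDialect("sqlite3"); err != nil {
		log.Fatal(err)
	}
	// Runs every pending "-- +goose Up" block in version order and records
	// each applied version in goose's bookkeeping table.
	if err := goose.Up(db, "internal/database/migrations"); err != nil {
		log.Fatal(err)
	}
}
```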
1 change: 1 addition & 0 deletions internal/database/models.go
@@ -47,6 +47,7 @@ type ImportQueueItem struct {
FileSize *int64 `db:"file_size"` // Total size in bytes calculated from segments
TargetPath *string `db:"target_path"` // Optional forced symlink destination path
SkipArrNotification bool `db:"skip_arr_notification"`
+ SkipPostImportLinks bool `db:"skip_post_import_links"`
}

// BulkOperationResult represents the result of a bulk queue operation
28 changes: 14 additions & 14 deletions internal/database/queue_repository.go
@@ -109,8 +109,8 @@ func (r *QueueRepository) RestartQueueItemsBulk(ctx context.Context, ids []int64
// AddToQueue adds a new NZB file to the import queue
func (r *QueueRepository) AddToQueue(ctx context.Context, item *ImportQueueItem) error {
query := `
- INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, skip_arr_notification, created_at, updated_at)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
+ INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, skip_arr_notification, skip_post_import_links, created_at, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
ON CONFLICT(nzb_path) DO UPDATE SET
download_id = COALESCE(excluded.download_id, import_queue.download_id),
priority = CASE WHEN excluded.priority < priority THEN excluded.priority ELSE priority END,
@@ -128,7 +128,7 @@ func (r *QueueRepository) AddToQueue(ctx context.Context, item *ImportQueueItem)
`

args := []any{item.DownloadID, item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status,
- item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.TargetPath, item.SkipArrNotification}
+ item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.TargetPath, item.SkipArrNotification, item.SkipPostImportLinks}

if r.dialect.IsPostgres() {
err := r.db.QueryRowContext(ctx, query+" RETURNING id", args...).Scan(&item.ID)
@@ -231,7 +231,7 @@ func (r *QueueRepository) ClaimNextQueueItem(ctx context.Context) (*ImportQueueI
// Get the complete claimed item data
getQuery := `
SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at,
- started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, target_path, skip_arr_notification
+ started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, target_path, skip_arr_notification, skip_post_import_links
FROM import_queue
WHERE id = ?
`
@@ -240,7 +240,7 @@
err = txRepo.db.QueryRowContext(ctx, getQuery, itemID).Scan(
&item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status,
&item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt,
- &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.TargetPath, &item.SkipArrNotification,
+ &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.TargetPath, &item.SkipArrNotification, &item.SkipPostImportLinks,
)
if err != nil {
return fmt.Errorf("failed to get claimed item: %w", err)
@@ -493,15 +493,15 @@ func (r *QueueRepository) UpdateQueueItemNzbPath(ctx context.Context, id int64,
func (r *QueueRepository) GetQueueItemByNzbPath(ctx context.Context, nzbPath string) (*ImportQueueItem, error) {
query := `
SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at,
- started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification
+ started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification, skip_post_import_links
FROM import_queue WHERE nzb_path = ? LIMIT 1
`

var item ImportQueueItem
err := r.db.QueryRowContext(ctx, query, nzbPath).Scan(
&item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status,
&item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt,
- &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification,
+ &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification, &item.SkipPostImportLinks,
)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
@@ -580,8 +580,8 @@ func (r *QueueRepository) AddBatchToQueue(ctx context.Context, items []*ImportQu
return r.withQueueTransaction(ctx, func(txRepo *QueueRepository) error {
// Prepare batch insert statement
query := `
- INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, skip_arr_notification, created_at, updated_at)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
+ INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, skip_arr_notification, skip_post_import_links, created_at, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
ON CONFLICT(nzb_path) DO UPDATE SET
download_id = COALESCE(excluded.download_id, import_queue.download_id),
priority = CASE WHEN excluded.priority < priority THEN excluded.priority ELSE priority END,
@@ -596,7 +596,7 @@
now := time.Now()
for _, item := range items {
args := []any{item.DownloadID, item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status,
- item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.SkipArrNotification}
+ item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.SkipArrNotification, item.SkipPostImportLinks}

if txRepo.dialect.IsPostgres() {
err := txRepo.db.QueryRowContext(ctx, query+" RETURNING id", args...).Scan(&item.ID)
@@ -625,15 +625,15 @@
func (r *QueueRepository) GetQueueItem(ctx context.Context, id int64) (*ImportQueueItem, error) {
query := `
SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at,
- started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification
+ started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification, skip_post_import_links
FROM import_queue WHERE id = ?
`

var item ImportQueueItem
err := r.db.QueryRowContext(ctx, query, id).Scan(
&item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status,
&item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt,
- &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification,
+ &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification, &item.SkipPostImportLinks,
)
if err != nil {
if err == sql.ErrNoRows {
@@ -683,7 +683,7 @@ func (r *QueueRepository) DeleteFailedItemsOlderThan(ctx context.Context, olderT
err := r.withQueueTransaction(ctx, func(txRepo *QueueRepository) error {
// Select failed items older than the threshold
selectQuery := `SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at,
- started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification
+ started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification, skip_post_import_links
FROM import_queue WHERE status = 'failed' AND updated_at < ?`

rows, err := txRepo.db.QueryContext(ctx, selectQuery, olderThan)
@@ -697,7 +697,7 @@
if err := rows.Scan(
&item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status,
&item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt,
- &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification,
+ &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification, &item.SkipPostImportLinks,
); err != nil {
return fmt.Errorf("failed to scan failed queue item: %w", err)
}
1 change: 1 addition & 0 deletions internal/database/testing.go
@@ -33,6 +33,7 @@ func setupQueueSchema(t *testing.T, db *sql.DB) {
file_size BIGINT DEFAULT NULL,
target_path TEXT DEFAULT NULL,
skip_arr_notification BOOLEAN NOT NULL DEFAULT FALSE,
+ skip_post_import_links BOOLEAN NOT NULL DEFAULT FALSE,
UNIQUE(nzb_path)
);

47 changes: 29 additions & 18 deletions internal/importer/postprocessor/coordinator.go
@@ -98,26 +98,31 @@ func (c *Coordinator) HandleSuccess(ctx context.Context, item *database.ImportQu
// Continue
}

- // 2. Create symlinks if configured
- if err := c.CreateSymlinks(ctx, item, resultingPath); err != nil {
- 	c.log.WarnContext(ctx, "Failed to create symlinks",
+ // 2 & 3. Create symlinks and STRM files if configured
+ if shouldSkipPostImportLinks(item) {
+ 	c.log.DebugContext(ctx, "Skipping symlink/STRM creation (post-import links disabled)",
  		"queue_id", item.ID,
- 		"path", resultingPath,
- 		"error", err)
- 	result.Errors = append(result.Errors, err)
- } else {
- 	result.SymlinksCreated = true
- }
-
- // 3. Create STRM files if configured
- if err := c.CreateStrmFiles(ctx, item, resultingPath); err != nil {
- 	c.log.WarnContext(ctx, "Failed to create STRM files",
- 		"queue_id", item.ID,
- 		"path", resultingPath,
- 		"error", err)
- 	result.Errors = append(result.Errors, err)
+ 		"path", resultingPath)
  } else {
- 	result.StrmCreated = true
+ 	if err := c.CreateSymlinks(ctx, item, resultingPath); err != nil {
+ 		c.log.WarnContext(ctx, "Failed to create symlinks",
+ 			"queue_id", item.ID,
+ 			"path", resultingPath,
+ 			"error", err)
+ 		result.Errors = append(result.Errors, err)
+ 	} else {
+ 		result.SymlinksCreated = true
+ 	}
+
+ 	if err := c.CreateStrmFiles(ctx, item, resultingPath); err != nil {
+ 		c.log.WarnContext(ctx, "Failed to create STRM files",
+ 			"queue_id", item.ID,
+ 			"path", resultingPath,
+ 			"error", err)
+ 		result.Errors = append(result.Errors, err)
+ 	} else {
+ 		result.StrmCreated = true
+ 	}
  }

// 4. Schedule health check
@@ -164,3 +164,9 @@ func (c *Coordinator) HandleFailure(ctx context.Context, item *database.ImportQu
func shouldSkipARRNotification(item *database.ImportQueueItem) bool {
return item.SkipArrNotification
}

+ // shouldSkipPostImportLinks returns true when the caller explicitly requested
+ // that post-import link creation (symlinks, STRM files) be suppressed.
+ func shouldSkipPostImportLinks(item *database.ImportQueueItem) bool {
+ 	return item != nil && item.SkipPostImportLinks
+ }
26 changes: 26 additions & 0 deletions internal/importer/postprocessor/coordinator_skip_links_test.go
@@ -0,0 +1,26 @@
package postprocessor

import (
"testing"

"github.com/javi11/altmount/internal/database"
)

func TestShouldSkipPostImportLinks(t *testing.T) {
tests := []struct {
name string
item *database.ImportQueueItem
want bool
}{
{"nil item → do not skip", nil, false},
{"flag false → do not skip", &database.ImportQueueItem{SkipPostImportLinks: false}, false},
{"flag true → skip", &database.ImportQueueItem{SkipPostImportLinks: true}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := shouldSkipPostImportLinks(tt.item); got != tt.want {
t.Errorf("shouldSkipPostImportLinks() = %v, want %v", got, tt.want)
}
})
}
}
6 changes: 4 additions & 2 deletions internal/importer/scanner/nzbdav.go
@@ -497,8 +497,9 @@ func (n *NzbDavImporter) createNzbFileAndPrepareItem(ctx context.Context, res *n

// Prepare item struct. RelativePath is left nil so the import mirrors the
// nzbdav folder structure under Category without an extra user-supplied prefix.
- // SkipArrNotification is true because nzbdav imports are migration jobs — ARR
- // scans should not be triggered for each individual item.
+ // Migration jobs: skip ARR scans (per-item notifications are noisy) and skip
+ // post-import link creation (symlinks/STRM). Library symlinks are rewritten
+ // separately by Phase 2 (RewriteLibrarySymlinks).
item := &database.ImportQueueItem{
NzbPath: nzbPath,
Category: &targetCategory,
@@ -509,6 +510,7 @@ func (n *NzbDavImporter) createNzbFileAndPrepareItem(ctx context.Context, res *n
CreatedAt: time.Now(),
Metadata: &metaJSON,
SkipArrNotification: true,
+ SkipPostImportLinks: true,
}

return item, nil
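The nzbdav comment above defers library links to Phase 2 (RewriteLibrarySymlinks), whose implementation is outside this diff; this is also why the item sets SkipPostImportLinks, so the importer does not create fresh per-item links that Phase 2 would immediately have to rewrite. Conceptually, Phase 2 re-points each library symlink from the old nzbdav mount to the matching path under the AltMount mount. A rough sketch under that assumption, with invented mount paths and none of the real lookup or error handling:

```go
package migrate

import (
	"os"
	"path/filepath"
	"strings"
)

// rewriteSymlink re-points one library link from oldMount to newMount.
// Illustrative only; the real RewriteLibrarySymlinks is not shown in this PR.
func rewriteSymlink(linkPath, oldMount, newMount string) error {
	target, err := os.Readlink(linkPath)
	if err != nil {
		return err
	}
	if !strings.HasPrefix(target, oldMount) {
		return nil // not an nzbdav-backed link; leave it untouched
	}
	newTarget := filepath.Join(newMount, strings.TrimPrefix(target, oldMount))
	// A symlink cannot be retargeted in place: remove it, then recreate it.
	if err := os.Remove(linkPath); err != nil {
		return err
	}
	return os.Symlink(newTarget, linkPath)
}
```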
3 changes: 2 additions & 1 deletion internal/nzbdav/parser.go
@@ -253,7 +253,8 @@ func (p *Parser) parseBlobs(db *sql.DB, tree *davTree, out chan<- *ParsedNzb, er
parentPath := trimLastSegment(releaseParentPath)
category, relPath := p.splitPath(parentPath)

- blobPath := filepath.Join(p.blobsPath, blobId[0:2], blobId[2:4], blobId)
+ lowerBlobID := strings.ToLower(blobId)
+ blobPath := filepath.Join(p.blobsPath, lowerBlobID[0:2], lowerBlobID[2:4], lowerBlobID)
blobFile, err := os.Open(blobPath)
if err != nil {
slog.ErrorContext(context.Background(), "Failed to open blob file", "path", blobPath, "error", err)
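The one-line fix above exists because nzbdav's SQLite database stores blob GUIDs uppercase (the default EF Core TEXT formatting) while its C# BlobStore writes files under lowercase paths, so the unmodified join misses on case-sensitive filesystems. A self-contained illustration of the sharded layout being targeted; the sample GUID is the one from the test below.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// blobPath mirrors the lookup in parseBlobs: two-character shard directories
// derived from the lowercased blob ID.
func blobPath(blobsDir, blobID string) string {
	id := strings.ToLower(blobID) // on-disk layout is always lowercase
	return filepath.Join(blobsDir, id[0:2], id[2:4], id)
}

func main() {
	fmt.Println(blobPath("blobs", "0AA2BD24-B90C-4E06-A301-DD0D296AD86C"))
	// Output: blobs/0a/a2/0aa2bd24-b90c-4e06-a301-dd0d296ad86c
}
```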
62 changes: 62 additions & 0 deletions internal/nzbdav/parser_test.go
@@ -235,6 +235,68 @@ func TestParser_Parse_Blobs(t *testing.T) {
assert.Equal(t, nzbContent, string(body))
}

// TestParser_Parse_Blobs_UppercaseUUID verifies that blob IDs stored uppercase in
// the SQLite database (real nzbdav format, e.g. "0AA2BD24-B90C-4E06-A301-DD0D296AD86C")
// are matched against their lowercase on-disk layout, since nzbdav's C# BlobStore
// writes paths using Guid.ToString(), which is always lowercase (hyphens retained).
func TestParser_Parse_Blobs_UppercaseUUID(t *testing.T) {
tmpDir := t.TempDir()
blobsDir := filepath.Join(tmpDir, "blobs")
dbPath := filepath.Join(tmpDir, "blobs_upper.db")
db, err := sql.Open("sqlite3", dbPath)
require.NoError(t, err)
defer db.Close()

_, err = db.Exec(`
CREATE TABLE DavItems (
Id TEXT PRIMARY KEY,
ParentId TEXT,
Name TEXT,
FileSize INTEGER,
Path TEXT,
NzbBlobId TEXT,
SubType INTEGER
);
CREATE TABLE NzbNames (
Id TEXT PRIMARY KEY,
FileName TEXT
);
`)
require.NoError(t, err)

// DB stores the UUID uppercase with hyphens (default EF Core Guid TEXT format).
dbBlobID := "0AA2BD24-B90C-4E06-A301-DD0D296AD86C"
// Disk stores it lowercase (Guid.ToString(), the default "D" format, keeps hyphens).
diskBlobID := "0aa2bd24-b90c-4e06-a301-dd0d296ad86c"
blobPath := filepath.Join(blobsDir, diskBlobID[0:2], diskBlobID[2:4], diskBlobID)
nzbContent := `<?xml version="1.0" encoding="UTF-8"?>
<nzb xmlns="http://www.newzbin.com/DTD/nzb/nzb-1.1.dtd">
<file poster="poster" date="12345" subject="subject">
<groups><group>alt.binaries.test</group></groups>
<segments><segment bytes="100" number="1">msgid@test</segment></segments>
</file>
</nzb>`
writeZstdBlob(t, blobPath, []byte(nzbContent))

_, err = db.Exec(`
INSERT INTO NzbNames (Id, FileName) VALUES (?, 'My Movie.nzb');
INSERT INTO DavItems (Id, ParentId, Name, Path, NzbBlobId, SubType) VALUES
('root', NULL, '/', '/', NULL, 1),
('movies', 'root', 'movies', '/movies', NULL, 1),
('folder', 'movies', 'My Movie', '/movies/My Movie', NULL, 1),
('item1', 'folder', 'My Movie.mkv', '/movies/My Movie/My Movie.mkv', ?, 203);
`, dbBlobID, dbBlobID)
require.NoError(t, err)

out, errChan := NewParser(dbPath, blobsDir).Parse()
got := collect(t, out, errChan)

require.Len(t, got, 1)
assert.Equal(t, "item1", got[0].ID)
body, _ := io.ReadAll(got[0].Content)
assert.Equal(t, nzbContent, string(body))
}

func TestParser_Parse_Blobs_Uncompressed(t *testing.T) {
tmpDir := t.TempDir()
blobsDir := filepath.Join(tmpDir, "blobs")