From 714cdef1afa85c0f8e0f023313d7fd2745731c04 Mon Sep 17 00:00:00 2001 From: javi11 Date: Mon, 20 Apr 2026 10:25:57 +0200 Subject: [PATCH 1/6] feat(database): add import_migrations table and repository Adds a generic import_migrations table to track two-phase migration state (Phase 1 = import NZBs into AltMount; Phase 2 = rewrite arr library symlinks). Includes goose migrations for SQLite and PostgreSQL, ImportMigration model + status consts, a full ImportMigrationRepository with Upsert/MarkImported/MarkFailed/MarkSymlinksMigrated/LookupByExternalID/ ListByStatus/Stats/ExistsForSource/BackfillFromImportQueue, and wires MigrationRepo into the DB struct. --- internal/database/db.go | 5 +- .../database/import_migration_repository.go | 321 ++++++++++++++++++ .../postgres/024_import_migrations.sql | 25 ++ .../sqlite/024_import_migrations.sql | 25 ++ internal/database/models.go | 33 ++ 5 files changed, 408 insertions(+), 1 deletion(-) create mode 100644 internal/database/import_migration_repository.go create mode 100644 internal/database/migrations/postgres/024_import_migrations.sql create mode 100644 internal/database/migrations/sqlite/024_import_migrations.sql diff --git a/internal/database/db.go b/internal/database/db.go index b62ddbf22..69b804da5 100644 --- a/internal/database/db.go +++ b/internal/database/db.go @@ -19,7 +19,8 @@ type DB struct { conn *sql.DB dialect dialectHelper // Repository is kept for backwards-compat; prefer using Connection() directly. - Repository *QueueRepository + Repository *QueueRepository + MigrationRepo *ImportMigrationRepository } // Config holds database configuration. @@ -86,6 +87,7 @@ func newSQLiteDB(config Config) (*DB, error) { dh := dialectHelper{d: DialectSQLite} db := &DB{conn: conn, dialect: dh} db.Repository = NewQueueRepository(conn, DialectSQLite) + db.MigrationRepo = NewImportMigrationRepository(conn, DialectSQLite) return db, nil } @@ -114,6 +116,7 @@ func newPostgresDB(config Config) (*DB, error) { dh := dialectHelper{d: DialectPostgres} db := &DB{conn: conn, dialect: dh} db.Repository = NewQueueRepository(conn, DialectPostgres) + db.MigrationRepo = NewImportMigrationRepository(conn, DialectPostgres) return db, nil } diff --git a/internal/database/import_migration_repository.go b/internal/database/import_migration_repository.go new file mode 100644 index 000000000..63cf03902 --- /dev/null +++ b/internal/database/import_migration_repository.go @@ -0,0 +1,321 @@ +package database + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "strings" + "time" +) + +// ImportMigrationRepository handles database operations for import_migrations. +type ImportMigrationRepository struct { + db *dialectAwareDB + dialect dialectHelper +} + +// NewImportMigrationRepository creates a new ImportMigrationRepository. +func NewImportMigrationRepository(db *sql.DB, d Dialect) *ImportMigrationRepository { + return &ImportMigrationRepository{ + db: newDialectAwareDB(db, d), + dialect: dialectHelper{d: d}, + } +} + +// Upsert inserts or updates a migration row keyed by (source, external_id). +// Returns the row ID. +func (r *ImportMigrationRepository) Upsert(ctx context.Context, row *ImportMigration) (int64, error) { + query := ` + INSERT INTO import_migrations + (source, external_id, queue_item_id, relative_path, final_path, status, error, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) + ON CONFLICT(source, external_id) DO UPDATE SET + queue_item_id = COALESCE(excluded.queue_item_id, import_migrations.queue_item_id), + relative_path = excluded.relative_path, + final_path = COALESCE(excluded.final_path, import_migrations.final_path), + status = excluded.status, + error = excluded.error, + updated_at = datetime('now') + ` + args := []any{ + row.Source, row.ExternalID, row.QueueItemID, + row.RelativePath, row.FinalPath, string(row.Status), row.Error, + } + + if r.dialect.IsPostgres() { + var id int64 + err := r.db.QueryRowContext(ctx, query+" RETURNING id", args...).Scan(&id) + if err != nil && err != sql.ErrNoRows { + return 0, fmt.Errorf("upsert import_migration: %w", err) + } + return id, nil + } + + res, err := r.db.ExecContext(ctx, query, args...) + if err != nil { + return 0, fmt.Errorf("upsert import_migration: %w", err) + } + id, err := res.LastInsertId() + if err != nil { + return 0, fmt.Errorf("upsert import_migration last insert id: %w", err) + } + return id, nil +} + +// MarkImported sets status=imported and final_path for all rows matching queue_item_id. +func (r *ImportMigrationRepository) MarkImported(ctx context.Context, queueItemID int64, finalPath string) error { + query := ` + UPDATE import_migrations + SET status = 'imported', final_path = ?, updated_at = datetime('now') + WHERE queue_item_id = ? + ` + _, err := r.db.ExecContext(ctx, query, finalPath, queueItemID) + if err != nil { + return fmt.Errorf("mark import_migration imported (queue_item_id=%d): %w", queueItemID, err) + } + return nil +} + +// MarkFailed sets status=failed and error for all rows matching queue_item_id. +func (r *ImportMigrationRepository) MarkFailed(ctx context.Context, queueItemID int64, errMsg string) error { + query := ` + UPDATE import_migrations + SET status = 'failed', error = ?, updated_at = datetime('now') + WHERE queue_item_id = ? + ` + _, err := r.db.ExecContext(ctx, query, errMsg, queueItemID) + if err != nil { + return fmt.Errorf("mark import_migration failed (queue_item_id=%d): %w", queueItemID, err) + } + return nil +} + +// MarkSymlinksMigrated sets status=symlinks_migrated for the given row IDs. +func (r *ImportMigrationRepository) MarkSymlinksMigrated(ctx context.Context, ids []int64) error { + if len(ids) == 0 { + return nil + } + + placeholders := make([]string, len(ids)) + args := make([]any, len(ids)+1) + args[0] = time.Now() + for i, id := range ids { + placeholders[i] = "?" + args[i+1] = id + } + + query := fmt.Sprintf(` + UPDATE import_migrations + SET status = 'symlinks_migrated', updated_at = datetime('now') + WHERE id IN (%s) + `, strings.Join(placeholders, ", ")) + + _, err := r.db.ExecContext(ctx, query, args[1:]...) + if err != nil { + return fmt.Errorf("mark import_migrations symlinks_migrated: %w", err) + } + return nil +} + +// LookupByExternalID returns the migration row for (source, external_id), or nil if not found. +func (r *ImportMigrationRepository) LookupByExternalID(ctx context.Context, source, externalID string) (*ImportMigration, error) { + query := ` + SELECT id, source, external_id, queue_item_id, relative_path, final_path, status, error, created_at, updated_at + FROM import_migrations + WHERE source = ? AND external_id = ? + ` + row := r.db.QueryRowContext(ctx, query, source, externalID) + m, err := scanImportMigration(row) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("lookup import_migration (source=%s, external_id=%s): %w", source, externalID, err) + } + return m, nil +} + +// ListByStatus returns paginated rows for source with the given status. +func (r *ImportMigrationRepository) ListByStatus(ctx context.Context, source string, status ImportMigrationStatus, limit, offset int) ([]*ImportMigration, error) { + query := ` + SELECT id, source, external_id, queue_item_id, relative_path, final_path, status, error, created_at, updated_at + FROM import_migrations + WHERE source = ? AND status = ? + ORDER BY id ASC + LIMIT ? OFFSET ? + ` + rows, err := r.db.QueryContext(ctx, query, source, string(status), limit, offset) + if err != nil { + return nil, fmt.Errorf("list import_migrations by status: %w", err) + } + defer rows.Close() + + var result []*ImportMigration + for rows.Next() { + m, err := scanImportMigrationRow(rows) + if err != nil { + return nil, fmt.Errorf("scan import_migration: %w", err) + } + result = append(result, m) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate import_migrations: %w", err) + } + return result, nil +} + +// Stats returns aggregate counts for a source. +func (r *ImportMigrationRepository) Stats(ctx context.Context, source string) (*ImportMigrationStats, error) { + query := ` + SELECT + COUNT(*) AS total, + SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) AS pending, + SUM(CASE WHEN status = 'imported' THEN 1 ELSE 0 END) AS imported, + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failed, + SUM(CASE WHEN status = 'symlinks_migrated' THEN 1 ELSE 0 END) AS symlinks_migrated + FROM import_migrations + WHERE source = ? + ` + var stats ImportMigrationStats + err := r.db.QueryRowContext(ctx, query, source).Scan( + &stats.Total, + &stats.Pending, + &stats.Imported, + &stats.Failed, + &stats.SymlinksMigrated, + ) + if err != nil { + return nil, fmt.Errorf("stats import_migrations (source=%s): %w", source, err) + } + return &stats, nil +} + +// ExistsForSource returns true if any rows exist for the given source. +func (r *ImportMigrationRepository) ExistsForSource(ctx context.Context, source string) (bool, error) { + query := `SELECT COUNT(*) FROM import_migrations WHERE source = ? LIMIT 1` + var count int + if err := r.db.QueryRowContext(ctx, query, source).Scan(&count); err != nil { + return false, fmt.Errorf("exists import_migrations (source=%s): %w", source, err) + } + return count > 0, nil +} + +// BackfillFromImportQueue reads completed import_queue rows that contain a nzbdav_id +// in their metadata JSON and inserts them as status=imported rows into import_migrations +// (idempotent via ON CONFLICT IGNORE / INSERT OR IGNORE). +// Returns the number of rows inserted. +func (r *ImportMigrationRepository) BackfillFromImportQueue(ctx context.Context) (int, error) { + selectQuery := ` + SELECT id, relative_path, storage_path, metadata + FROM import_queue + WHERE status = 'completed' + AND metadata IS NOT NULL + AND storage_path IS NOT NULL + ` + rows, err := r.db.QueryContext(ctx, selectQuery) + if err != nil { + return 0, fmt.Errorf("backfill: query import_queue: %w", err) + } + defer rows.Close() + + type row struct { + id int64 + relativePath *string + storagePath *string + metadata string + } + + var candidates []row + for rows.Next() { + var candidate row + if err := rows.Scan(&candidate.id, &candidate.relativePath, &candidate.storagePath, &candidate.metadata); err != nil { + return 0, fmt.Errorf("backfill: scan import_queue row: %w", err) + } + candidates = append(candidates, candidate) + } + if err := rows.Err(); err != nil { + return 0, fmt.Errorf("backfill: iterate import_queue: %w", err) + } + + var nzbdavIDStruct struct { + NzbdavID string `json:"nzbdav_id"` + } + + inserted := 0 + for _, c := range candidates { + if err := json.Unmarshal([]byte(c.metadata), &nzbdavIDStruct); err != nil { + // Row has metadata but no parseable nzbdav_id — skip silently. + continue + } + if nzbdavIDStruct.NzbdavID == "" { + continue + } + + relativePath := "" + if c.relativePath != nil { + relativePath = *c.relativePath + } + + insertQuery := ` + INSERT OR IGNORE INTO import_migrations + (source, external_id, queue_item_id, relative_path, final_path, status, created_at, updated_at) + VALUES ('nzbdav', ?, ?, ?, ?, 'imported', datetime('now'), datetime('now')) + ` + if r.dialect.IsPostgres() { + insertQuery = ` + INSERT INTO import_migrations + (source, external_id, queue_item_id, relative_path, final_path, status, created_at, updated_at) + VALUES ('nzbdav', $1, $2, $3, $4, 'imported', NOW(), NOW()) + ON CONFLICT (source, external_id) DO NOTHING + ` + } + + var res sql.Result + if r.dialect.IsPostgres() { + res, err = r.db.ExecContext(ctx, insertQuery, nzbdavIDStruct.NzbdavID, c.id, relativePath, c.storagePath) + } else { + res, err = r.db.ExecContext(ctx, insertQuery, nzbdavIDStruct.NzbdavID, c.id, relativePath, c.storagePath) + } + if err != nil { + return inserted, fmt.Errorf("backfill: insert import_migration (external_id=%s): %w", nzbdavIDStruct.NzbdavID, err) + } + if n, _ := res.RowsAffected(); n > 0 { + inserted++ + } + } + + return inserted, nil +} + +// ─── helpers ───────────────────────────────────────────────────────────────── + +func scanImportMigration(row *sql.Row) (*ImportMigration, error) { + var m ImportMigration + var status string + err := row.Scan( + &m.ID, &m.Source, &m.ExternalID, &m.QueueItemID, + &m.RelativePath, &m.FinalPath, &status, &m.Error, + &m.CreatedAt, &m.UpdatedAt, + ) + if err != nil { + return nil, err + } + m.Status = ImportMigrationStatus(status) + return &m, nil +} + +func scanImportMigrationRow(rows *sql.Rows) (*ImportMigration, error) { + var m ImportMigration + var status string + err := rows.Scan( + &m.ID, &m.Source, &m.ExternalID, &m.QueueItemID, + &m.RelativePath, &m.FinalPath, &status, &m.Error, + &m.CreatedAt, &m.UpdatedAt, + ) + if err != nil { + return nil, err + } + m.Status = ImportMigrationStatus(status) + return &m, nil +} diff --git a/internal/database/migrations/postgres/024_import_migrations.sql b/internal/database/migrations/postgres/024_import_migrations.sql new file mode 100644 index 000000000..9df0925c0 --- /dev/null +++ b/internal/database/migrations/postgres/024_import_migrations.sql @@ -0,0 +1,25 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE import_migrations ( + id BIGSERIAL PRIMARY KEY, + source TEXT NOT NULL, + external_id TEXT NOT NULL, + queue_item_id BIGINT, + relative_path TEXT NOT NULL DEFAULT '', + final_path TEXT, + status TEXT NOT NULL DEFAULT 'pending', + error TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE(source, external_id) +); +CREATE INDEX idx_import_migrations_status ON import_migrations(source, status); +CREATE INDEX idx_import_migrations_queue ON import_migrations(queue_item_id); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP INDEX IF EXISTS idx_import_migrations_queue; +DROP INDEX IF EXISTS idx_import_migrations_status; +DROP TABLE IF EXISTS import_migrations; +-- +goose StatementEnd diff --git a/internal/database/migrations/sqlite/024_import_migrations.sql b/internal/database/migrations/sqlite/024_import_migrations.sql new file mode 100644 index 000000000..2ffadb78a --- /dev/null +++ b/internal/database/migrations/sqlite/024_import_migrations.sql @@ -0,0 +1,25 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE import_migrations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + source TEXT NOT NULL, + external_id TEXT NOT NULL, + queue_item_id INTEGER, + relative_path TEXT NOT NULL DEFAULT '', + final_path TEXT, + status TEXT NOT NULL DEFAULT 'pending', + error TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE(source, external_id) +); +CREATE INDEX idx_import_migrations_status ON import_migrations(source, status); +CREATE INDEX idx_import_migrations_queue ON import_migrations(queue_item_id); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP INDEX IF EXISTS idx_import_migrations_queue; +DROP INDEX IF EXISTS idx_import_migrations_status; +DROP TABLE IF EXISTS import_migrations; +-- +goose StatementEnd diff --git a/internal/database/models.go b/internal/database/models.go index 3171fc44e..de1bc9222 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -169,3 +169,36 @@ type ImportHistory struct { CompletedAt time.Time `db:"completed_at"` } +// ImportMigrationStatus represents the status of a migration item +type ImportMigrationStatus string + +const ( + ImportMigrationStatusPending ImportMigrationStatus = "pending" + ImportMigrationStatusImported ImportMigrationStatus = "imported" + ImportMigrationStatusFailed ImportMigrationStatus = "failed" + ImportMigrationStatusSymlinksMigrated ImportMigrationStatus = "symlinks_migrated" +) + +// ImportMigration tracks progress of two-phase migrations (e.g. nzbdav → altmount) +type ImportMigration struct { + ID int64 `db:"id"` + Source string `db:"source"` // e.g. "nzbdav" + ExternalID string `db:"external_id"` // source-specific ID (nzbdav GUID) + QueueItemID *int64 `db:"queue_item_id"` // FK → import_queue.id (nullable) + RelativePath string `db:"relative_path"` // virtual path when enqueued + FinalPath *string `db:"final_path"` // storage_path after import + Status ImportMigrationStatus `db:"status"` + Error *string `db:"error"` + CreatedAt time.Time `db:"created_at"` + UpdatedAt time.Time `db:"updated_at"` +} + +// ImportMigrationStats holds aggregate counts for a source +type ImportMigrationStats struct { + Pending int + Imported int + Failed int + SymlinksMigrated int + Total int +} + From 2a160fc82c8b9e6a5b860b9fc78368daa1128d86 Mon Sep 17 00:00:00 2001 From: javi11 Date: Mon, 20 Apr 2026 10:33:51 +0200 Subject: [PATCH 2/6] refactor(nzbdav): remove ID-symlink write code and FilterExistingNzbdavIds Delete HandleIDMetadataLinks and id_linker.go; remove UpdateIDSymlink and RemoveIDSymlink methods from MetadataService; remove the .id sidecar write block from WriteFileMetadata (read path preserved for Phase 2 compatibility); remove FilterExistingNzbdavIds from QueueRepository, Repository, and the BatchQueueAdder interface; remove related call sites in nzbfilesystem MOVE handler and nzbdav scanner processBatch. --- internal/database/queue_repository.go | 53 ----------- internal/database/repository.go | 55 +---------- .../importer/postprocessor/coordinator.go | 9 +- internal/importer/postprocessor/id_linker.go | 34 ------- internal/importer/scanner/nzbdav.go | 95 ++----------------- internal/importer/service.go | 4 - internal/metadata/service.go | 73 -------------- .../nzbfilesystem/metadata_remote_file.go | 23 ----- 8 files changed, 10 insertions(+), 336 deletions(-) delete mode 100644 internal/importer/postprocessor/id_linker.go diff --git a/internal/database/queue_repository.go b/internal/database/queue_repository.go index f827ce581..6ff9d4d45 100644 --- a/internal/database/queue_repository.go +++ b/internal/database/queue_repository.go @@ -5,7 +5,6 @@ import ( "database/sql" "errors" "fmt" - "strings" "time" ) @@ -470,58 +469,6 @@ func (r *QueueRepository) IncrementRetryCountAndResetStatus(ctx context.Context, return rowsAffected > 0, nil } -// FilterExistingNzbdavIds checks a list of nzbdav IDs and returns those that already exist in the queue -func (r *QueueRepository) FilterExistingNzbdavIds(ctx context.Context, ids []string) ([]string, error) { - if len(ids) == 0 { - return nil, nil - } - - // We can't pass too many parameters at once, so we batch the query - batchSize := 500 - existingIds := make([]string, 0) - - for i := 0; i < len(ids); i += batchSize { - end := min(i+batchSize, len(ids)) - - batchIds := ids[i:end] - - // Build placeholders for the IN clause - placeholders := make([]string, len(batchIds)) - args := make([]any, len(batchIds)) - for j, id := range batchIds { - placeholders[j] = "?" - args[j] = id - } - - // Query using json_extract (SQLite) or ->>'key' (PostgreSQL) to find matching IDs - jsonExpr := r.dialect.JSONExtract("metadata", "nzbdav_id") - query := fmt.Sprintf(` - SELECT DISTINCT %s - FROM import_queue - WHERE %s IN (%s) - `, jsonExpr, jsonExpr, strings.Join(placeholders, ",")) - - rows, err := r.db.QueryContext(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("failed to check existing nzbdav IDs: %w", err) - } - - for rows.Next() { - var id sql.NullString - if err := rows.Scan(&id); err != nil { - rows.Close() - return nil, fmt.Errorf("failed to scan matching id: %w", err) - } - if id.Valid { - existingIds = append(existingIds, id.String) - } - } - rows.Close() - } - - return existingIds, nil -} - // UpdateQueueItemPriority updates the priority of a queue item func (r *QueueRepository) UpdateQueueItemPriority(ctx context.Context, id int64, priority QueuePriority) error { query := `UPDATE import_queue SET priority = ?, updated_at = datetime('now') WHERE id = ?` diff --git a/internal/database/repository.go b/internal/database/repository.go index 4fa78808b..7193b40e5 100644 --- a/internal/database/repository.go +++ b/internal/database/repository.go @@ -69,7 +69,7 @@ func (r *Repository) WithImmediateTransaction(ctx context.Context, fn func(*Repo } // withTransactionMode executes a function within a database transaction with specified mode -func (r *Repository) withTransactionMode(ctx context.Context, mode string, fn func(*Repository) error) error { +func (r *Repository) withTransactionMode(ctx context.Context, _ string, fn func(*Repository) error) error { ddb, ok := r.db.(*dialectAwareDB) if !ok { return fmt.Errorf("repository not connected to dialectAwareDB") @@ -942,59 +942,6 @@ func (r *Repository) IsFileInQueue(ctx context.Context, filePath string) (bool, return true, nil } -// FilterExistingNzbdavIds checks a list of nzbdav IDs and returns those that already exist in the queue -// This is used for deduplication during import -func (r *Repository) FilterExistingNzbdavIds(ctx context.Context, ids []string) ([]string, error) { - if len(ids) == 0 { - return nil, nil - } - - // We can't pass too many parameters at once, so we batch the query - batchSize := 500 - existingIds := make([]string, 0) - - for i := 0; i < len(ids); i += batchSize { - end := min(i+batchSize, len(ids)) - - batchIds := ids[i:end] - - // Build placeholders for the IN clause - placeholders := make([]string, len(batchIds)) - args := make([]any, len(batchIds)) - for j, id := range batchIds { - placeholders[j] = "?" - args[j] = id - } - - // Query using json_extract (SQLite) or ->>'key' (PostgreSQL) to find matching IDs - jsonExpr := r.dialect.JSONExtract("metadata", "nzbdav_id") - query := fmt.Sprintf(` - SELECT DISTINCT %s - FROM import_queue - WHERE %s IN (%s) - `, jsonExpr, jsonExpr, strings.Join(placeholders, ",")) - - rows, err := r.db.QueryContext(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("failed to check existing nzbdav IDs: %w", err) - } - - for rows.Next() { - var id sql.NullString - if err := rows.Scan(&id); err != nil { - rows.Close() - return nil, fmt.Errorf("failed to scan matching id: %w", err) - } - if id.Valid { - existingIds = append(existingIds, id.String) - } - } - rows.Close() - } - - return existingIds, nil -} - // UpdateQueueItemPriority updates the priority of a queue item func (r *Repository) UpdateQueueItemPriority(ctx context.Context, id int64, priority QueuePriority) error { query := `UPDATE import_queue SET priority = ?, updated_at = datetime('now') WHERE id = ?` diff --git a/internal/importer/postprocessor/coordinator.go b/internal/importer/postprocessor/coordinator.go index fb01afe30..c4982eebd 100644 --- a/internal/importer/postprocessor/coordinator.go +++ b/internal/importer/postprocessor/coordinator.go @@ -109,10 +109,7 @@ func (c *Coordinator) HandleSuccess(ctx context.Context, item *database.ImportQu result.SymlinksCreated = true } - // 3. Create ID metadata links - c.HandleIDMetadataLinks(ctx, item, resultingPath) - - // 4. Create STRM files if configured + // 3. Create STRM files if configured if err := c.CreateStrmFiles(ctx, item, resultingPath); err != nil { c.log.WarnContext(ctx, "Failed to create STRM files", "queue_id", item.ID, @@ -123,7 +120,7 @@ func (c *Coordinator) HandleSuccess(ctx context.Context, item *database.ImportQu result.StrmCreated = true } - // 5. Schedule health check + // 4. Schedule health check if err := c.ScheduleHealthCheck(ctx, resultingPath); err != nil { c.log.WarnContext(ctx, "Failed to schedule health check", "path", resultingPath, @@ -133,7 +130,7 @@ func (c *Coordinator) HandleSuccess(ctx context.Context, item *database.ImportQu result.HealthScheduled = true } - // 6. Notify ARR applications + // 5. Notify ARR applications if shouldSkipARRNotification(item) { c.log.DebugContext(ctx, "ARR notification skipped (requested by caller)", "queue_id", item.ID, diff --git a/internal/importer/postprocessor/id_linker.go b/internal/importer/postprocessor/id_linker.go deleted file mode 100644 index fe9456d26..000000000 --- a/internal/importer/postprocessor/id_linker.go +++ /dev/null @@ -1,34 +0,0 @@ -package postprocessor - -import ( - "context" - "encoding/json" - - "github.com/javi11/altmount/internal/database" - metapb "github.com/javi11/altmount/internal/metadata/proto" -) - -// HandleIDMetadataLinks creates ID-based metadata links for nzbdav compatibility -func (c *Coordinator) HandleIDMetadataLinks(ctx context.Context, item *database.ImportQueueItem, resultingPath string) { - // 1. Check if the queue item itself has a release-level ID in its metadata - if item.Metadata != nil && *item.Metadata != "" { - var meta struct { - NzbdavID string `json:"nzbdav_id"` - } - if err := json.Unmarshal([]byte(*item.Metadata), &meta); err == nil && meta.NzbdavID != "" { - if err := c.metadataService.UpdateIDSymlink(meta.NzbdavID, resultingPath); err != nil { - c.log.Warn("Failed to create release ID metadata link", "id", meta.NzbdavID, "error", err) - } - } - } - - // 2. Check individual files for IDs using MetadataService walker - _ = c.metadataService.WalkDirectoryFiles(resultingPath, func(fileVirtualPath string, meta *metapb.FileMetadata) error { - if meta.NzbdavId != "" { - if err := c.metadataService.UpdateIDSymlink(meta.NzbdavId, fileVirtualPath); err != nil { - c.log.Warn("Failed to create file ID metadata link", "id", meta.NzbdavId, "error", err) - } - } - return nil - }) -} diff --git a/internal/importer/scanner/nzbdav.go b/internal/importer/scanner/nzbdav.go index 8b2005073..4b2a7030b 100644 --- a/internal/importer/scanner/nzbdav.go +++ b/internal/importer/scanner/nzbdav.go @@ -19,7 +19,6 @@ import ( // BatchQueueAdder defines the interface for batch queue operations type BatchQueueAdder interface { AddBatchToQueue(ctx context.Context, items []*database.ImportQueueItem) error - FilterExistingNzbdavIds(ctx context.Context, ids []string) ([]string, error) } // NzbDavImporter handles importing from NZBDav databases @@ -231,98 +230,16 @@ func (n *NzbDavImporter) processBatch(ctx context.Context, batchChan <-chan *dat insertBatch := func() { if len(batch) > 0 { - // 1. Extract IDs from batch to check for duplicates - idMap := make(map[string]*database.ImportQueueItem) - idsToCheck := make([]string, 0, len(batch)) - - type metaStruct struct { - NzbdavID string `json:"nzbdav_id"` - } - - for _, item := range batch { - if item.Metadata != nil { - var meta metaStruct - if err := json.Unmarshal([]byte(*item.Metadata), &meta); err == nil && meta.NzbdavID != "" { - idMap[meta.NzbdavID] = item - idsToCheck = append(idsToCheck, meta.NzbdavID) - } - } - } - - // 2. Check for existing IDs in DB - var existingIds []string - var err error - if len(idsToCheck) > 0 { - existingIds, err = n.batchAdder.FilterExistingNzbdavIds(ctx, idsToCheck) - if err != nil { - n.log.ErrorContext(ctx, "Failed to check for existing IDs", "error", err) - // On error, we proceed with all items - the DB unique constraint on nzb_path - // will catch duplicates, though less efficiently, or we might add duplicates - // if paths differ. Better to fail safe and try to add. - } - } - - // 3. Filter out duplicates - itemsToAdd := make([]*database.ImportQueueItem, 0, len(batch)) - duplicates := 0 - - if len(existingIds) > 0 { - existingMap := make(map[string]bool) - for _, id := range existingIds { - existingMap[id] = true - } - - for _, item := range batch { - isDuplicate := false - if item.Metadata != nil { - var meta metaStruct - if err := json.Unmarshal([]byte(*item.Metadata), &meta); err == nil && meta.NzbdavID != "" { - if existingMap[meta.NzbdavID] { - isDuplicate = true - } - } - } - - if isDuplicate { - duplicates++ - // Cleanup temp NZB file for duplicate - if err := os.Remove(item.NzbPath); err != nil { - n.log.DebugContext(ctx, "Failed to remove duplicate temp NZB", "path", item.NzbPath, "error", err) - } - // Also try to remove the parent temp dir if empty (it was created just for this file) - go func(path string) { - dir := filepath.Dir(path) - _ = os.Remove(dir) - }(item.NzbPath) - } else { - itemsToAdd = append(itemsToAdd, item) - } - } + if err := n.batchAdder.AddBatchToQueue(ctx, batch); err != nil { + n.log.ErrorContext(ctx, "Failed to add batch to queue", "count", len(batch), "error", err) + n.mu.Lock() + n.info.Failed += len(batch) + n.mu.Unlock() } else { - itemsToAdd = batch - } - - if duplicates > 0 { - n.log.InfoContext(ctx, "Skipped duplicate items", "count", duplicates) n.mu.Lock() - n.info.Skipped += duplicates + n.info.Added += len(batch) n.mu.Unlock() } - - // 4. Add unique items to queue - if len(itemsToAdd) > 0 { - if err := n.batchAdder.AddBatchToQueue(ctx, itemsToAdd); err != nil { - n.log.ErrorContext(ctx, "Failed to add batch to queue", "count", len(itemsToAdd), "error", err) - n.mu.Lock() - n.info.Failed += len(itemsToAdd) - n.mu.Unlock() - } else { - n.mu.Lock() - n.info.Added += len(itemsToAdd) - n.mu.Unlock() - } - } - batch = nil // Reset batch } } diff --git a/internal/importer/service.go b/internal/importer/service.go index a6847dc10..e4e88fb69 100644 --- a/internal/importer/service.go +++ b/internal/importer/service.go @@ -102,10 +102,6 @@ func (a *batchQueueAdapterForImporter) AddBatchToQueue(ctx context.Context, item return a.repo.AddBatchToQueue(ctx, items) } -func (a *batchQueueAdapterForImporter) FilterExistingNzbdavIds(ctx context.Context, ids []string) ([]string, error) { - return a.repo.FilterExistingNzbdavIds(ctx, ids) -} - // isFileAlreadyProcessed checks if a file has already been processed by checking metadata func isFileAlreadyProcessed(metadataService *metadata.MetadataService, filePath string, scanRoot string) bool { // Calculate virtual path diff --git a/internal/metadata/service.go b/internal/metadata/service.go index 474335544..4060d6de8 100644 --- a/internal/metadata/service.go +++ b/internal/metadata/service.go @@ -8,7 +8,6 @@ import ( "log/slog" "os" "path/filepath" - "runtime" "strings" "time" @@ -113,18 +112,6 @@ func (ms *MetadataService) WriteFileMetadata(virtualPath string, metadata *metap return fmt.Errorf("failed to rename metadata file: %w", err) } - // Handle ID sidecar file - idPath := metadataPath + ".id" - if nzbdavId != "" { - if err := os.WriteFile(idPath, []byte(nzbdavId), 0644); err != nil { - // Log error but don't fail the operation - slog.WarnContext(context.Background(), "Failed to write ID sidecar file", "path", idPath, "error", err) - } - } else { - // Clean up existing ID file if present - _ = os.Remove(idPath) - } - metadata.NzbdavId = nzbdavId // Restore for in-memory use // Update only the lightweight cache; the full proto (with SegmentData) is @@ -395,11 +382,6 @@ func (ms *MetadataService) DeleteFileMetadataWithSourceNzb(ctx context.Context, } } - // Remove .ids/ symlink before deletion - if idData, readErr := os.ReadFile(metadataPath + ".id"); readErr == nil && len(idData) > 0 { - _ = ms.RemoveIDSymlink(string(idData)) - } - // Delete the metadata file err := os.Remove(metadataPath) if err != nil && !os.IsNotExist(err) { @@ -501,56 +483,6 @@ func (ms *MetadataService) RenameFileMetadata(oldVirtualPath, newVirtualPath str return nil } -// UpdateIDSymlink creates or updates an ID-based symlink in the .ids/ sharded directory. -// On Windows, symlinks are not supported and this operation is skipped gracefully. -func (ms *MetadataService) UpdateIDSymlink(nzbdavID, virtualPath string) error { - if runtime.GOOS == "windows" { - return nil // Symlinks not supported on Windows; ID-based lookup via symlinks is skipped - } - - id := strings.ToLower(nzbdavID) - if len(id) < 5 { - return nil // Invalid ID for sharding - } - - shardPath := filepath.Join(".ids", string(id[0]), string(id[1]), string(id[2]), string(id[3]), string(id[4])) - fullShardDir := filepath.Join(ms.rootPath, shardPath) - - if err := os.MkdirAll(fullShardDir, 0755); err != nil { - return err - } - - targetMetaPath := ms.GetMetadataFilePath(virtualPath) - linkPath := filepath.Join(fullShardDir, id+".meta") - - // Remove existing symlink if present - os.Remove(linkPath) - - // Create relative symlink - relTarget, err := filepath.Rel(fullShardDir, targetMetaPath) - if err != nil { - return os.Symlink(targetMetaPath, linkPath) - } - - return os.Symlink(relTarget, linkPath) -} - -// RemoveIDSymlink removes an ID-based symlink from the .ids/ sharded directory. -func (ms *MetadataService) RemoveIDSymlink(nzbdavID string) error { - id := strings.ToLower(nzbdavID) - if len(id) < 5 { - return nil - } - - shardPath := filepath.Join(".ids", string(id[0]), string(id[1]), string(id[2]), string(id[3]), string(id[4])) - linkPath := filepath.Join(ms.rootPath, shardPath, id+".meta") - - if err := os.Remove(linkPath); err != nil && !os.IsNotExist(err) { - return err - } - return nil -} - // WalkDirectoryFiles walks a metadata directory and calls fn for each file's virtual path and metadata. func (ms *MetadataService) WalkDirectoryFiles(virtualPath string, fn func(fileVirtualPath string, meta *metapb.FileMetadata) error) error { metadataDir := filepath.Join(ms.rootPath, virtualPath) @@ -753,11 +685,6 @@ func (ms *MetadataService) MoveToCorrupted(ctx context.Context, virtualPath stri targetPath := filepath.Join(targetDir, truncatedFilename+".meta") - // Remove .ids/ symlink before moving to corrupted - if idData, readErr := os.ReadFile(metadataPath + ".id"); readErr == nil && len(idData) > 0 { - _ = ms.RemoveIDSymlink(string(idData)) - } - // Move the .meta file if err := os.Rename(metadataPath, targetPath); err != nil { slog.WarnContext(ctx, "Failed to move corrupted metadata, trying copy fallback", "error", err) diff --git a/internal/nzbfilesystem/metadata_remote_file.go b/internal/nzbfilesystem/metadata_remote_file.go index 7be658955..c3cc508c0 100644 --- a/internal/nzbfilesystem/metadata_remote_file.go +++ b/internal/nzbfilesystem/metadata_remote_file.go @@ -384,16 +384,6 @@ func (mrf *MetadataRemoteFile) RenameFile(ctx context.Context, oldName, newName } } - // Update ID symlinks for all files with NzbdavId under the renamed directory - _ = mrf.metadataService.WalkDirectoryFiles(normalizedNew, func(fileVirtualPath string, meta *metapb.FileMetadata) error { - if meta.NzbdavId != "" { - if err := mrf.metadataService.UpdateIDSymlink(meta.NzbdavId, fileVirtualPath); err != nil { - slog.WarnContext(ctx, "Failed to update ID symlink after directory rename", "id", meta.NzbdavId, "path", fileVirtualPath, "error", err) - } - } - return nil - }) - return true, nil } @@ -404,24 +394,11 @@ func (mrf *MetadataRemoteFile) RenameFile(ctx context.Context, oldName, newName return false, nil } - // Read metadata first to get NzbdavId before rename - fileMeta, err := mrf.metadataService.ReadFileMetadata(normalizedOld) - if err != nil { - return false, fmt.Errorf("failed to read old metadata: %w", err) - } - // Use atomic rename instead of read-write-delete if err := mrf.metadataService.RenameFileMetadata(normalizedOld, normalizedNew); err != nil { return false, fmt.Errorf("failed to rename metadata: %w", err) } - // Update ID symlink if file has a NzbdavId - if fileMeta != nil && fileMeta.NzbdavId != "" { - if err := mrf.metadataService.UpdateIDSymlink(fileMeta.NzbdavId, normalizedNew); err != nil { - slog.WarnContext(ctx, "Failed to update ID symlink during MOVE", "id", fileMeta.NzbdavId, "error", err) - } - } - // Update health records and resolve pending repairs if mrf.healthRepository != nil { if err := mrf.healthRepository.RenameHealthRecord(ctx, normalizedOld, normalizedNew); err != nil { From dff49f564a7460c4693091b697a14072f5f299db Mon Sep 17 00:00:00 2001 From: javi11 Date: Mon, 20 Apr 2026 10:39:07 +0200 Subject: [PATCH 3/6] feat(scanner): use import_migrations for dedup, set SkipArrNotification - Add MigrationRecorder interface to scanner package with UpsertMigration and IsMigrationCompleted methods - Update NzbDavImporter to accept MigrationRecorder as second constructor parameter alongside BatchQueueAdder - processBatch now checks IsMigrationCompleted before enqueueing (skip already-imported/symlinks_migrated items) and calls UpsertMigration for new items, then strips nzbdav_id from the queue item metadata leaving only extracted_files if present - createNzbFileAndPrepareItem sets SkipArrNotification=true on all items - batchQueueAdapterForImporter gains migrationRepo field and implements MigrationRecorder via ImportMigrationRepository.Upsert/LookupByExternalID --- internal/importer/scanner/nzbdav.go | 109 ++++++++++++++++++++++++---- internal/importer/service.go | 31 +++++++- 2 files changed, 121 insertions(+), 19 deletions(-) diff --git a/internal/importer/scanner/nzbdav.go b/internal/importer/scanner/nzbdav.go index 4b2a7030b..e43e8aa82 100644 --- a/internal/importer/scanner/nzbdav.go +++ b/internal/importer/scanner/nzbdav.go @@ -21,10 +21,17 @@ type BatchQueueAdder interface { AddBatchToQueue(ctx context.Context, items []*database.ImportQueueItem) error } +// MigrationRecorder defines the interface for recording import migrations +type MigrationRecorder interface { + UpsertMigration(ctx context.Context, source, externalID, relativePath string) (int64, error) + IsMigrationCompleted(ctx context.Context, source, externalID string) (bool, error) +} + // NzbDavImporter handles importing from NZBDav databases type NzbDavImporter struct { - batchAdder BatchQueueAdder - log *slog.Logger + batchAdder BatchQueueAdder + migrationRecorder MigrationRecorder + log *slog.Logger // State management mu sync.RWMutex @@ -33,11 +40,12 @@ type NzbDavImporter struct { } // NewNzbDavImporter creates a new NZBDav importer -func NewNzbDavImporter(batchAdder BatchQueueAdder) *NzbDavImporter { +func NewNzbDavImporter(batchAdder BatchQueueAdder, migrationRecorder MigrationRecorder) *NzbDavImporter { return &NzbDavImporter{ - batchAdder: batchAdder, - log: slog.Default().With("component", "nzbdav-importer"), - info: ImportInfo{Status: ImportStatusIdle}, + batchAdder: batchAdder, + migrationRecorder: migrationRecorder, + log: slog.Default().With("component", "nzbdav-importer"), + info: ImportInfo{Status: ImportStatusIdle}, } } @@ -222,7 +230,9 @@ func (n *NzbDavImporter) performImport(ctx context.Context, dbPath string, blobs } } -// processBatch batches queue items and adds them to the queue +// processBatch batches queue items and adds them to the queue. +// It uses migrationRecorder to deduplicate already-completed items and to +// record new items before enqueueing them. func (n *NzbDavImporter) processBatch(ctx context.Context, batchChan <-chan *database.ImportQueueItem) { var batch []*database.ImportQueueItem ticker := time.NewTicker(500 * time.Millisecond) @@ -244,6 +254,43 @@ func (n *NzbDavImporter) processBatch(ctx context.Context, batchChan <-chan *dat } } + // extractNzbdavID reads nzbdav_id from the item metadata JSON. + extractNzbdavID := func(item *database.ImportQueueItem) string { + if item.Metadata == nil { + return "" + } + var meta struct { + NzbdavID string `json:"nzbdav_id"` + } + if err := json.Unmarshal([]byte(*item.Metadata), &meta); err != nil { + return "" + } + return meta.NzbdavID + } + + // stripNzbdavIDFromMetadata rewrites the metadata JSON removing the nzbdav_id key, + // retaining only other keys (e.g. extracted_files). + stripNzbdavIDFromMetadata := func(item *database.ImportQueueItem) { + if item.Metadata == nil { + return + } + var metaMap map[string]any + if err := json.Unmarshal([]byte(*item.Metadata), &metaMap); err != nil { + return + } + delete(metaMap, "nzbdav_id") + if len(metaMap) == 0 { + item.Metadata = nil + return + } + b, err := json.Marshal(metaMap) + if err != nil { + return + } + s := string(b) + item.Metadata = &s + } + for { select { case item, ok := <-batchChan: @@ -252,6 +299,35 @@ func (n *NzbDavImporter) processBatch(ctx context.Context, batchChan <-chan *dat insertBatch() return } + + nzbdavID := extractNzbdavID(item) + + // Dedup: skip items already successfully imported. + if nzbdavID != "" { + completed, err := n.migrationRecorder.IsMigrationCompleted(ctx, "nzbdav", nzbdavID) + if err != nil { + n.log.ErrorContext(ctx, "Failed to check migration status", "nzbdav_id", nzbdavID, "error", err) + } else if completed { + n.mu.Lock() + n.info.Skipped++ + n.mu.Unlock() + continue + } + + // Record migration row before enqueueing. + relativePath := "" + if item.Category != nil { + relativePath = *item.Category + } + if _, err := n.migrationRecorder.UpsertMigration(ctx, "nzbdav", nzbdavID, relativePath); err != nil { + n.log.ErrorContext(ctx, "Failed to upsert migration", "nzbdav_id", nzbdavID, "error", err) + } + } + + // Strip nzbdav_id from the queue item metadata — it lives in + // import_migrations now. Keep extracted_files if present. + stripNzbdavIDFromMetadata(item) + batch = append(batch, item) if len(batch) >= 100 { // Batch size insertBatch() @@ -324,15 +400,18 @@ func (n *NzbDavImporter) createNzbFileAndPrepareItem(ctx context.Context, res *n // Prepare item struct. RelativePath is left nil so the import mirrors the // nzbdav folder structure under Category without an extra user-supplied prefix. + // SkipArrNotification is true because nzbdav imports are migration jobs — ARR + // scans should not be triggered for each individual item. item := &database.ImportQueueItem{ - NzbPath: nzbPath, - Category: &targetCategory, - Priority: priority, - Status: database.QueueStatusPending, - RetryCount: 0, - MaxRetries: 3, - CreatedAt: time.Now(), - Metadata: &metaJSON, + NzbPath: nzbPath, + Category: &targetCategory, + Priority: priority, + Status: database.QueueStatusPending, + RetryCount: 0, + MaxRetries: 3, + CreatedAt: time.Now(), + Metadata: &metaJSON, + SkipArrNotification: true, } return item, nil diff --git a/internal/importer/service.go b/internal/importer/service.go index e4e88fb69..9c218b6bc 100644 --- a/internal/importer/service.go +++ b/internal/importer/service.go @@ -93,15 +93,37 @@ func (a *queueAdapterForScanner) IsFileProcessed(filePath string, scanRoot strin return isFileAlreadyProcessed(a.metadataService, filePath, scanRoot) } -// batchQueueAdapterForImporter adapts database repository for scanner.BatchQueueAdder interface +// batchQueueAdapterForImporter adapts database repository for scanner.BatchQueueAdder and +// scanner.MigrationRecorder interfaces. type batchQueueAdapterForImporter struct { - repo *database.QueueRepository + repo *database.QueueRepository + migrationRepo *database.ImportMigrationRepository } func (a *batchQueueAdapterForImporter) AddBatchToQueue(ctx context.Context, items []*database.ImportQueueItem) error { return a.repo.AddBatchToQueue(ctx, items) } +func (a *batchQueueAdapterForImporter) UpsertMigration(ctx context.Context, source, externalID, relativePath string) (int64, error) { + return a.migrationRepo.Upsert(ctx, &database.ImportMigration{ + Source: source, + ExternalID: externalID, + RelativePath: relativePath, + Status: database.ImportMigrationStatusPending, + }) +} + +func (a *batchQueueAdapterForImporter) IsMigrationCompleted(ctx context.Context, source, externalID string) (bool, error) { + row, err := a.migrationRepo.LookupByExternalID(ctx, source, externalID) + if err != nil { + return false, err + } + if row == nil { + return false, nil + } + return row.Status == database.ImportMigrationStatusImported || row.Status == database.ImportMigrationStatusSymlinksMigrated, nil +} + // isFileAlreadyProcessed checks if a file has already been processed by checking metadata func isFileAlreadyProcessed(metadataService *metadata.MetadataService, filePath string, scanRoot string) bool { // Calculate virtual path @@ -233,9 +255,10 @@ func NewService(config ServiceConfig, metadataService *metadata.MetadataService, // Create adapter for NZBDav imports importerAdapter := &batchQueueAdapterForImporter{ - repo: database.Repository, + repo: database.Repository, + migrationRepo: database.MigrationRepo, } - service.nzbdavImporter = scanner.NewNzbDavImporter(importerAdapter) + service.nzbdavImporter = scanner.NewNzbDavImporter(importerAdapter, importerAdapter) // Create directory watcher (Service implements WatchQueueAdder) service.watcher = scanner.NewWatcher(service, configGetter) From aa32a1eba364ba8905ff00a57a127d7630e71a57 Mon Sep 17 00:00:00 2001 From: javi11 Date: Mon, 20 Apr 2026 10:40:55 +0200 Subject: [PATCH 4/6] feat(importer): update import_migrations on queue item completion/failure Wire s.database.MigrationRepo into handleProcessingSuccess and handleProcessingFailure so that import_migrations rows are marked imported/failed when the corresponding queue item finishes. Both calls are non-fatal: failures are logged as warnings and do not affect the main processing outcome. If no matching migration row exists (non-nzbdav import), the UPDATE simply affects 0 rows. --- internal/importer/service.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/internal/importer/service.go b/internal/importer/service.go index 9c218b6bc..00c1e13a3 100644 --- a/internal/importer/service.go +++ b/internal/importer/service.go @@ -1003,6 +1003,15 @@ func (s *Service) handleProcessingSuccess(ctx context.Context, item *database.Im return err } + // Update import_migrations row if this was a nzbdav migration import + if s.database.MigrationRepo != nil { + if err := s.database.MigrationRepo.MarkImported(ctx, item.ID, resultingPath); err != nil { + // Non-fatal: log but don't fail + s.log.WarnContext(ctx, "Failed to mark import_migration as imported", + "queue_id", item.ID, "error", err) + } + } + // Notify completion and clear progress tracking if s.broadcaster != nil { s.broadcaster.NotifyComplete(int(item.ID), "completed") @@ -1088,6 +1097,14 @@ func (s *Service) handleProcessingFailure(ctx context.Context, item *database.Im "file", item.NzbPath) } + // Update import_migrations row if this was a nzbdav migration import + if s.database.MigrationRepo != nil { + if err := s.database.MigrationRepo.MarkFailed(ctx, item.ID, errorMessage); err != nil { + s.log.WarnContext(ctx, "Failed to mark import_migration as failed", + "queue_id", item.ID, "error", err) + } + } + // Notify failure and clear progress tracking if s.broadcaster != nil { s.broadcaster.NotifyComplete(int(item.ID), "failed") From 8aecb554fec8ba64b380b48a14164224035990f1 Mon Sep 17 00:00:00 2001 From: javi11 Date: Mon, 20 Apr 2026 10:46:23 +0200 Subject: [PATCH 5/6] feat(api): add migrate-symlinks endpoint and Phase 2 symlink rewriter Adds RewriteLibrarySymlinks in internal/importer/migration to walk a library directory and atomically rewrite arr symlinks that point at /.ids/ to the final altmount path. Introduces DBSymlinkLookup adapter in internal/database, a new POST /import/nzbdav/migrate-symlinks handler, and enriches the existing status endpoint with migration_stats when available. --- cmd/altmount/cmd/serve.go | 1 + internal/api/nzbdav_handlers.go | 122 +++++++++- internal/api/server.go | 7 + internal/database/symlink_lookup.go | 34 +++ internal/importer/migration/symlinks.go | 119 ++++++++++ internal/importer/migration/symlinks_test.go | 228 +++++++++++++++++++ 6 files changed, 510 insertions(+), 1 deletion(-) create mode 100644 internal/database/symlink_lookup.go create mode 100644 internal/importer/migration/symlinks.go create mode 100644 internal/importer/migration/symlinks_test.go diff --git a/cmd/altmount/cmd/serve.go b/cmd/altmount/cmd/serve.go index 64177fc1d..441dbef2f 100644 --- a/cmd/altmount/cmd/serve.go +++ b/cmd/altmount/cmd/serve.go @@ -150,6 +150,7 @@ func runServe(cmd *cobra.Command, args []string) error { apiServer := setupAPIServer(app, repos, authService, configManager, metadataReader, metadataService, fs, poolManager, importerService, arrsService, mountService, progressBroadcaster, streamTracker, cacheSource) apiServer.SetLogFilePath(slogutil.GetLogFilePath(cfg.Log)) + apiServer.SetMigrationRepo(db.MigrationRepo) webdavHandler, err := setupWebDAV(cfg, fs, authService, repos.UserRepo, configManager, streamTracker) if err != nil { diff --git a/internal/api/nzbdav_handlers.go b/internal/api/nzbdav_handlers.go index 9550a0204..9c1fdf596 100644 --- a/internal/api/nzbdav_handlers.go +++ b/internal/api/nzbdav_handlers.go @@ -7,7 +7,9 @@ import ( "time" "github.com/gofiber/fiber/v2" + "github.com/javi11/altmount/internal/database" "github.com/javi11/altmount/internal/importer" + "github.com/javi11/altmount/internal/importer/migration" ) // handleImportNzbdav handles POST /import/nzbdav @@ -114,9 +116,127 @@ func (s *Server) handleGetNzbdavImportStatus(c *fiber.Ctx) error { } status := s.importerService.GetImportStatus() + resp := toImportStatusResponse(status) + + // Attach migration stats when available; omit on error to preserve existing behaviour. + if s.migrationRepo != nil { + if stats, err := s.migrationRepo.Stats(c.Context(), "nzbdav"); err == nil { + resp["migration_stats"] = map[string]any{ + "pending": stats.Pending, + "imported": stats.Imported, + "failed": stats.Failed, + "symlinks_migrated": stats.SymlinksMigrated, + "total": stats.Total, + } + } + } + + return c.Status(200).JSON(fiber.Map{ + "success": true, + "data": resp, + }) +} + +// handleMigrateNzbdavSymlinks handles POST /import/nzbdav/migrate-symlinks +// +// @Summary Migrate NZBDav library symlinks +// @Description Walks a library directory and rewrites symlinks that target the nzbdav mount to point at the altmount path instead. +// @Tags Import +// @Accept json +// @Produce json +// @Param body body object{} true "library_path, source_mount_path, dry_run" +// @Success 200 {object} APIResponse +// @Failure 400 {object} APIResponse +// @Failure 500 {object} APIResponse +// @Security BearerAuth +// @Security ApiKeyAuth +// @Router /import/nzbdav/migrate-symlinks [post] +func (s *Server) handleMigrateNzbdavSymlinks(c *fiber.Ctx) error { + var req struct { + LibraryPath string `json:"library_path"` + SourceMountPath string `json:"source_mount_path"` + DryRun bool `json:"dry_run"` + } + if err := c.BodyParser(&req); err != nil { + return c.Status(400).JSON(fiber.Map{ + "success": false, + "message": "Invalid request body", + "details": err.Error(), + }) + } + + if req.LibraryPath == "" || !filepath.IsAbs(req.LibraryPath) { + return c.Status(400).JSON(fiber.Map{ + "success": false, + "message": "library_path must be a non-empty absolute path", + }) + } + if req.SourceMountPath == "" || !filepath.IsAbs(req.SourceMountPath) { + return c.Status(400).JSON(fiber.Map{ + "success": false, + "message": "source_mount_path must be a non-empty absolute path", + }) + } + + cfg := s.configManager.GetConfig() + if cfg == nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Configuration not available", + }) + } + + // Determine which migration repo to use — prefer the dedicated field, fall back + // to nil-safe check so existing deployments without a migration repo still fail + // gracefully rather than panic. + migRepo := s.migrationRepo + if migRepo == nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Migration repository not available", + }) + } + + ctx := c.Context() + + // Backfill idempotently before walking. + if _, err := migRepo.BackfillFromImportQueue(ctx); err != nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Failed to backfill migration data", + "details": err.Error(), + }) + } + + lookup := database.NewDBSymlinkLookup(migRepo) + + report, err := migration.RewriteLibrarySymlinks( + ctx, + req.LibraryPath, + req.SourceMountPath, + cfg.MountPath, + "nzbdav", + lookup, + req.DryRun, + ) + if err != nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Symlink migration failed", + "details": err.Error(), + }) + } + return c.Status(200).JSON(fiber.Map{ "success": true, - "data": toImportStatusResponse(status), + "data": fiber.Map{ + "scanned": report.Scanned, + "matched": report.Matched, + "rewritten": report.Rewritten, + "unmatched": report.Unmatched, + "errors": report.Errors, + "dry_run": req.DryRun, + }, }) } diff --git a/internal/api/server.go b/internal/api/server.go index ae7494fa3..df011dd4b 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -63,6 +63,7 @@ type Server struct { fuseManager *FuseManager cacheSource *segcache.Source logFilePath string + migrationRepo *database.ImportMigrationRepository ready atomic.Bool } @@ -128,6 +129,11 @@ func (s *Server) SetLogFilePath(path string) { s.logFilePath = path } +// SetMigrationRepo sets the migration repository used by the migrate-symlinks endpoint. +func (s *Server) SetMigrationRepo(repo *database.ImportMigrationRepository) { + s.migrationRepo = repo +} + // SetReady sets the server as ready to accept requests func (s *Server) SetReady(ready bool) { s.ready.Store(ready) @@ -222,6 +228,7 @@ func (s *Server) SetupRoutes(app *fiber.App) { api.Post("/import/nzbdav/reset", s.handleResetNzbdavImportStatus) api.Get("/import/nzbdav/status", s.handleGetNzbdavImportStatus) api.Delete("/import/nzbdav", s.handleCancelNzbdavImport) + api.Post("/import/nzbdav/migrate-symlinks", s.handleMigrateNzbdavSymlinks) // Queue endpoints api.Get("/queue", s.handleListQueue) diff --git a/internal/database/symlink_lookup.go b/internal/database/symlink_lookup.go new file mode 100644 index 000000000..1eabe39f0 --- /dev/null +++ b/internal/database/symlink_lookup.go @@ -0,0 +1,34 @@ +package database + +import ( + "context" + "fmt" +) + +// DBSymlinkLookup adapts ImportMigrationRepository to migration.SymlinkLookup. +type DBSymlinkLookup struct { + repo *ImportMigrationRepository +} + +// NewDBSymlinkLookup creates a new DBSymlinkLookup wrapping the given repository. +func NewDBSymlinkLookup(repo *ImportMigrationRepository) *DBSymlinkLookup { + return &DBSymlinkLookup{repo: repo} +} + +// LookupFinalPath returns the final AltMount path for the given source and externalID. +// Returns ("", false, nil) when no matching row exists or the row has no final_path. +func (l *DBSymlinkLookup) LookupFinalPath(ctx context.Context, source, externalID string) (string, bool, error) { + row, err := l.repo.LookupByExternalID(ctx, source, externalID) + if err != nil { + return "", false, fmt.Errorf("lookup final path (source=%s, id=%s): %w", source, externalID, err) + } + if row == nil || row.FinalPath == nil { + return "", false, nil + } + return *row.FinalPath, true, nil +} + +// MarkSymlinksMigrated sets status=symlinks_migrated for the given row IDs. +func (l *DBSymlinkLookup) MarkSymlinksMigrated(ctx context.Context, ids []int64) error { + return l.repo.MarkSymlinksMigrated(ctx, ids) +} diff --git a/internal/importer/migration/symlinks.go b/internal/importer/migration/symlinks.go new file mode 100644 index 000000000..0e229fdc9 --- /dev/null +++ b/internal/importer/migration/symlinks.go @@ -0,0 +1,119 @@ +package migration + +import ( + "context" + "fmt" + "io/fs" + "os" + "path/filepath" + "strings" +) + +// SymlinkLookup looks up the final AltMount path for a given source and external ID. +type SymlinkLookup interface { + LookupFinalPath(ctx context.Context, source, externalID string) (finalPath string, found bool, err error) + MarkSymlinksMigrated(ctx context.Context, ids []int64) error +} + +// RewriteReport summarizes results of a symlink rewrite operation. +type RewriteReport struct { + Scanned int + Matched int + Rewritten int + Unmatched []string // symlink paths that had no matching migration row + Errors []string // errors encountered (non-fatal) +} + +// RewriteLibrarySymlinks walks libraryPath, finds symlinks whose target starts +// with sourceMountPath+"/.ids/", looks up the GUID in the lookup, and rewrites +// the symlink target to filepath.Join(altmountPath, finalPath). +// +// If dryRun is true, no filesystem changes are made but the report is populated. +func RewriteLibrarySymlinks( + ctx context.Context, + libraryPath string, + sourceMountPath string, + altmountPath string, + source string, + lookup SymlinkLookup, + dryRun bool, +) (*RewriteReport, error) { + report := &RewriteReport{} + + // Normalise source mount prefix used for matching. + prefix := filepath.Clean(sourceMountPath) + "/.ids/" + + err := filepath.WalkDir(libraryPath, func(path string, d fs.DirEntry, walkErr error) error { + // Propagate hard walk errors. + if walkErr != nil { + report.Errors = append(report.Errors, fmt.Sprintf("walk error at %s: %v", path, walkErr)) + return nil + } + + // Check context cancellation at every entry. + if err := ctx.Err(); err != nil { + return err + } + + // Only process symlinks. + if d.Type()&fs.ModeSymlink == 0 { + return nil + } + + report.Scanned++ + + target, err := os.Readlink(path) + if err != nil { + report.Errors = append(report.Errors, fmt.Sprintf("readlink %s: %v", path, err)) + return nil + } + + // Must target our source mount's .ids directory. + if !strings.HasPrefix(target, prefix) { + return nil + } + + // Extract GUID: last path component of the target. + guid := filepath.Base(target) + + finalPath, found, err := lookup.LookupFinalPath(ctx, source, guid) + if err != nil { + report.Errors = append(report.Errors, fmt.Sprintf("lookup %s (guid=%s): %v", path, guid, err)) + return nil + } + if !found { + report.Unmatched = append(report.Unmatched, path) + return nil + } + + report.Matched++ + + if dryRun { + return nil + } + + // Build the new target path. + newTarget := filepath.Join(altmountPath, strings.TrimPrefix(finalPath, "/")) + + // Atomic rewrite: write to a temp name then rename. + tmpPath := path + ".new" + if err := os.Symlink(newTarget, tmpPath); err != nil { + report.Errors = append(report.Errors, fmt.Sprintf("create temp symlink %s -> %s: %v", tmpPath, newTarget, err)) + return nil + } + if err := os.Rename(tmpPath, path); err != nil { + // Clean up the temp file on rename failure. + _ = os.Remove(tmpPath) + report.Errors = append(report.Errors, fmt.Sprintf("rename %s -> %s: %v", tmpPath, path, err)) + return nil + } + + report.Rewritten++ + return nil + }) + if err != nil { + return report, fmt.Errorf("walk library path %s: %w", libraryPath, err) + } + + return report, nil +} diff --git a/internal/importer/migration/symlinks_test.go b/internal/importer/migration/symlinks_test.go new file mode 100644 index 000000000..eb1d7882f --- /dev/null +++ b/internal/importer/migration/symlinks_test.go @@ -0,0 +1,228 @@ +package migration_test + +import ( + "context" + "errors" + "os" + "path/filepath" + "testing" + + "github.com/javi11/altmount/internal/importer/migration" +) + +// mockLookup implements SymlinkLookup for testing. +type mockLookup struct { + paths map[string]string // guid → finalPath; absent means not found + err error // if non-nil, always return this error +} + +func (m *mockLookup) LookupFinalPath(_ context.Context, _, externalID string) (string, bool, error) { + if m.err != nil { + return "", false, m.err + } + fp, ok := m.paths[externalID] + return fp, ok, nil +} + +func (m *mockLookup) MarkSymlinksMigrated(_ context.Context, _ []int64) error { + return nil +} + +func TestRewriteLibrarySymlinks(t *testing.T) { + t.Parallel() + + const ( + sourceMountPath = "/mnt/nzbdav" + altmountPath = "/mnt/altmount" + source = "nzbdav" + ) + + tests := []struct { + name string + setup func(t *testing.T, dir string) // create symlinks inside dir + lookup *mockLookup + dryRun bool + wantScanned int + wantMatched int + wantRewritten int + wantUnmatched int + wantErrors int + // optional post-check on filesystem state + postCheck func(t *testing.T, dir string) + }{ + { + name: "match and rewrite", + setup: func(t *testing.T, dir string) { + t.Helper() + // Create a symlink pointing to sourceMountPath/.ids/abc123 + target := sourceMountPath + "/.ids/abc123" + link := filepath.Join(dir, "movie.mkv") + if err := os.Symlink(target, link); err != nil { + t.Fatalf("setup: %v", err) + } + }, + lookup: &mockLookup{ + paths: map[string]string{ + "abc123": "/movies/Movie (2020)/Movie (2020).mkv", + }, + }, + dryRun: false, + wantScanned: 1, + wantMatched: 1, + wantRewritten: 1, + postCheck: func(t *testing.T, dir string) { + t.Helper() + link := filepath.Join(dir, "movie.mkv") + got, err := os.Readlink(link) + if err != nil { + t.Fatalf("readlink after rewrite: %v", err) + } + want := filepath.Join(altmountPath, "movies/Movie (2020)/Movie (2020).mkv") + if got != want { + t.Errorf("symlink target: got %q, want %q", got, want) + } + }, + }, + { + name: "unmatched guid", + setup: func(t *testing.T, dir string) { + t.Helper() + target := sourceMountPath + "/.ids/unknown-guid" + if err := os.Symlink(target, filepath.Join(dir, "unknown.mkv")); err != nil { + t.Fatalf("setup: %v", err) + } + }, + lookup: &mockLookup{paths: map[string]string{}}, + wantScanned: 1, + wantMatched: 0, + wantRewritten: 0, + wantUnmatched: 1, + }, + { + name: "non-nzbdav symlink skipped", + setup: func(t *testing.T, dir string) { + t.Helper() + // Target doesn't contain sourceMountPath/.ids/ + if err := os.Symlink("/some/other/path/file.mkv", filepath.Join(dir, "other.mkv")); err != nil { + t.Fatalf("setup: %v", err) + } + }, + lookup: &mockLookup{paths: map[string]string{}}, + wantScanned: 1, + wantMatched: 0, + wantRewritten: 0, + wantUnmatched: 0, + }, + { + name: "dry run - no filesystem change", + setup: func(t *testing.T, dir string) { + t.Helper() + target := sourceMountPath + "/.ids/dryrun-guid" + link := filepath.Join(dir, "dry.mkv") + if err := os.Symlink(target, link); err != nil { + t.Fatalf("setup: %v", err) + } + }, + lookup: &mockLookup{ + paths: map[string]string{ + "dryrun-guid": "/movies/Dry/Dry.mkv", + }, + }, + dryRun: true, + wantScanned: 1, + wantMatched: 1, + wantRewritten: 0, + postCheck: func(t *testing.T, dir string) { + t.Helper() + link := filepath.Join(dir, "dry.mkv") + got, err := os.Readlink(link) + if err != nil { + t.Fatalf("readlink after dry run: %v", err) + } + // Target must remain unchanged. + want := sourceMountPath + "/.ids/dryrun-guid" + if got != want { + t.Errorf("dry run: symlink target changed: got %q, want %q", got, want) + } + }, + }, + { + name: "context cancellation stops walk", + setup: func(t *testing.T, dir string) { + t.Helper() + // Create several symlinks so the walk has entries to process. + for i := 0; i < 5; i++ { + target := sourceMountPath + "/.ids/guid-cancel" + link := filepath.Join(dir, "file") + // Only create the first one to keep setup simple. + if i == 0 { + if err := os.Symlink(target, link); err != nil { + t.Fatalf("setup: %v", err) + } + } + } + }, + lookup: &mockLookup{paths: map[string]string{"guid-cancel": "/movies/x.mkv"}}, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + tc.setup(t, dir) + + ctx := context.Background() + + if tc.name == "context cancellation stops walk" { + // Cancel context immediately to verify walk stops. + cancelled, cancel := context.WithCancel(ctx) + cancel() + ctx = cancelled + } + + report, err := migration.RewriteLibrarySymlinks( + ctx, + dir, + sourceMountPath, + altmountPath, + source, + tc.lookup, + tc.dryRun, + ) + + if tc.name == "context cancellation stops walk" { + // We just verify it returns a context error and doesn't panic. + if err == nil || !errors.Is(err, context.Canceled) { + t.Errorf("expected context.Canceled error, got: %v", err) + } + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if report.Scanned != tc.wantScanned { + t.Errorf("Scanned: got %d, want %d", report.Scanned, tc.wantScanned) + } + if report.Matched != tc.wantMatched { + t.Errorf("Matched: got %d, want %d", report.Matched, tc.wantMatched) + } + if report.Rewritten != tc.wantRewritten { + t.Errorf("Rewritten: got %d, want %d", report.Rewritten, tc.wantRewritten) + } + if len(report.Unmatched) != tc.wantUnmatched { + t.Errorf("Unmatched: got %d, want %d (entries: %v)", len(report.Unmatched), tc.wantUnmatched, report.Unmatched) + } + if len(report.Errors) != tc.wantErrors { + t.Errorf("Errors: got %d, want %d (entries: %v)", len(report.Errors), tc.wantErrors, report.Errors) + } + + if tc.postCheck != nil { + tc.postCheck(t, dir) + } + }) + } +} From de623aea6a8f7923e6167351a2ffab60333f0d1a Mon Sep 17 00:00:00 2001 From: javi11 Date: Wed, 22 Apr 2026 15:34:12 +0200 Subject: [PATCH 6/6] fix(arrs): skip queue items from other download clients MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ARR queue cleanup worker iterated every queue record and, via the path-gone ghost check, removed items whose OutputPath was invisible to AltMount — which includes items owned by other download clients (qBittorrent, real SABnzbd, etc.). Filter the cleanup loops by DownloadClient so only AltMount's own items are ever touched. Fixes #523 --- internal/arrs/registrar/manager.go | 8 +++++++- internal/arrs/worker/worker.go | 15 +++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/internal/arrs/registrar/manager.go b/internal/arrs/registrar/manager.go index 170529126..31bc4ea7e 100644 --- a/internal/arrs/registrar/manager.go +++ b/internal/arrs/registrar/manager.go @@ -14,6 +14,12 @@ import ( "golift.io/starr/sonarr" ) +// AltmountDownloadClientName is the name AltMount registers itself under as a +// SABnzbd-compatible download client in Radarr/Sonarr/Lidarr/etc. Other code +// (e.g. the queue cleanup worker) imports this to distinguish AltMount's own +// queue items from those owned by other download clients. +const AltmountDownloadClientName = "AltMount (SABnzbd)" + type Manager struct { instances *instances.Manager clients *clients.Manager @@ -366,7 +372,7 @@ func (m *Manager) EnsureWebhookRegistration(ctx context.Context, altmountURL str // EnsureDownloadClientRegistration ensures that AltMount is registered as a SABnzbd download client in all enabled ARR instances func (m *Manager) EnsureDownloadClientRegistration(ctx context.Context, altmountHost string, altmountPort int, urlBase string, apiKey string) error { allInstances := m.instances.GetAllInstances() - clientName := "AltMount (SABnzbd)" + clientName := AltmountDownloadClientName slog.InfoContext(ctx, "Ensuring AltMount download client registration in ARR instances", "host", altmountHost, diff --git a/internal/arrs/worker/worker.go b/internal/arrs/worker/worker.go index 2778a8552..e16caa865 100644 --- a/internal/arrs/worker/worker.go +++ b/internal/arrs/worker/worker.go @@ -13,6 +13,7 @@ import ( "github.com/javi11/altmount/internal/arrs/clients" "github.com/javi11/altmount/internal/arrs/instances" "github.com/javi11/altmount/internal/arrs/model" + "github.com/javi11/altmount/internal/arrs/registrar" "github.com/javi11/altmount/internal/config" "github.com/javi11/altmount/internal/database" "golift.io/starr" @@ -179,6 +180,13 @@ func (w *Worker) cleanupRadarrQueue(ctx context.Context, instance *model.ConfigI var idsToRemove []int64 for _, q := range queue.Records { + // Only operate on queue items owned by AltMount's registered download client. + // Items from other clients (qBittorrent, real SABnzbd, etc.) may reference + // paths AltMount cannot see and must never be touched — see issue #523. + if q.DownloadClient != registrar.AltmountDownloadClientName { + continue + } + // Strategy 1: Ghost detection — cleanup already-imported files if w.checkGhostByImportHistory(ctx, q.OutputPath, cfg, instance.Name, q.Title) { idsToRemove = append(idsToRemove, q.ID) @@ -301,6 +309,13 @@ func (w *Worker) cleanupSonarrQueue(ctx context.Context, instance *model.ConfigI var idsToRemove []int64 for _, q := range queue.Records { + // Only operate on queue items owned by AltMount's registered download client. + // Items from other clients (qBittorrent, real SABnzbd, etc.) may reference + // paths AltMount cannot see and must never be touched — see issue #523. + if q.DownloadClient != registrar.AltmountDownloadClientName { + continue + } + // Strategy 1: Immediate cleanup for already imported files if w.checkGhostByImportHistory(ctx, q.OutputPath, cfg, instance.Name, q.Title) { idsToRemove = append(idsToRemove, q.ID)