diff --git a/cmd/altmount/cmd/serve.go b/cmd/altmount/cmd/serve.go index 64177fc1d..441dbef2f 100644 --- a/cmd/altmount/cmd/serve.go +++ b/cmd/altmount/cmd/serve.go @@ -150,6 +150,7 @@ func runServe(cmd *cobra.Command, args []string) error { apiServer := setupAPIServer(app, repos, authService, configManager, metadataReader, metadataService, fs, poolManager, importerService, arrsService, mountService, progressBroadcaster, streamTracker, cacheSource) apiServer.SetLogFilePath(slogutil.GetLogFilePath(cfg.Log)) + apiServer.SetMigrationRepo(db.MigrationRepo) webdavHandler, err := setupWebDAV(cfg, fs, authService, repos.UserRepo, configManager, streamTracker) if err != nil { diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index a2b722c90..091b25343 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -16,6 +16,8 @@ import type { ImportStatusResponse, LibrarySyncStatus, ManualScanRequest, + NzbdavMigrateSymlinksRequest, + NzbdavMigrateSymlinksResponse, PoolMetrics, QueueHistoricalStatsResponse, QueueItem, @@ -86,7 +88,13 @@ export class APIClient { if (response.status === 401) { window.dispatchEvent(new CustomEvent("api:unauthorized")); } - const errorData = await response.json(); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let errorData: any = {}; + try { + errorData = await response.json(); + } catch { + // empty or non-JSON error body — fall through to status-based message + } const errorMessage = (typeof errorData.error === "object" ? 
errorData.error?.message : errorData.error) || errorData.message || @@ -844,6 +852,28 @@ export class APIClient { }); } + async clearPendingNzbdavMigrations() { + return this.request<{ message: string; data: { deleted: number } }>( + "/import/nzbdav/pending-migrations", + { method: "DELETE" }, + ); + } + + async clearAllNzbdavMigrations() { + return this.request<{ message: string; data: { deleted: number } }>( + "/import/nzbdav/migrations", + { method: "DELETE" }, + ); + } + + async migrateNzbdavSymlinks(req: NzbdavMigrateSymlinksRequest) { + return this.request("/import/nzbdav/migrate-symlinks", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(req), + }); + } + // SABnzbd file upload endpoint async uploadNzbFile(file: File, apiKey: string): Promise { const formData = new FormData(); diff --git a/frontend/src/components/files/FileBrowserModal.tsx b/frontend/src/components/files/FileBrowserModal.tsx index e19f02b0a..16284b254 100644 --- a/frontend/src/components/files/FileBrowserModal.tsx +++ b/frontend/src/components/files/FileBrowserModal.tsx @@ -133,7 +133,8 @@ export function FileBrowserModal({ - diff --git a/frontend/src/components/queue/ImportMethods.tsx b/frontend/src/components/queue/ImportMethods.tsx index 8bc48ab44..e19b7d4c2 100644 --- a/frontend/src/components/queue/ImportMethods.tsx +++ b/frontend/src/components/queue/ImportMethods.tsx @@ -1,5 +1,6 @@ import { AlertCircle, + ArrowRight, CheckCircle2, Database, Download, @@ -20,7 +21,11 @@ import { useToast } from "../../contexts/ToastContext"; import { useCancelNzbdavImport, useCancelScan, + useClearAllNzbdavMigrations, + useClearPendingNzbdavMigrations, + useMigrateNzbdavSymlinks, useNzbdavImportStatus, + useQueueStats, useResetNzbdavImportStatus, useScanStatus, useSearchNZBByName, @@ -29,6 +34,7 @@ import { useUploadToQueue, } from "../../hooks/useApi"; import { useConfig } from "../../hooks/useConfig"; +import type { NzbdavMigrateSymlinksResponse } from 
"../../types/api"; import { ScanStatus } from "../../types/api"; import { FileBrowserModal } from "../files/FileBrowserModal"; import { ErrorAlert } from "../ui/ErrorAlert"; @@ -39,7 +45,8 @@ type ImportTab = "nzbdav" | "directory" | "upload"; const IMPORT_SECTIONS = { nzbdav: { title: "From NZBDav", - description: "Import your existing NZBDav database to populate the library.", + description: + "Import your existing NZBDav database, then migrate arr library symlinks to point at AltMount.", icon: Database, }, directory: { @@ -604,6 +611,7 @@ function StatusBadge({ status }: { status: string }) { } function NzbDavImportSection() { + const [nzbdavTab, setNzbdavTab] = useState<"import" | "migrate">("import"); const [inputMethod, setInputMethod] = useState<"server" | "upload">("server"); const [selectedDbPath, setSelectedDbPath] = useState(""); const [blobsPath, setBlobsPath] = useState(""); @@ -615,8 +623,11 @@ function NzbDavImportSection() { const [isBlobsBrowserOpen, setIsBlobsBrowserOpen] = useState(false); const { data: importStatus } = useNzbdavImportStatus(2000); + const { data: queueStats } = useQueueStats(2000); const cancelImport = useCancelNzbdavImport(); const resetImport = useResetNzbdavImportStatus(); + const clearPending = useClearPendingNzbdavMigrations(); + const clearAllMigrations = useClearAllNzbdavMigrations(); const isRunning = importStatus?.status === "running"; const isCanceling = importStatus?.status === "canceling"; @@ -712,6 +723,204 @@ function NzbDavImportSection() { } }; + const handleClearAllMigrations = async () => { + const total = importStatus?.migration_stats?.total ?? 0; + const confirmed = window.confirm( + `Delete ALL ${total} NZBDav migration row(s)?\n\nThis will force a full re-import of every blob on the next scan. 
Use this only if you've deleted the imported files from AltMount and want to start over.`, + ); + if (!confirmed) return; + try { + const res = await clearAllMigrations.mutateAsync(); + await resetImport.mutateAsync(); + showToast({ + title: "Cleared", + message: `Deleted ${res.data?.deleted ?? 0} migration rows and reset scanner.`, + type: "success", + }); + } catch (err) { + showToast({ + title: "Clear Failed", + message: err instanceof Error ? err.message : "Could not clear migrations", + type: "error", + }); + } + }; + + const handleClearPendingAndReset = async () => { + const pendingCount = importStatus?.migration_stats?.pending ?? 0; + const confirmed = window.confirm( + `Delete ${pendingCount} pending migration row(s) and reset the scanner?\n\nImported and migrated rows are preserved. Use this to start fresh after a cancelled or stuck import.`, + ); + if (!confirmed) return; + try { + const res = await clearPending.mutateAsync(); + await resetImport.mutateAsync(); + showToast({ + title: "Cleared", + message: `Deleted ${res.data?.deleted ?? 0} pending migration rows and reset scanner.`, + type: "success", + }); + } catch (err) { + showToast({ + title: "Clear Failed", + message: err instanceof Error ? err.message : "Could not clear pending migrations", + type: "error", + }); + } + }; + + const migrationStats = importStatus?.migration_stats; + const queueProcessing = queueStats?.total_processing ?? 0; + const queueQueued = queueStats?.total_queued ?? 0; + const queueDrained = queueProcessing === 0 && queueQueued === 0; + const showPhase2 = isCompleted || (migrationStats?.symlinks_migrated ?? 0) > 0; + + return ( +
+
+ + +
+ + {nzbdavTab === "migrate" ? ( + + ) : ( + + )} +
+ ); +} + +type NzbdavImportTabContentProps = { + error: Error | null; + isRunning: boolean; + isCanceling: boolean; + isCompleted: boolean; + hasResults: boolean; + importStatus: ReturnType["data"]; + progressPercent: number; + inputMethod: "server" | "upload"; + setInputMethod: (v: "server" | "upload") => void; + selectedDbPath: string; + setSelectedDbPath: (v: string) => void; + blobsPath: string; + setBlobsPath: (v: string) => void; + selectedFile: File | null; + isLoading: boolean; + handleSubmit: (e: React.FormEvent) => Promise; + handleFileUpload: (e: React.ChangeEvent) => void; + handleCancel: () => Promise; + handleReset: () => Promise; + handleClearPendingAndReset: () => Promise; + isFileBrowserOpen: boolean; + setIsFileBrowserOpen: (v: boolean) => void; + isBlobsBrowserOpen: boolean; + setIsBlobsBrowserOpen: (v: boolean) => void; + handleFileSelect: (path: string) => void; + handleBlobsSelect: (path: string) => void; + cancelImport: ReturnType; + resetImport: ReturnType; + clearPending: ReturnType; + clearAllMigrations: ReturnType; + handleClearAllMigrations: () => Promise; + showPhase2: boolean; + migrationStats?: MigrationStats; + queueDrained: boolean; + queueProcessing: number; + queueQueued: number; +}; + +function NzbDavImportTabContent({ + error, + isRunning, + isCanceling, + isCompleted, + hasResults, + importStatus, + progressPercent, + inputMethod, + setInputMethod, + selectedDbPath, + setSelectedDbPath, + blobsPath, + setBlobsPath, + selectedFile, + isLoading, + handleSubmit, + handleFileUpload, + handleCancel, + handleReset, + handleClearPendingAndReset, + isFileBrowserOpen, + setIsFileBrowserOpen, + isBlobsBrowserOpen, + setIsBlobsBrowserOpen, + handleFileSelect, + handleBlobsSelect, + cancelImport, + resetImport, + clearPending, + clearAllMigrations, + handleClearAllMigrations, + showPhase2, + migrationStats, + queueDrained, + queueProcessing, + queueQueued, +}: NzbdavImportTabContentProps) { return (
{error && } @@ -766,6 +975,17 @@ function NzbDavImportSection() { Stop Import )} + {(isRunning || isCanceling) && ( + + )} {!isRunning && !isCanceling && ( +
+ )} + + {(importStatus?.migration_stats?.total ?? 0) > 0 && ( +
+
+ + + {importStatus?.migration_stats?.total} migration row(s) exist + from previous imports. Clear all to force a full re-import (use after deleting the + imported files from AltMount). + +
+ +
+ )} +

@@ -964,6 +1225,15 @@ function NzbDavImportSection() { )} + {showPhase2 && ( + + )} + setIsFileBrowserOpen(false)} @@ -982,6 +1252,313 @@ function NzbDavImportSection() { ); } +interface MigrationStats { + pending: number; + imported: number; + failed: number; + symlinks_migrated: number; + total: number; +} + +interface SymlinkMigrationFormProps { + migrationStats?: MigrationStats; + intro?: React.ReactNode; + disabled?: boolean; + disabledBanner?: React.ReactNode; +} + +function SymlinkMigrationForm({ + migrationStats, + intro, + disabled = false, + disabledBanner, +}: SymlinkMigrationFormProps) { + const [libraryPath, setLibraryPath] = useState(""); + const [sourceMountPath, setSourceMountPath] = useState(""); + const [dryRun, setDryRun] = useState(true); + const [lastReport, setLastReport] = useState(null); + const migrate = useMigrateNzbdavSymlinks(); + const { showToast } = useToast(); + + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault(); + try { + const report = await migrate.mutateAsync({ libraryPath, sourceMountPath, dryRun }); + setLastReport(report); + showToast({ + title: dryRun ? "Dry Run Complete" : "Migration Complete", + message: dryRun + ? `Would rewrite ${report.matched} of ${report.scanned} symlinks.` + : `Rewrote ${report.rewritten} of ${report.scanned} symlinks.`, + type: "success", + }); + } catch (err) { + showToast({ + title: "Migration Failed", + message: err instanceof Error ? err.message : "An error occurred", + type: "error", + }); + } + }; + + const handleApply = async () => { + try { + const report = await migrate.mutateAsync({ libraryPath, sourceMountPath, dryRun: false }); + setLastReport(report); + showToast({ + title: "Migration Complete", + message: `Rewrote ${report.rewritten} of ${report.scanned} symlinks.`, + type: "success", + }); + } catch (err) { + showToast({ + title: "Migration Failed", + message: err instanceof Error ? 
err.message : "An error occurred", + type: "error", + }); + } + }; + + return ( +
+ {intro && ( +
+ +
{intro}
+
+ )} + + {migrationStats && migrationStats.total > 0 && ( +
+
+ + Imported + + + {migrationStats.imported} + +
+
+ + Migrated + + + {migrationStats.symlinks_migrated} + +
+
+ + Failed + + {migrationStats.failed} +
+
+ )} + + {disabled ? ( + disabledBanner + ) : ( +
+
+ Library Path + setLibraryPath(e.target.value)} + required + /> +

+ The directory containing your arr library symlinks. +

+
+ +
+ NZBDav Mount Path + setSourceMountPath(e.target.value)} + required + /> +

+ The root path of your nzbdav mount (symlinks currently point here). +

+
+ + + +
+ + + {lastReport?.dry_run && lastReport.matched > 0 && ( + + )} +
+ + )} + + {lastReport && ( +
+
+ + + {lastReport.dry_run ? "Dry Run Results" : "Migration Results"} + +
+
+
+
{lastReport.scanned}
+
Scanned
+
+
+
{lastReport.matched}
+
Matched
+
+
+
{lastReport.rewritten}
+
+ {lastReport.dry_run ? "Would Rewrite" : "Rewritten"} +
+
+
+
+ {(lastReport.unmatched ?? []).length} +
+
Unmatched
+
+
+ {(lastReport.unmatched ?? []).length > 0 && ( +
+ + {(lastReport.unmatched ?? []).length} unmatched GUIDs + +
    + {(lastReport.unmatched ?? []).map((guid) => ( +
  • + {guid} +
  • + ))} +
+
+ )} + {(lastReport.skipped_wrong_prefix ?? 0) > 0 && ( +
+ + + {lastReport.skipped_wrong_prefix} symlink(s) skipped — their target + doesn't point at the configured NZBDav Mount Path. Check that the path is correct. + +
+ )} + {(lastReport.errors ?? []).length > 0 && ( +
+ + {(lastReport.errors ?? []).length} error(s) — check server logs +
+ )} +
+ )} +
+ ); +} + +interface NzbdavPhase2SectionProps { + migrationStats?: MigrationStats; + queueDrained: boolean; + queueProcessing: number; + queueQueued: number; +} + +function NzbdavPhase2Section({ + migrationStats, + queueDrained, + queueProcessing, + queueQueued, +}: NzbdavPhase2SectionProps) { + const disabledBanner = ( +
+
+ + Waiting for queue to finish +
+

+ Symlink rewrite runs after all imported NZBs finish processing in the queue. +

+

+ {queueProcessing} processing · {queueQueued} queued + remaining +

+
+ ); + + return ( +
+
+

+ Phase 2 — Migrate Library Symlinks +

+
+
+ + + Rewrite your arr library symlinks from the nzbdav mount to AltMount. Available once the + import queue drains. Run a dry run first to preview changes, then + apply. +

+ } + disabled={!queueDrained} + disabledBanner={disabledBanner} + /> +
+ ); +} + +function MigrateSymlinksSection() { + return ( +
+ + Use this if you've already imported NZBs (either through AltMount or a previous NZBDav + setup) and just need to rewrite your arr library symlinks to point at AltMount. Only + rows already marked as imported will be matched. +

+ } + /> +
+ ); +} + function DirectoryScanSection() { const [scanPath, setScanPath] = useState(""); const [validationError, setValidationError] = useState(""); diff --git a/frontend/src/hooks/useApi.ts b/frontend/src/hooks/useApi.ts index ea3007cce..6fac9e402 100644 --- a/frontend/src/hooks/useApi.ts +++ b/frontend/src/hooks/useApi.ts @@ -20,10 +20,11 @@ export const useQueue = (params?: { }); }; -export const useQueueStats = () => { +export const useQueueStats = (refetchInterval?: number) => { return useQuery({ queryKey: ["queue", "stats"], queryFn: () => apiClient.getQueueStats(), + refetchInterval, }); }; @@ -461,6 +462,44 @@ export const useCancelNzbdavImport = () => { }); }; +export const useClearPendingNzbdavMigrations = () => { + const queryClient = useQueryClient(); + + return useMutation({ + mutationFn: () => apiClient.clearPendingNzbdavMigrations(), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ["import", "nzbdav", "status"] }); + }, + }); +}; + +export const useClearAllNzbdavMigrations = () => { + const queryClient = useQueryClient(); + + return useMutation({ + mutationFn: () => apiClient.clearAllNzbdavMigrations(), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ["import", "nzbdav", "status"] }); + }, + }); +}; + +export const useMigrateNzbdavSymlinks = () => { + const queryClient = useQueryClient(); + + return useMutation({ + mutationFn: (req: { libraryPath: string; sourceMountPath: string; dryRun: boolean }) => + apiClient.migrateNzbdavSymlinks({ + library_path: req.libraryPath, + source_mount_path: req.sourceMountPath, + dry_run: req.dryRun, + }), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ["import", "nzbdav", "status"] }); + }, + }); +}; + // Native upload hook (using JWT authentication) export const useUploadToQueue = () => { const queryClient = useQueryClient(); diff --git a/frontend/src/types/api.ts b/frontend/src/types/api.ts index af85fae4c..cf5da2005 100644 --- a/frontend/src/types/api.ts +++ 
b/frontend/src/types/api.ts @@ -134,6 +134,14 @@ export interface ScanStatusResponse { } // Import Job types +export interface MigrationStats { + pending: number; + imported: number; + failed: number; + symlinks_migrated: number; + total: number; +} + export interface ImportStatusResponse { status: "idle" | "running" | "canceling" | "completed"; total: number; @@ -141,6 +149,23 @@ export interface ImportStatusResponse { failed: number; skipped?: number; last_error?: string; + migration_stats?: MigrationStats; +} + +export interface NzbdavMigrateSymlinksRequest { + library_path: string; + source_mount_path: string; + dry_run: boolean; +} + +export interface NzbdavMigrateSymlinksResponse { + scanned: number; + matched: number; + rewritten: number; + skipped_wrong_prefix?: number; + unmatched: string[]; + errors: string[]; + dry_run: boolean; } // Health types diff --git a/internal/api/nzbdav_handlers.go b/internal/api/nzbdav_handlers.go index 9550a0204..5cea94c57 100644 --- a/internal/api/nzbdav_handlers.go +++ b/internal/api/nzbdav_handlers.go @@ -7,7 +7,9 @@ import ( "time" "github.com/gofiber/fiber/v2" + "github.com/javi11/altmount/internal/database" "github.com/javi11/altmount/internal/importer" + "github.com/javi11/altmount/internal/importer/migration" ) // handleImportNzbdav handles POST /import/nzbdav @@ -114,9 +116,128 @@ func (s *Server) handleGetNzbdavImportStatus(c *fiber.Ctx) error { } status := s.importerService.GetImportStatus() + resp := toImportStatusResponse(status) + + // Attach migration stats when available; omit on error to preserve existing behaviour. 
+ if s.migrationRepo != nil { + if stats, err := s.migrationRepo.Stats(c.Context(), "nzbdav"); err == nil { + resp["migration_stats"] = map[string]any{ + "pending": stats.Pending, + "imported": stats.Imported, + "failed": stats.Failed, + "symlinks_migrated": stats.SymlinksMigrated, + "total": stats.Total, + } + } + } + return c.Status(200).JSON(fiber.Map{ "success": true, - "data": toImportStatusResponse(status), + "data": resp, + }) +} + +// handleMigrateNzbdavSymlinks handles POST /import/nzbdav/migrate-symlinks +// +// @Summary Migrate NZBDav library symlinks +// @Description Walks a library directory and rewrites symlinks that target the nzbdav mount to point at the altmount path instead. +// @Tags Import +// @Accept json +// @Produce json +// @Param body body object{} true "library_path, source_mount_path, dry_run" +// @Success 200 {object} APIResponse +// @Failure 400 {object} APIResponse +// @Failure 500 {object} APIResponse +// @Security BearerAuth +// @Security ApiKeyAuth +// @Router /import/nzbdav/migrate-symlinks [post] +func (s *Server) handleMigrateNzbdavSymlinks(c *fiber.Ctx) error { + var req struct { + LibraryPath string `json:"library_path"` + SourceMountPath string `json:"source_mount_path"` + DryRun bool `json:"dry_run"` + } + if err := c.BodyParser(&req); err != nil { + return c.Status(400).JSON(fiber.Map{ + "success": false, + "message": "Invalid request body", + "details": err.Error(), + }) + } + + if req.LibraryPath == "" || !filepath.IsAbs(req.LibraryPath) { + return c.Status(400).JSON(fiber.Map{ + "success": false, + "message": "library_path must be a non-empty absolute path", + }) + } + if req.SourceMountPath == "" || !filepath.IsAbs(req.SourceMountPath) { + return c.Status(400).JSON(fiber.Map{ + "success": false, + "message": "source_mount_path must be a non-empty absolute path", + }) + } + + cfg := s.configManager.GetConfig() + if cfg == nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Configuration not 
available", + }) + } + + // Determine which migration repo to use — prefer the dedicated field, fall back + // to nil-safe check so existing deployments without a migration repo still fail + // gracefully rather than panic. + migRepo := s.migrationRepo + if migRepo == nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Migration repository not available", + }) + } + + ctx := c.Context() + + // Backfill idempotently before walking. + if _, err := migRepo.BackfillFromImportQueue(ctx); err != nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Failed to backfill migration data", + "details": err.Error(), + }) + } + + lookup := database.NewDBSymlinkLookup(migRepo) + + report, err := migration.RewriteLibrarySymlinks( + ctx, + req.LibraryPath, + req.SourceMountPath, + cfg.MountPath, + "nzbdav", + lookup, + req.DryRun, + ) + if err != nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Symlink migration failed", + "details": err.Error(), + }) + } + + return c.Status(200).JSON(fiber.Map{ + "success": true, + "data": fiber.Map{ + "scanned": report.Scanned, + "matched": report.Matched, + "rewritten": report.Rewritten, + "skipped_wrong_prefix": report.SkippedWrongPrefix, + "unmatched": report.Unmatched, + "errors": report.Errors, + "dry_run": req.DryRun, + }, }) } @@ -178,6 +299,78 @@ func (s *Server) handleResetNzbdavImportStatus(c *fiber.Ctx) error { }) } +// handleClearPendingNzbdavMigrations handles DELETE /import/nzbdav/pending-migrations +// +// @Summary Clear pending NZBDav migration rows +// @Description Deletes all import_migrations rows with status='pending' for source='nzbdav'. Keeps imported/symlinks_migrated rows untouched. Use to remove orphaned rows from a cancelled/failed import before re-importing. 
+// @Tags Import +// @Produce json +// @Success 200 {object} APIResponse +// @Security BearerAuth +// @Security ApiKeyAuth +// @Router /import/nzbdav/pending-migrations [delete] +func (s *Server) handleClearPendingNzbdavMigrations(c *fiber.Ctx) error { + if s.migrationRepo == nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Migration repository not available", + }) + } + + deleted, err := s.migrationRepo.DeletePendingBySource(c.Context(), "nzbdav") + if err != nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Failed to clear pending migrations", + "details": err.Error(), + }) + } + + return c.Status(200).JSON(fiber.Map{ + "success": true, + "message": fmt.Sprintf("Cleared %d pending migration rows", deleted), + "data": fiber.Map{ + "deleted": deleted, + }, + }) +} + +// handleClearAllNzbdavMigrations handles DELETE /import/nzbdav/migrations +// +// @Summary Clear ALL NZBDav migration rows +// @Description Deletes every import_migrations row for source='nzbdav' regardless of status. Use to force a full re-import after the imported files have been deleted from AltMount. This will cause the scanner to re-process every blob on the next import. 
+// @Tags Import +// @Produce json +// @Success 200 {object} APIResponse +// @Security BearerAuth +// @Security ApiKeyAuth +// @Router /import/nzbdav/migrations [delete] +func (s *Server) handleClearAllNzbdavMigrations(c *fiber.Ctx) error { + if s.migrationRepo == nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Migration repository not available", + }) + } + + deleted, err := s.migrationRepo.DeleteAllBySource(c.Context(), "nzbdav") + if err != nil { + return c.Status(500).JSON(fiber.Map{ + "success": false, + "message": "Failed to clear migrations", + "details": err.Error(), + }) + } + + return c.Status(200).JSON(fiber.Map{ + "success": true, + "message": fmt.Sprintf("Cleared %d migration rows", deleted), + "data": fiber.Map{ + "deleted": deleted, + }, + }) +} + func toImportStatusResponse(info importer.ImportInfo) map[string]any { return map[string]any{ "status": string(info.Status), diff --git a/internal/api/server.go b/internal/api/server.go index ae7494fa3..d923ed519 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -63,6 +63,7 @@ type Server struct { fuseManager *FuseManager cacheSource *segcache.Source logFilePath string + migrationRepo *database.ImportMigrationRepository ready atomic.Bool } @@ -128,6 +129,11 @@ func (s *Server) SetLogFilePath(path string) { s.logFilePath = path } +// SetMigrationRepo sets the migration repository used by the migrate-symlinks endpoint. 
+func (s *Server) SetMigrationRepo(repo *database.ImportMigrationRepository) {
+	s.migrationRepo = repo
+}
+
 // SetReady sets the server as ready to accept requests
 func (s *Server) SetReady(ready bool) {
 	s.ready.Store(ready)
 }
 
@@ -222,6 +228,9 @@ func (s *Server) SetupRoutes(app *fiber.App) {
 	api.Post("/import/nzbdav/reset", s.handleResetNzbdavImportStatus)
 	api.Get("/import/nzbdav/status", s.handleGetNzbdavImportStatus)
 	api.Delete("/import/nzbdav", s.handleCancelNzbdavImport)
+	// Migration maintenance routes; distinct exact paths, so they do not
+	// conflict with the shorter DELETE /import/nzbdav route above.
+	api.Delete("/import/nzbdav/pending-migrations", s.handleClearPendingNzbdavMigrations)
+	api.Delete("/import/nzbdav/migrations", s.handleClearAllNzbdavMigrations)
+	api.Post("/import/nzbdav/migrate-symlinks", s.handleMigrateNzbdavSymlinks)
 
 	// Queue endpoints
 	api.Get("/queue", s.handleListQueue)
diff --git a/internal/database/db.go b/internal/database/db.go
index b62ddbf22..69b804da5 100644
--- a/internal/database/db.go
+++ b/internal/database/db.go
@@ -19,7 +19,8 @@ type DB struct {
 	conn *sql.DB
 	dialect dialectHelper
 	// Repository is kept for backwards-compat; prefer using Connection() directly.
-	Repository *QueueRepository
+	Repository *QueueRepository
+	// MigrationRepo tracks two-phase import migrations (import_migrations table).
+	MigrationRepo *ImportMigrationRepository
 }
 
 // Config holds database configuration.
@@ -86,6 +87,7 @@ func newSQLiteDB(config Config) (*DB, error) {
 	dh := dialectHelper{d: DialectSQLite}
 	db := &DB{conn: conn, dialect: dh}
 	db.Repository = NewQueueRepository(conn, DialectSQLite)
+	db.MigrationRepo = NewImportMigrationRepository(conn, DialectSQLite)
 	return db, nil
 }
 
@@ -114,6 +116,7 @@ func newPostgresDB(config Config) (*DB, error) {
 	dh := dialectHelper{d: DialectPostgres}
 	db := &DB{conn: conn, dialect: dh}
 	db.Repository = NewQueueRepository(conn, DialectPostgres)
+	db.MigrationRepo = NewImportMigrationRepository(conn, DialectPostgres)
 	return db, nil
 }
 
diff --git a/internal/database/import_migration_repository.go b/internal/database/import_migration_repository.go
new file mode 100644
index 000000000..0aa857403
--- /dev/null
+++ b/internal/database/import_migration_repository.go
@@ -0,0 +1,377 @@
+package database
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"strings"
+)
+
+// ImportMigrationRepository handles database operations for import_migrations.
+type ImportMigrationRepository struct {
+	db      *dialectAwareDB
+	dialect dialectHelper
+}
+
+// NewImportMigrationRepository creates a new ImportMigrationRepository.
+func NewImportMigrationRepository(db *sql.DB, d Dialect) *ImportMigrationRepository {
+	return &ImportMigrationRepository{
+		db:      newDialectAwareDB(db, d),
+		dialect: dialectHelper{d: d},
+	}
+}
+
+// Upsert inserts or updates a migration row keyed by (source, external_id).
+// Returns the row ID.
+func (r *ImportMigrationRepository) Upsert(ctx context.Context, row *ImportMigration) (int64, error) {
+	// NOTE(review): `?` placeholders and datetime('now') are SQLite syntax, yet
+	// this same string feeds the Postgres branch below — presumably
+	// dialectAwareDB rewrites both; confirm, since BackfillFromImportQueue
+	// hand-builds a separate $n/NOW() query for Postgres.
+	query := `
+		INSERT INTO import_migrations
+		(source, external_id, queue_item_id, relative_path, final_path, status, error, created_at, updated_at)
+		VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
+		ON CONFLICT(source, external_id) DO UPDATE SET
+			queue_item_id = COALESCE(excluded.queue_item_id, import_migrations.queue_item_id),
+			relative_path = excluded.relative_path,
+			final_path = COALESCE(excluded.final_path, import_migrations.final_path),
+			status = excluded.status,
+			error = excluded.error,
+			updated_at = datetime('now')
+	`
+	args := []any{
+		row.Source, row.ExternalID, row.QueueItemID,
+		row.RelativePath, row.FinalPath, string(row.Status), row.Error,
+	}
+
+	if r.dialect.IsPostgres() {
+		var id int64
+		// ErrNoRows is tolerated; DO UPDATE should always produce a RETURNING
+		// row, so this is defensive only.
+		err := r.db.QueryRowContext(ctx, query+" RETURNING id", args...).Scan(&id)
+		if err != nil && err != sql.ErrNoRows {
+			return 0, fmt.Errorf("upsert import_migration: %w", err)
+		}
+		return id, nil
+	}
+
+	res, err := r.db.ExecContext(ctx, query, args...)
+	if err != nil {
+		return 0, fmt.Errorf("upsert import_migration: %w", err)
+	}
+	// NOTE(review): when the ON CONFLICT DO UPDATE branch fires, SQLite's
+	// last_insert_rowid() still reports the connection's most recent successful
+	// INSERT, not the updated row's id — verify callers don't rely on the
+	// returned ID for pre-existing rows (RETURNING id would fix this on
+	// SQLite >= 3.35).
+	id, err := res.LastInsertId()
+	if err != nil {
+		return 0, fmt.Errorf("upsert import_migration last insert id: %w", err)
+	}
+	return id, nil
+}
+
+// MarkImported sets status=imported and final_path for all rows matching queue_item_id.
+// Several rows can share one queue_item_id (season packs), so this is a bulk update.
+func (r *ImportMigrationRepository) MarkImported(ctx context.Context, queueItemID int64, finalPath string) error {
+	query := `
+		UPDATE import_migrations
+		SET status = 'imported', final_path = ?, updated_at = datetime('now')
+		WHERE queue_item_id = ?
+	`
+	_, err := r.db.ExecContext(ctx, query, finalPath, queueItemID)
+	if err != nil {
+		return fmt.Errorf("mark import_migration imported (queue_item_id=%d): %w", queueItemID, err)
+	}
+	return nil
+}
+
+// MarkFailed sets status=failed and error for all rows matching queue_item_id.
+func (r *ImportMigrationRepository) MarkFailed(ctx context.Context, queueItemID int64, errMsg string) error {
+	query := `
+		UPDATE import_migrations
+		SET status = 'failed', error = ?, updated_at = datetime('now')
+		WHERE queue_item_id = ?
+	`
+	_, err := r.db.ExecContext(ctx, query, errMsg, queueItemID)
+	if err != nil {
+		return fmt.Errorf("mark import_migration failed (queue_item_id=%d): %w", queueItemID, err)
+	}
+	return nil
+}
+
+// LinkQueueItemID sets queue_item_id for all migration rows matching (source, externalIDs).
+// Unconditionally overwrites any existing queue_item_id so that re-imports after a
+// cancelled/failed first attempt can re-link to the new queue item. This is safe because
+// IsMigrationCompleted already short-circuits rows with status=imported before we get here.
+func (r *ImportMigrationRepository) LinkQueueItemID(ctx context.Context, source string, externalIDs []string, queueItemID int64) error {
+	// Nothing to link; avoid issuing an UPDATE with an empty IN () clause.
+	if len(externalIDs) == 0 {
+		return nil
+	}
+
+	// Args are ordered to match the query: SET value first, then the IN list,
+	// then the source filter.
+	placeholders := make([]string, len(externalIDs))
+	args := make([]any, 0, len(externalIDs)+2)
+	args = append(args, queueItemID)
+	for i, id := range externalIDs {
+		placeholders[i] = "?"
+		args = append(args, id)
+	}
+	args = append(args, source)
+
+	query := fmt.Sprintf(`
+		UPDATE import_migrations
+		SET queue_item_id = ?, updated_at = datetime('now')
+		WHERE external_id IN (%s)
+		AND source = ?
+	`, strings.Join(placeholders, ", "))
+
+	_, err := r.db.ExecContext(ctx, query, args...)
+	if err != nil {
+		return fmt.Errorf("link queue_item_id (source=%s, queueItemID=%d): %w", source, queueItemID, err)
+	}
+	return nil
+}
+
+// MarkSymlinksMigrated sets status=symlinks_migrated for the given row IDs.
+func (r *ImportMigrationRepository) MarkSymlinksMigrated(ctx context.Context, ids []int64) error {
+	// No IDs — nothing to update and an empty IN () would be invalid SQL.
+	if len(ids) == 0 {
+		return nil
+	}
+
+	placeholders := make([]string, len(ids))
+	args := make([]any, len(ids))
+	for i, id := range ids {
+		placeholders[i] = "?"
+		args[i] = id
+	}
+
+	query := fmt.Sprintf(`
+		UPDATE import_migrations
+		SET status = 'symlinks_migrated', updated_at = datetime('now')
+		WHERE id IN (%s)
+	`, strings.Join(placeholders, ", "))
+
+	_, err := r.db.ExecContext(ctx, query, args...)
+	if err != nil {
+		return fmt.Errorf("mark import_migrations symlinks_migrated: %w", err)
+	}
+	return nil
+}
+
+// LookupByExternalID returns the migration row for (source, external_id), or nil if not found.
+// A nil row with nil error means "no such row" — callers must check both.
+func (r *ImportMigrationRepository) LookupByExternalID(ctx context.Context, source, externalID string) (*ImportMigration, error) {
+	query := `
+		SELECT id, source, external_id, queue_item_id, relative_path, final_path, status, error, created_at, updated_at
+		FROM import_migrations
+		WHERE source = ? AND external_id = ?
+	`
+	row := r.db.QueryRowContext(ctx, query, source, externalID)
+	m, err := scanImportMigration(row)
+	if err == sql.ErrNoRows {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, fmt.Errorf("lookup import_migration (source=%s, external_id=%s): %w", source, externalID, err)
+	}
+	return m, nil
+}
+
+// ListByStatus returns paginated rows for source with the given status.
+func (r *ImportMigrationRepository) ListByStatus(ctx context.Context, source string, status ImportMigrationStatus, limit, offset int) ([]*ImportMigration, error) {
+	// ORDER BY id gives a stable order so limit/offset pagination doesn't skip
+	// or repeat rows between pages.
+	query := `
+		SELECT id, source, external_id, queue_item_id, relative_path, final_path, status, error, created_at, updated_at
+		FROM import_migrations
+		WHERE source = ? AND status = ?
+		ORDER BY id ASC
+		LIMIT ? OFFSET ?
+	`
+	rows, err := r.db.QueryContext(ctx, query, source, string(status), limit, offset)
+	if err != nil {
+		return nil, fmt.Errorf("list import_migrations by status: %w", err)
+	}
+	defer rows.Close()
+
+	var result []*ImportMigration
+	for rows.Next() {
+		m, err := scanImportMigrationRow(rows)
+		if err != nil {
+			return nil, fmt.Errorf("scan import_migration: %w", err)
+		}
+		result = append(result, m)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterate import_migrations: %w", err)
+	}
+	return result, nil
+}
+
+// Stats returns aggregate counts for a source.
+func (r *ImportMigrationRepository) Stats(ctx context.Context, source string) (*ImportMigrationStats, error) {
+	// Single-pass conditional aggregation; the Scan order below must match the
+	// SELECT list exactly.
+	query := `
+		SELECT
+			COUNT(*) AS total,
+			COALESCE(SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END), 0) AS pending,
+			COALESCE(SUM(CASE WHEN status = 'imported' THEN 1 ELSE 0 END), 0) AS imported,
+			COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed,
+			COALESCE(SUM(CASE WHEN status = 'symlinks_migrated' THEN 1 ELSE 0 END), 0) AS symlinks_migrated
+		FROM import_migrations
+		WHERE source = ?
+	`
+	var stats ImportMigrationStats
+	err := r.db.QueryRowContext(ctx, query, source).Scan(
+		&stats.Total,
+		&stats.Pending,
+		&stats.Imported,
+		&stats.Failed,
+		&stats.SymlinksMigrated,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("stats import_migrations (source=%s): %w", source, err)
+	}
+	return &stats, nil
+}
+
+// DeletePendingBySource removes all migration rows for a source that have
+// status='pending'. Returns the number of rows deleted. Use this to clear
+// orphaned rows from a previous import attempt so a fresh import starts clean
+// (imported/symlinks_migrated rows are preserved).
+func (r *ImportMigrationRepository) DeletePendingBySource(ctx context.Context, source string) (int64, error) {
+	res, err := r.db.ExecContext(ctx, `DELETE FROM import_migrations WHERE source = ? AND status = 'pending'`, source)
+	if err != nil {
+		return 0, fmt.Errorf("delete pending import_migrations (source=%s): %w", source, err)
+	}
+	n, err := res.RowsAffected()
+	if err != nil {
+		return 0, fmt.Errorf("delete pending import_migrations rows affected: %w", err)
+	}
+	return n, nil
+}
+
+// DeleteAllBySource removes every migration row for a source regardless of
+// status. Returns the number of rows deleted. Use to force a full re-import
+// after the imported files have been deleted from AltMount.
+func (r *ImportMigrationRepository) DeleteAllBySource(ctx context.Context, source string) (int64, error) {
+	res, err := r.db.ExecContext(ctx, `DELETE FROM import_migrations WHERE source = ?`, source)
+	if err != nil {
+		return 0, fmt.Errorf("delete all import_migrations (source=%s): %w", source, err)
+	}
+	n, err := res.RowsAffected()
+	if err != nil {
+		return 0, fmt.Errorf("delete all import_migrations rows affected: %w", err)
+	}
+	return n, nil
+}
+
+// ExistsForSource returns true if any rows exist for the given source.
+func (r *ImportMigrationRepository) ExistsForSource(ctx context.Context, source string) (bool, error) {
+	// SELECT 1 ... LIMIT 1 stops at the first matching row. The previous
+	// SELECT COUNT(*) ... LIMIT 1 counted every row for the source first
+	// (LIMIT on a one-row aggregate is a no-op), which is wasted work here.
+	query := `SELECT 1 FROM import_migrations WHERE source = ? LIMIT 1`
+	var one int
+	err := r.db.QueryRowContext(ctx, query, source).Scan(&one)
+	if err == sql.ErrNoRows {
+		// No matching row — not an error for an existence check.
+		return false, nil
+	}
+	if err != nil {
+		return false, fmt.Errorf("exists import_migrations (source=%s): %w", source, err)
+	}
+	return true, nil
+}
+
+// BackfillFromImportQueue reads completed import_queue rows that contain a nzbdav_id
+// in their metadata JSON and inserts them as status=imported rows into import_migrations
+// (idempotent via ON CONFLICT IGNORE / INSERT OR IGNORE).
+// Returns the number of rows inserted.
+func (r *ImportMigrationRepository) BackfillFromImportQueue(ctx context.Context) (int, error) { + selectQuery := ` + SELECT id, relative_path, storage_path, metadata + FROM import_queue + WHERE status = 'completed' + AND metadata IS NOT NULL + AND storage_path IS NOT NULL + ` + rows, err := r.db.QueryContext(ctx, selectQuery) + if err != nil { + return 0, fmt.Errorf("backfill: query import_queue: %w", err) + } + defer rows.Close() + + type row struct { + id int64 + relativePath *string + storagePath *string + metadata string + } + + var candidates []row + for rows.Next() { + var candidate row + if err := rows.Scan(&candidate.id, &candidate.relativePath, &candidate.storagePath, &candidate.metadata); err != nil { + return 0, fmt.Errorf("backfill: scan import_queue row: %w", err) + } + candidates = append(candidates, candidate) + } + if err := rows.Err(); err != nil { + return 0, fmt.Errorf("backfill: iterate import_queue: %w", err) + } + + var nzbdavIDStruct struct { + NzbdavID string `json:"nzbdav_id"` + } + + insertQuery := ` + INSERT OR IGNORE INTO import_migrations + (source, external_id, queue_item_id, relative_path, final_path, status, created_at, updated_at) + VALUES ('nzbdav', ?, ?, ?, ?, 'imported', datetime('now'), datetime('now')) + ` + if r.dialect.IsPostgres() { + insertQuery = ` + INSERT INTO import_migrations + (source, external_id, queue_item_id, relative_path, final_path, status, created_at, updated_at) + VALUES ('nzbdav', $1, $2, $3, $4, 'imported', NOW(), NOW()) + ON CONFLICT (source, external_id) DO NOTHING + ` + } + + inserted := 0 + for _, c := range candidates { + if err := json.Unmarshal([]byte(c.metadata), &nzbdavIDStruct); err != nil { + // Row has metadata but no parseable nzbdav_id — skip silently. 
+ continue + } + if nzbdavIDStruct.NzbdavID == "" { + continue + } + + relativePath := "" + if c.relativePath != nil { + relativePath = *c.relativePath + } + + res, execErr := r.db.ExecContext(ctx, insertQuery, nzbdavIDStruct.NzbdavID, c.id, relativePath, c.storagePath) + if execErr != nil { + return inserted, fmt.Errorf("backfill: insert import_migration (external_id=%s): %w", nzbdavIDStruct.NzbdavID, execErr) + } + if n, _ := res.RowsAffected(); n > 0 { + inserted++ + } + } + + return inserted, nil +} + +// ─── helpers ───────────────────────────────────────────────────────────────── + +func scanImportMigration(row *sql.Row) (*ImportMigration, error) { + var m ImportMigration + var status string + err := row.Scan( + &m.ID, &m.Source, &m.ExternalID, &m.QueueItemID, + &m.RelativePath, &m.FinalPath, &status, &m.Error, + &m.CreatedAt, &m.UpdatedAt, + ) + if err != nil { + return nil, err + } + m.Status = ImportMigrationStatus(status) + return &m, nil +} + +func scanImportMigrationRow(rows *sql.Rows) (*ImportMigration, error) { + var m ImportMigration + var status string + err := rows.Scan( + &m.ID, &m.Source, &m.ExternalID, &m.QueueItemID, + &m.RelativePath, &m.FinalPath, &status, &m.Error, + &m.CreatedAt, &m.UpdatedAt, + ) + if err != nil { + return nil, err + } + m.Status = ImportMigrationStatus(status) + return &m, nil +} diff --git a/internal/database/migrations/postgres/024_import_migrations.sql b/internal/database/migrations/postgres/024_import_migrations.sql new file mode 100644 index 000000000..9df0925c0 --- /dev/null +++ b/internal/database/migrations/postgres/024_import_migrations.sql @@ -0,0 +1,25 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE import_migrations ( + id BIGSERIAL PRIMARY KEY, + source TEXT NOT NULL, + external_id TEXT NOT NULL, + queue_item_id BIGINT, + relative_path TEXT NOT NULL DEFAULT '', + final_path TEXT, + status TEXT NOT NULL DEFAULT 'pending', + error TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + 
 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+	UNIQUE(source, external_id)
+);
+CREATE INDEX idx_import_migrations_status ON import_migrations(source, status);
+CREATE INDEX idx_import_migrations_queue ON import_migrations(queue_item_id);
+-- +goose StatementEnd
+
+-- +goose Down
+-- +goose StatementBegin
+DROP INDEX IF EXISTS idx_import_migrations_queue;
+DROP INDEX IF EXISTS idx_import_migrations_status;
+DROP TABLE IF EXISTS import_migrations;
+-- +goose StatementEnd
diff --git a/internal/database/migrations/sqlite/024_import_migrations.sql b/internal/database/migrations/sqlite/024_import_migrations.sql
new file mode 100644
index 000000000..2ffadb78a
--- /dev/null
+++ b/internal/database/migrations/sqlite/024_import_migrations.sql
@@ -0,0 +1,25 @@
+-- +goose Up
+-- +goose StatementBegin
+-- UNIQUE(source, external_id) backs the ON CONFLICT(source, external_id)
+-- upsert in ImportMigrationRepository.Upsert.
+CREATE TABLE import_migrations (
+	id INTEGER PRIMARY KEY AUTOINCREMENT,
+	source TEXT NOT NULL,
+	external_id TEXT NOT NULL,
+	queue_item_id INTEGER,
+	relative_path TEXT NOT NULL DEFAULT '',
+	final_path TEXT,
+	status TEXT NOT NULL DEFAULT 'pending',
+	error TEXT,
+	created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+	updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+	UNIQUE(source, external_id)
+);
+CREATE INDEX idx_import_migrations_status ON import_migrations(source, status);
+CREATE INDEX idx_import_migrations_queue ON import_migrations(queue_item_id);
+-- +goose StatementEnd
+
+-- +goose Down
+-- +goose StatementBegin
+DROP INDEX IF EXISTS idx_import_migrations_queue;
+DROP INDEX IF EXISTS idx_import_migrations_status;
+DROP TABLE IF EXISTS import_migrations;
+-- +goose StatementEnd
diff --git a/internal/database/models.go b/internal/database/models.go
index 3171fc44e..de1bc9222 100644
--- a/internal/database/models.go
+++ b/internal/database/models.go
@@ -169,3 +169,36 @@ type ImportHistory struct {
 	CompletedAt time.Time `db:"completed_at"`
 }
 
+// ImportMigrationStatus represents the status of a migration item
+type ImportMigrationStatus string
+
+// Values mirror the literal status strings written by the SQL in
+// ImportMigrationRepository ('pending', 'imported', 'failed',
+// 'symlinks_migrated') — keep them in sync.
+const (
+	ImportMigrationStatusPending          ImportMigrationStatus = "pending"
+	ImportMigrationStatusImported         ImportMigrationStatus = "imported"
+	ImportMigrationStatusFailed           ImportMigrationStatus = "failed"
+	ImportMigrationStatusSymlinksMigrated ImportMigrationStatus = "symlinks_migrated"
+)
+
+// ImportMigration tracks progress of two-phase migrations (e.g. nzbdav → altmount)
+type ImportMigration struct {
+	ID           int64                 `db:"id"`
+	Source       string                `db:"source"`        // e.g. "nzbdav"
+	ExternalID   string                `db:"external_id"`   // source-specific ID (nzbdav GUID)
+	QueueItemID  *int64                `db:"queue_item_id"` // FK → import_queue.id (nullable)
+	RelativePath string                `db:"relative_path"` // virtual path when enqueued
+	FinalPath    *string               `db:"final_path"`    // storage_path after import
+	Status       ImportMigrationStatus `db:"status"`
+	Error        *string               `db:"error"`
+	CreatedAt    time.Time             `db:"created_at"`
+	UpdatedAt    time.Time             `db:"updated_at"`
+}
+
+// ImportMigrationStats holds aggregate counts for a source
+type ImportMigrationStats struct {
+	Pending          int
+	Imported         int
+	Failed           int
+	SymlinksMigrated int
+	Total            int
+}
+
diff --git a/internal/database/queue_repository.go b/internal/database/queue_repository.go
index f827ce581..6ff9d4d45 100644
--- a/internal/database/queue_repository.go
+++ b/internal/database/queue_repository.go
@@ -5,7 +5,6 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
-	"strings"
 	"time"
 )
 
@@ -470,58 +469,6 @@ func (r *QueueRepository) IncrementRetryCountAndResetStatus(ctx context.Context,
 	return rowsAffected > 0, nil
 }
 
-// FilterExistingNzbdavIds checks a list of nzbdav IDs and returns those that already exist in the queue
-func (r *QueueRepository) FilterExistingNzbdavIds(ctx context.Context, ids []string) ([]string, error) {
-	if len(ids) == 0 {
-		return nil, nil
-	}
-
-	// We can't pass too many parameters at once, so we batch the query
-	batchSize := 500
-	existingIds := make([]string, 0)
-
-	for i := 0; i < len(ids); i += batchSize {
-		end := min(i+batchSize, len(ids))
-
-
batchIds := ids[i:end] - - // Build placeholders for the IN clause - placeholders := make([]string, len(batchIds)) - args := make([]any, len(batchIds)) - for j, id := range batchIds { - placeholders[j] = "?" - args[j] = id - } - - // Query using json_extract (SQLite) or ->>'key' (PostgreSQL) to find matching IDs - jsonExpr := r.dialect.JSONExtract("metadata", "nzbdav_id") - query := fmt.Sprintf(` - SELECT DISTINCT %s - FROM import_queue - WHERE %s IN (%s) - `, jsonExpr, jsonExpr, strings.Join(placeholders, ",")) - - rows, err := r.db.QueryContext(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("failed to check existing nzbdav IDs: %w", err) - } - - for rows.Next() { - var id sql.NullString - if err := rows.Scan(&id); err != nil { - rows.Close() - return nil, fmt.Errorf("failed to scan matching id: %w", err) - } - if id.Valid { - existingIds = append(existingIds, id.String) - } - } - rows.Close() - } - - return existingIds, nil -} - // UpdateQueueItemPriority updates the priority of a queue item func (r *QueueRepository) UpdateQueueItemPriority(ctx context.Context, id int64, priority QueuePriority) error { query := `UPDATE import_queue SET priority = ?, updated_at = datetime('now') WHERE id = ?` diff --git a/internal/database/repository.go b/internal/database/repository.go index 4fa78808b..7193b40e5 100644 --- a/internal/database/repository.go +++ b/internal/database/repository.go @@ -69,7 +69,7 @@ func (r *Repository) WithImmediateTransaction(ctx context.Context, fn func(*Repo } // withTransactionMode executes a function within a database transaction with specified mode -func (r *Repository) withTransactionMode(ctx context.Context, mode string, fn func(*Repository) error) error { +func (r *Repository) withTransactionMode(ctx context.Context, _ string, fn func(*Repository) error) error { ddb, ok := r.db.(*dialectAwareDB) if !ok { return fmt.Errorf("repository not connected to dialectAwareDB") @@ -942,59 +942,6 @@ func (r *Repository) 
IsFileInQueue(ctx context.Context, filePath string) (bool, return true, nil } -// FilterExistingNzbdavIds checks a list of nzbdav IDs and returns those that already exist in the queue -// This is used for deduplication during import -func (r *Repository) FilterExistingNzbdavIds(ctx context.Context, ids []string) ([]string, error) { - if len(ids) == 0 { - return nil, nil - } - - // We can't pass too many parameters at once, so we batch the query - batchSize := 500 - existingIds := make([]string, 0) - - for i := 0; i < len(ids); i += batchSize { - end := min(i+batchSize, len(ids)) - - batchIds := ids[i:end] - - // Build placeholders for the IN clause - placeholders := make([]string, len(batchIds)) - args := make([]any, len(batchIds)) - for j, id := range batchIds { - placeholders[j] = "?" - args[j] = id - } - - // Query using json_extract (SQLite) or ->>'key' (PostgreSQL) to find matching IDs - jsonExpr := r.dialect.JSONExtract("metadata", "nzbdav_id") - query := fmt.Sprintf(` - SELECT DISTINCT %s - FROM import_queue - WHERE %s IN (%s) - `, jsonExpr, jsonExpr, strings.Join(placeholders, ",")) - - rows, err := r.db.QueryContext(ctx, query, args...) 
- if err != nil { - return nil, fmt.Errorf("failed to check existing nzbdav IDs: %w", err) - } - - for rows.Next() { - var id sql.NullString - if err := rows.Scan(&id); err != nil { - rows.Close() - return nil, fmt.Errorf("failed to scan matching id: %w", err) - } - if id.Valid { - existingIds = append(existingIds, id.String) - } - } - rows.Close() - } - - return existingIds, nil -} - // UpdateQueueItemPriority updates the priority of a queue item func (r *Repository) UpdateQueueItemPriority(ctx context.Context, id int64, priority QueuePriority) error { query := `UPDATE import_queue SET priority = ?, updated_at = datetime('now') WHERE id = ?` diff --git a/internal/database/symlink_lookup.go b/internal/database/symlink_lookup.go new file mode 100644 index 000000000..2b8bd38c8 --- /dev/null +++ b/internal/database/symlink_lookup.go @@ -0,0 +1,45 @@ +package database + +import ( + "context" + "fmt" + "path/filepath" + "strings" +) + +// DBSymlinkLookup adapts ImportMigrationRepository to migration.SymlinkLookup. +type DBSymlinkLookup struct { + repo *ImportMigrationRepository +} + +// NewDBSymlinkLookup creates a new DBSymlinkLookup wrapping the given repository. +func NewDBSymlinkLookup(repo *ImportMigrationRepository) *DBSymlinkLookup { + return &DBSymlinkLookup{repo: repo} +} + +// LookupFinalPath returns the final AltMount path for the given source and externalID. +// Returns ("", false, nil) when no matching row exists or the row has no final_path. +// +// Season-pack episode rows use a "file:" relative_path to signal that +// final_path stores the season directory and the episode path must be computed by joining +// them. This keeps MarkImported simple (always stores the directory) while allowing +// per-episode resolution here. 
+func (l *DBSymlinkLookup) LookupFinalPath(ctx context.Context, source, externalID string) (string, bool, error) {
+	// LookupByExternalID returns (nil, nil) for "no such row"; treat that and
+	// a row without a final_path (not yet imported) as "not found".
+	row, err := l.repo.LookupByExternalID(ctx, source, externalID)
+	if err != nil {
+		return "", false, fmt.Errorf("lookup final path (source=%s, id=%s): %w", source, externalID, err)
+	}
+	if row == nil || row.FinalPath == nil {
+		return "", false, nil
+	}
+	finalPath := *row.FinalPath
+	// "file:<episode filename>" sentinel (see type comment): final_path holds
+	// the season directory, so join the per-episode filename onto it.
+	if episodeFilename, ok := strings.CutPrefix(row.RelativePath, "file:"); ok && episodeFilename != "" {
+		finalPath = filepath.Join(finalPath, episodeFilename)
+	}
+	return finalPath, true, nil
+}
+
+// MarkSymlinksMigrated sets status=symlinks_migrated for the given row IDs.
+func (l *DBSymlinkLookup) MarkSymlinksMigrated(ctx context.Context, ids []int64) error {
+	return l.repo.MarkSymlinksMigrated(ctx, ids)
+}
diff --git a/internal/importer/migration/symlinks.go b/internal/importer/migration/symlinks.go
new file mode 100644
index 000000000..5256ccad5
--- /dev/null
+++ b/internal/importer/migration/symlinks.go
@@ -0,0 +1,149 @@
+package migration
+
+import (
+	"context"
+	"fmt"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strings"
+)
+
+// SymlinkLookup looks up the final AltMount path for a given source and external ID.
+type SymlinkLookup interface {
+	LookupFinalPath(ctx context.Context, source, externalID string) (finalPath string, found bool, err error)
+	MarkSymlinksMigrated(ctx context.Context, ids []int64) error
+}
+
+// RewriteReport summarizes results of a symlink rewrite operation.
+type RewriteReport struct {
+	Scanned            int
+	Matched            int
+	Rewritten          int
+	SkippedWrongPrefix int      // symlinks whose target didn't point at sourceMountPath/.ids/ — usually a misconfigured mount path
+	Unmatched          []string // symlink paths that had no matching migration row
+	Errors             []string // errors encountered (non-fatal)
+}
+
+// RewriteLibrarySymlinks walks libraryPath, finds symlinks (real OS symlinks or
+// rclone .rclonelink text files) whose target starts with sourceMountPath+"/.ids/",
+// looks up the GUID in the lookup, and rewrites the target to
+// filepath.Join(altmountPath, finalPath).
+//
+// If dryRun is true, no filesystem changes are made but the report is populated.
+// Per-entry failures are collected in report.Errors rather than aborting the
+// walk; only context cancellation stops early.
+func RewriteLibrarySymlinks(
+	ctx context.Context,
+	libraryPath string,
+	sourceMountPath string,
+	altmountPath string,
+	source string,
+	lookup SymlinkLookup,
+	dryRun bool,
+) (*RewriteReport, error) {
+	report := &RewriteReport{
+		Unmatched: []string{},
+		Errors:    []string{},
+	}
+
+	// Normalise source mount prefix used for matching.
+	// NOTE(review): the "/" separator assumes a POSIX mount path — confirm
+	// this tool is never pointed at a Windows-style path.
+	prefix := filepath.Clean(sourceMountPath) + "/.ids/"
+
+	err := filepath.WalkDir(libraryPath, func(path string, d fs.DirEntry, walkErr error) error {
+		// Record walk errors per entry and keep going.
+		if walkErr != nil {
+			report.Errors = append(report.Errors, fmt.Sprintf("walk error at %s: %v", path, walkErr))
+			return nil
+		}
+
+		// Check context cancellation at every entry.
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+
+		isSymlink := d.Type()&fs.ModeSymlink != 0
+		isRcloneLink := !d.IsDir() && strings.HasSuffix(d.Name(), ".rclonelink")
+
+		// Only process real OS symlinks and rclone .rclonelink text files.
+		if !isSymlink && !isRcloneLink {
+			return nil
+		}
+
+		report.Scanned++
+
+		var target string
+		if isSymlink {
+			var err error
+			target, err = os.Readlink(path)
+			if err != nil {
+				report.Errors = append(report.Errors, fmt.Sprintf("readlink %s: %v", path, err))
+				return nil
+			}
+		} else {
+			// .rclonelink: file content is the symlink target path.
+			content, err := os.ReadFile(path)
+			if err != nil {
+				report.Errors = append(report.Errors, fmt.Sprintf("read rclonelink %s: %v", path, err))
+				return nil
+			}
+			target = strings.TrimRight(string(content), "\r\n")
+		}
+
+		// Must target our source mount's .ids directory.
+		if !strings.HasPrefix(target, prefix) {
+			report.SkippedWrongPrefix++
+			return nil
+		}
+
+		// Extract GUID: last path component of the target.
+		// Normalise to upper-case: nzbdav .ids/ paths use lowercase UUIDs but
+		// import_migrations stores the DavItem ID in the original uppercase form.
+		guid := strings.ToUpper(filepath.Base(target))
+
+		finalPath, found, err := lookup.LookupFinalPath(ctx, source, guid)
+		if err != nil {
+			report.Errors = append(report.Errors, fmt.Sprintf("lookup %s (guid=%s): %v", path, guid, err))
+			return nil
+		}
+		if !found {
+			report.Unmatched = append(report.Unmatched, path)
+			return nil
+		}
+
+		report.Matched++
+
+		if dryRun {
+			return nil
+		}
+
+		// Build the new target path.
+		newTarget := filepath.Join(altmountPath, strings.TrimPrefix(finalPath, "/"))
+
+		if isSymlink {
+			// Atomic rewrite via temp link + rename (rename over an existing
+			// path replaces it atomically on POSIX).
+			tmpPath := path + ".new"
+			// Remove any stale ".new" left behind by a previous interrupted
+			// run: os.Symlink fails with EEXIST if tmpPath already exists,
+			// which would otherwise permanently block rewriting this entry.
+			_ = os.Remove(tmpPath)
+			if err := os.Symlink(newTarget, tmpPath); err != nil {
+				report.Errors = append(report.Errors, fmt.Sprintf("create temp symlink %s -> %s: %v", tmpPath, newTarget, err))
+				return nil
+			}
+			if err := os.Rename(tmpPath, path); err != nil {
+				_ = os.Remove(tmpPath)
+				report.Errors = append(report.Errors, fmt.Sprintf("rename %s -> %s: %v", tmpPath, path, err))
+				return nil
+			}
+		} else {
+			// .rclonelink: overwrite file content with the new target path.
+			if err := os.WriteFile(path, []byte(newTarget), 0o644); err != nil {
+				report.Errors = append(report.Errors, fmt.Sprintf("write rclonelink %s: %v", path, err))
+				return nil
+			}
+		}
+
+		report.Rewritten++
+		return nil
+	})
+	if err != nil {
+		return report, fmt.Errorf("walk library path %s: %w", libraryPath, err)
+	}
+
+	return report, nil
+}
diff --git a/internal/importer/migration/symlinks_test.go b/internal/importer/migration/symlinks_test.go
new file mode 100644
index 000000000..373dca5bd
--- /dev/null
+++ b/internal/importer/migration/symlinks_test.go
@@ -0,0 +1,293 @@
+package migration_test
+
+import (
+	"context"
+	"errors"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/javi11/altmount/internal/importer/migration"
+)
+
+// mockLookup implements SymlinkLookup for testing.
+type mockLookup struct {
+	paths map[string]string // guid → finalPath; absent means not found
+	err   error             // if non-nil, always return this error
+}
+
+func (m *mockLookup) LookupFinalPath(_ context.Context, _, externalID string) (string, bool, error) {
+	if m.err != nil {
+		return "", false, m.err
+	}
+	fp, ok := m.paths[externalID]
+	return fp, ok, nil
+}
+
+func (m *mockLookup) MarkSymlinksMigrated(_ context.Context, _ []int64) error {
+	return nil
+}
+
+func TestRewriteLibrarySymlinks(t *testing.T) {
+	t.Parallel()
+
+	const (
+		sourceMountPath = "/mnt/nzbdav"
+		altmountPath    = "/mnt/altmount"
+		source          = "nzbdav"
+	)
+
+	tests := []struct {
+		name   string
+		setup  func(t *testing.T, dir string) // create symlinks inside dir
+		lookup *mockLookup
+		dryRun bool
+		wantScanned            int
+		wantMatched            int
+		wantRewritten          int
+		wantUnmatched          int
+		wantErrors             int
+		wantSkippedWrongPrefix int
+		// optional post-check on filesystem state
+		postCheck func(t *testing.T, dir string)
+	}{
+		{
+			name: "match and rewrite",
+			setup: func(t *testing.T, dir string) {
+				t.Helper()
+				// Create a symlink pointing to sourceMountPath/.ids/abc123
+				target := sourceMountPath + "/.ids/abc123"
+				link := 
filepath.Join(dir, "movie.mkv") + if err := os.Symlink(target, link); err != nil { + t.Fatalf("setup: %v", err) + } + }, + lookup: &mockLookup{ + paths: map[string]string{ + // GUID is normalised to uppercase before lookup. + "ABC123": "/movies/Movie (2020)/Movie (2020).mkv", + }, + }, + dryRun: false, + wantScanned: 1, + wantMatched: 1, + wantRewritten: 1, + postCheck: func(t *testing.T, dir string) { + t.Helper() + link := filepath.Join(dir, "movie.mkv") + got, err := os.Readlink(link) + if err != nil { + t.Fatalf("readlink after rewrite: %v", err) + } + want := filepath.Join(altmountPath, "movies/Movie (2020)/Movie (2020).mkv") + if got != want { + t.Errorf("symlink target: got %q, want %q", got, want) + } + }, + }, + { + name: "unmatched guid", + setup: func(t *testing.T, dir string) { + t.Helper() + target := sourceMountPath + "/.ids/unknown-guid" + if err := os.Symlink(target, filepath.Join(dir, "unknown.mkv")); err != nil { + t.Fatalf("setup: %v", err) + } + }, + lookup: &mockLookup{paths: map[string]string{}}, + wantScanned: 1, + wantMatched: 0, + wantRewritten: 0, + wantUnmatched: 1, + }, + { + name: "non-nzbdav symlink skipped", + setup: func(t *testing.T, dir string) { + t.Helper() + // Target doesn't contain sourceMountPath/.ids/ + if err := os.Symlink("/some/other/path/file.mkv", filepath.Join(dir, "other.mkv")); err != nil { + t.Fatalf("setup: %v", err) + } + }, + lookup: &mockLookup{paths: map[string]string{}}, + wantScanned: 1, + wantMatched: 0, + wantRewritten: 0, + wantUnmatched: 0, + wantSkippedWrongPrefix: 1, + }, + { + name: "dry run - no filesystem change", + setup: func(t *testing.T, dir string) { + t.Helper() + target := sourceMountPath + "/.ids/dryrun-guid" + link := filepath.Join(dir, "dry.mkv") + if err := os.Symlink(target, link); err != nil { + t.Fatalf("setup: %v", err) + } + }, + lookup: &mockLookup{ + paths: map[string]string{ + "DRYRUN-GUID": "/movies/Dry/Dry.mkv", + }, + }, + dryRun: true, + wantScanned: 1, + wantMatched: 1, + 
wantRewritten: 0, + postCheck: func(t *testing.T, dir string) { + t.Helper() + link := filepath.Join(dir, "dry.mkv") + got, err := os.Readlink(link) + if err != nil { + t.Fatalf("readlink after dry run: %v", err) + } + // Target must remain unchanged. + want := sourceMountPath + "/.ids/dryrun-guid" + if got != want { + t.Errorf("dry run: symlink target changed: got %q, want %q", got, want) + } + }, + }, + { + name: "rclonelink match and rewrite", + setup: func(t *testing.T, dir string) { + t.Helper() + // Simulate rclone .rclonelink file: plain text containing the symlink target. + // nzbdav .ids/ paths use lowercase UUIDs; the lookup normalises to uppercase. + content := sourceMountPath + "/.ids/8/c/0/9/b/8c09b35b-2868-4fb0-9ce3-35e6abbca785" + err := os.WriteFile(filepath.Join(dir, "episode.mkv.rclonelink"), []byte(content), 0o644) + if err != nil { + t.Fatalf("setup: %v", err) + } + }, + lookup: &mockLookup{ + paths: map[string]string{ + // Key must be uppercase — that's how import_migrations stores the DavItem ID. + "8C09B35B-2868-4FB0-9CE3-35E6ABBCA785": "/tv/Show S01/Show.S01E01.mkv", + }, + }, + dryRun: false, + wantScanned: 1, + wantMatched: 1, + wantRewritten: 1, + postCheck: func(t *testing.T, dir string) { + t.Helper() + content, err := os.ReadFile(filepath.Join(dir, "episode.mkv.rclonelink")) + if err != nil { + t.Fatalf("readfile after rewrite: %v", err) + } + want := filepath.Join(altmountPath, "tv/Show S01/Show.S01E01.mkv") + if string(content) != want { + t.Errorf("rclonelink content: got %q, want %q", string(content), want) + } + }, + }, + { + name: "rclonelink dry run - no filesystem change", + setup: func(t *testing.T, dir string) { + t.Helper() + // Use a realistic lowercase UUID as nzbdav .ids/ paths contain. 
+ content := sourceMountPath + "/.ids/d/r/y/r/c/dryrc1one-0000-4fb0-9ce3-35e6abbca785" + err := os.WriteFile(filepath.Join(dir, "movie.mkv.rclonelink"), []byte(content), 0o644) + if err != nil { + t.Fatalf("setup: %v", err) + } + }, + lookup: &mockLookup{ + paths: map[string]string{ + "DRYRC1ONE-0000-4FB0-9CE3-35E6ABBCA785": "/movies/Dry/Dry.mkv", + }, + }, + dryRun: true, + wantScanned: 1, + wantMatched: 1, + wantRewritten: 0, + postCheck: func(t *testing.T, dir string) { + t.Helper() + content, err := os.ReadFile(filepath.Join(dir, "movie.mkv.rclonelink")) + if err != nil { + t.Fatalf("readfile after dry run: %v", err) + } + want := sourceMountPath + "/.ids/d/r/y/r/c/dryrc1one-0000-4fb0-9ce3-35e6abbca785" + if string(content) != want { + t.Errorf("dry run: rclonelink changed: got %q, want %q", string(content), want) + } + }, + }, + { + name: "context cancellation stops walk", + setup: func(t *testing.T, dir string) { + t.Helper() + // Create one symlink so the walk has an entry to process. + target := sourceMountPath + "/.ids/guid-cancel" + if err := os.Symlink(target, filepath.Join(dir, "file")); err != nil { + t.Fatalf("setup: %v", err) + } + }, + lookup: &mockLookup{paths: map[string]string{"guid-cancel": "/movies/x.mkv"}}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + tc.setup(t, dir) + + ctx := context.Background() + + if tc.name == "context cancellation stops walk" { + // Cancel context immediately to verify walk stops. + cancelled, cancel := context.WithCancel(ctx) + cancel() + ctx = cancelled + } + + report, err := migration.RewriteLibrarySymlinks( + ctx, + dir, + sourceMountPath, + altmountPath, + source, + tc.lookup, + tc.dryRun, + ) + + if tc.name == "context cancellation stops walk" { + // We just verify it returns a context error and doesn't panic. 
+ if err == nil || !errors.Is(err, context.Canceled) { + t.Errorf("expected context.Canceled error, got: %v", err) + } + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if report.Scanned != tc.wantScanned { + t.Errorf("Scanned: got %d, want %d", report.Scanned, tc.wantScanned) + } + if report.Matched != tc.wantMatched { + t.Errorf("Matched: got %d, want %d", report.Matched, tc.wantMatched) + } + if report.Rewritten != tc.wantRewritten { + t.Errorf("Rewritten: got %d, want %d", report.Rewritten, tc.wantRewritten) + } + if len(report.Unmatched) != tc.wantUnmatched { + t.Errorf("Unmatched: got %d, want %d (entries: %v)", len(report.Unmatched), tc.wantUnmatched, report.Unmatched) + } + if len(report.Errors) != tc.wantErrors { + t.Errorf("Errors: got %d, want %d (entries: %v)", len(report.Errors), tc.wantErrors, report.Errors) + } + if report.SkippedWrongPrefix != tc.wantSkippedWrongPrefix { + t.Errorf("SkippedWrongPrefix: got %d, want %d", report.SkippedWrongPrefix, tc.wantSkippedWrongPrefix) + } + + if tc.postCheck != nil { + tc.postCheck(t, dir) + } + }) + } +} diff --git a/internal/importer/postprocessor/coordinator.go b/internal/importer/postprocessor/coordinator.go index fb01afe30..c4982eebd 100644 --- a/internal/importer/postprocessor/coordinator.go +++ b/internal/importer/postprocessor/coordinator.go @@ -109,10 +109,7 @@ func (c *Coordinator) HandleSuccess(ctx context.Context, item *database.ImportQu result.SymlinksCreated = true } - // 3. Create ID metadata links - c.HandleIDMetadataLinks(ctx, item, resultingPath) - - // 4. Create STRM files if configured + // 3. Create STRM files if configured if err := c.CreateStrmFiles(ctx, item, resultingPath); err != nil { c.log.WarnContext(ctx, "Failed to create STRM files", "queue_id", item.ID, @@ -123,7 +120,7 @@ func (c *Coordinator) HandleSuccess(ctx context.Context, item *database.ImportQu result.StrmCreated = true } - // 5. Schedule health check + // 4. 
Schedule health check if err := c.ScheduleHealthCheck(ctx, resultingPath); err != nil { c.log.WarnContext(ctx, "Failed to schedule health check", "path", resultingPath, @@ -133,7 +130,7 @@ func (c *Coordinator) HandleSuccess(ctx context.Context, item *database.ImportQu result.HealthScheduled = true } - // 6. Notify ARR applications + // 5. Notify ARR applications if shouldSkipARRNotification(item) { c.log.DebugContext(ctx, "ARR notification skipped (requested by caller)", "queue_id", item.ID, diff --git a/internal/importer/postprocessor/id_linker.go b/internal/importer/postprocessor/id_linker.go deleted file mode 100644 index fe9456d26..000000000 --- a/internal/importer/postprocessor/id_linker.go +++ /dev/null @@ -1,34 +0,0 @@ -package postprocessor - -import ( - "context" - "encoding/json" - - "github.com/javi11/altmount/internal/database" - metapb "github.com/javi11/altmount/internal/metadata/proto" -) - -// HandleIDMetadataLinks creates ID-based metadata links for nzbdav compatibility -func (c *Coordinator) HandleIDMetadataLinks(ctx context.Context, item *database.ImportQueueItem, resultingPath string) { - // 1. Check if the queue item itself has a release-level ID in its metadata - if item.Metadata != nil && *item.Metadata != "" { - var meta struct { - NzbdavID string `json:"nzbdav_id"` - } - if err := json.Unmarshal([]byte(*item.Metadata), &meta); err == nil && meta.NzbdavID != "" { - if err := c.metadataService.UpdateIDSymlink(meta.NzbdavID, resultingPath); err != nil { - c.log.Warn("Failed to create release ID metadata link", "id", meta.NzbdavID, "error", err) - } - } - } - - // 2. 
Check individual files for IDs using MetadataService walker - _ = c.metadataService.WalkDirectoryFiles(resultingPath, func(fileVirtualPath string, meta *metapb.FileMetadata) error { - if meta.NzbdavId != "" { - if err := c.metadataService.UpdateIDSymlink(meta.NzbdavId, fileVirtualPath); err != nil { - c.log.Warn("Failed to create file ID metadata link", "id", meta.NzbdavId, "error", err) - } - } - return nil - }) -} diff --git a/internal/importer/scanner/nzbdav.go b/internal/importer/scanner/nzbdav.go index 8b2005073..f42fb38ab 100644 --- a/internal/importer/scanner/nzbdav.go +++ b/internal/importer/scanner/nzbdav.go @@ -19,26 +19,42 @@ import ( // BatchQueueAdder defines the interface for batch queue operations type BatchQueueAdder interface { AddBatchToQueue(ctx context.Context, items []*database.ImportQueueItem) error - FilterExistingNzbdavIds(ctx context.Context, ids []string) ([]string, error) +} + +// MigrationRecorder defines the interface for recording import migrations +type MigrationRecorder interface { + UpsertMigration(ctx context.Context, source, externalID, relativePath string) (int64, error) + IsMigrationCompleted(ctx context.Context, source, externalID string) (bool, error) + // LinkQueueItemID sets queue_item_id for all migration rows identified by + // (source, externalIDs) where queue_item_id is currently NULL. Called after + // AddBatchToQueue assigns IDs to the queue items. + LinkQueueItemID(ctx context.Context, source string, externalIDs []string, queueItemID int64) error } // NzbDavImporter handles importing from NZBDav databases type NzbDavImporter struct { - batchAdder BatchQueueAdder - log *slog.Logger + batchAdder BatchQueueAdder + migrationRecorder MigrationRecorder + log *slog.Logger // State management mu sync.RWMutex info ImportInfo cancelFunc context.CancelFunc + // epoch is bumped on every Start and Reset. 
performImport captures the epoch + // at launch; its deferred state update is skipped if the epoch changed in the + // meantime (Reset was called, or a new Start superseded it). This keeps + // Reset synchronous and avoids stuck "Canceling" states when workers are slow. + epoch int64 } // NewNzbDavImporter creates a new NZBDav importer -func NewNzbDavImporter(batchAdder BatchQueueAdder) *NzbDavImporter { +func NewNzbDavImporter(batchAdder BatchQueueAdder, migrationRecorder MigrationRecorder) *NzbDavImporter { return &NzbDavImporter{ - batchAdder: batchAdder, - log: slog.Default().With("component", "nzbdav-importer"), - info: ImportInfo{Status: ImportStatusIdle}, + batchAdder: batchAdder, + migrationRecorder: migrationRecorder, + log: slog.Default().With("component", "nzbdav-importer"), + info: ImportInfo{Status: ImportStatusIdle}, } } @@ -48,12 +64,14 @@ func (n *NzbDavImporter) Start(dbPath string, blobsPath string, cleanupFile bool defer n.mu.Unlock() if n.info.Status == ImportStatusRunning || n.info.Status == ImportStatusCanceling { - return fmt.Errorf("import already in progress") + return fmt.Errorf("import already in progress - call reset to force clear") } // Create import context importCtx, cancel := context.WithCancel(context.Background()) n.cancelFunc = cancel + n.epoch++ + epoch := n.epoch // Initialize status n.info = ImportInfo{ @@ -64,7 +82,7 @@ func (n *NzbDavImporter) Start(dbPath string, blobsPath string, cleanupFile bool Skipped: 0, } - go n.performImport(importCtx, dbPath, blobsPath, cleanupFile) + go n.performImport(importCtx, epoch, dbPath, blobsPath, cleanupFile) return nil } @@ -76,47 +94,50 @@ func (n *NzbDavImporter) GetStatus() ImportInfo { return n.info } -// Cancel cancels the current import operation +// Cancel requests cancellation of the current import operation. Idempotent: +// calling while already canceling or idle is a no-op. 
func (n *NzbDavImporter) Cancel() error { n.mu.Lock() defer n.mu.Unlock() - if n.info.Status == ImportStatusIdle { - return fmt.Errorf("no import is currently running") - } - - if n.info.Status == ImportStatusCanceling { - return fmt.Errorf("import is already being canceled") - } - - n.info.Status = ImportStatusCanceling if n.cancelFunc != nil { n.cancelFunc() } - + if n.info.Status == ImportStatusRunning { + n.info.Status = ImportStatusCanceling + } return nil } -// Reset resets the import status to Idle +// Reset force-clears the import state to Idle regardless of current status. +// If a goroutine is still running it receives a cancellation and its deferred +// state update is invalidated via the epoch bump — the caller can immediately +// Start a new import without being blocked by a stuck "Canceling" state. func (n *NzbDavImporter) Reset() { n.mu.Lock() defer n.mu.Unlock() - if n.info.Status == ImportStatusCompleted || n.info.Status == ImportStatusIdle { - n.info = ImportInfo{Status: ImportStatusIdle} + n.epoch++ + if n.cancelFunc != nil { + n.cancelFunc() + n.cancelFunc = nil } + n.info = ImportInfo{Status: ImportStatusIdle} } // performImport performs the actual import work -func (n *NzbDavImporter) performImport(ctx context.Context, dbPath string, blobsPath string, cleanupFile bool) { +func (n *NzbDavImporter) performImport(ctx context.Context, epoch int64, dbPath string, blobsPath string, cleanupFile bool) { // Parse Database parser := nzbdav.NewParser(dbPath, blobsPath) nzbChan, errChan := parser.Parse() defer func() { n.mu.Lock() - n.info.Status = ImportStatusCompleted - n.cancelFunc = nil + // Only update status if a newer Start/Reset hasn't superseded us. 
+ if n.epoch == epoch { + n.info.Status = ImportStatusCompleted + n.cancelFunc = nil + } n.mu.Unlock() if cleanupFile { @@ -223,120 +244,181 @@ func (n *NzbDavImporter) performImport(ctx context.Context, dbPath string, blobs } } -// processBatch batches queue items and adds them to the queue +// nzbdavAlias mirrors nzbdav.ParsedNzbAlias stored in queue item metadata. +type nzbdavAlias struct { + ID string `json:"ID"` + Name string `json:"Name"` +} + +// processBatch batches queue items and adds them to the queue. +// It uses migrationRecorder to deduplicate already-completed items and to +// record new items before enqueueing them. +// +// After AddBatchToQueue assigns IDs to items, LinkQueueItemID is called so that +// MarkImported can later set final_path on every related migration row. func (n *NzbDavImporter) processBatch(ctx context.Context, batchChan <-chan *database.ImportQueueItem) { var batch []*database.ImportQueueItem + // batchExternalIDs[i] holds all nzbdav external IDs (canonical + aliases) + // for batch[i]. Used to link queue_item_id after insertion. + var batchExternalIDs [][]string + ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() - insertBatch := func() { - if len(batch) > 0 { - // 1. Extract IDs from batch to check for duplicates - idMap := make(map[string]*database.ImportQueueItem) - idsToCheck := make([]string, 0, len(batch)) + // extractMeta reads nzbdav-specific fields from the item metadata JSON. 
+ type nzbdavMeta struct { + NzbdavID string `json:"nzbdav_id"` + DavItemName string `json:"nzbdav_dav_item_name"` + NzbdavAliases []nzbdavAlias `json:"nzbdav_aliases"` + } + extractMeta := func(item *database.ImportQueueItem) nzbdavMeta { + if item.Metadata == nil { + return nzbdavMeta{} + } + var m nzbdavMeta + _ = json.Unmarshal([]byte(*item.Metadata), &m) + return m + } - type metaStruct struct { - NzbdavID string `json:"nzbdav_id"` - } + // stripNzbdavKeysFromMetadata removes nzbdav_* keys from metadata JSON, + // retaining only other keys (e.g. extracted_files). + stripNzbdavKeysFromMetadata := func(item *database.ImportQueueItem) { + if item.Metadata == nil { + return + } + var metaMap map[string]any + if err := json.Unmarshal([]byte(*item.Metadata), &metaMap); err != nil { + return + } + delete(metaMap, "nzbdav_id") + delete(metaMap, "nzbdav_dav_item_name") + delete(metaMap, "nzbdav_aliases") + if len(metaMap) == 0 { + item.Metadata = nil + return + } + b, err := json.Marshal(metaMap) + if err != nil { + return + } + s := string(b) + item.Metadata = &s + } - for _, item := range batch { - if item.Metadata != nil { - var meta metaStruct - if err := json.Unmarshal([]byte(*item.Metadata), &meta); err == nil && meta.NzbdavID != "" { - idMap[meta.NzbdavID] = item - idsToCheck = append(idsToCheck, meta.NzbdavID) - } + // fileRelPath builds the relative_path value for a migration row that maps + // to a specific episode file within a season-pack directory. The "file:" + // prefix signals LookupFinalPath to join the stored final_path (season dir) + // with the episode filename. 
+ fileRelPath := func(name string) string { + return "file:" + name + } + + insertBatch := func() { + if len(batch) == 0 { + return + } + if err := n.batchAdder.AddBatchToQueue(ctx, batch); err != nil { + n.log.ErrorContext(ctx, "Failed to add batch to queue", "count", len(batch), "error", err) + n.mu.Lock() + n.info.Failed += len(batch) + n.mu.Unlock() + } else { + // Link queue_item_id for every migration row associated with each item. + // AddBatchToQueue populates item.ID for all items in the slice. + for i, item := range batch { + if item.ID == 0 || len(batchExternalIDs[i]) == 0 { + continue + } + if err := n.migrationRecorder.LinkQueueItemID(ctx, "nzbdav", batchExternalIDs[i], item.ID); err != nil { + n.log.WarnContext(ctx, "Failed to link queue_item_id to migration rows", + "queue_item_id", item.ID, "error", err) } } + n.mu.Lock() + n.info.Added += len(batch) + n.mu.Unlock() + } + batch = nil + batchExternalIDs = nil + } - // 2. Check for existing IDs in DB - var existingIds []string - var err error - if len(idsToCheck) > 0 { - existingIds, err = n.batchAdder.FilterExistingNzbdavIds(ctx, idsToCheck) - if err != nil { - n.log.ErrorContext(ctx, "Failed to check for existing IDs", "error", err) - // On error, we proceed with all items - the DB unique constraint on nzb_path - // will catch duplicates, though less efficiently, or we might add duplicates - // if paths differ. Better to fail safe and try to add. - } + for { + select { + case item, ok := <-batchChan: + if !ok { + insertBatch() + return } - // 3. Filter out duplicates - itemsToAdd := make([]*database.ImportQueueItem, 0, len(batch)) - duplicates := 0 + meta := extractMeta(item) + nzbdavID := meta.NzbdavID - if len(existingIds) > 0 { - existingMap := make(map[string]bool) - for _, id := range existingIds { - existingMap[id] = true + // Dedup: skip items already successfully imported. 
+ if nzbdavID != "" { + completed, err := n.migrationRecorder.IsMigrationCompleted(ctx, "nzbdav", nzbdavID) + if err != nil { + n.log.ErrorContext(ctx, "Failed to check migration status", "nzbdav_id", nzbdavID, "error", err) + } else if completed { + n.mu.Lock() + n.info.Skipped++ + n.mu.Unlock() + continue } - for _, item := range batch { - isDuplicate := false - if item.Metadata != nil { - var meta metaStruct - if err := json.Unmarshal([]byte(*item.Metadata), &meta); err == nil && meta.NzbdavID != "" { - if existingMap[meta.NzbdavID] { - isDuplicate = true - } - } + // Determine whether this blob represents a season pack (aliases with + // distinct names from the canonical) or a single/duplicate release. + isSeasonPack := false + for _, a := range meta.NzbdavAliases { + if a.Name != meta.DavItemName { + isSeasonPack = true + break } + } - if isDuplicate { - duplicates++ - // Cleanup temp NZB file for duplicate - if err := os.Remove(item.NzbPath); err != nil { - n.log.DebugContext(ctx, "Failed to remove duplicate temp NZB", "path", item.NzbPath, "error", err) - } - // Also try to remove the parent temp dir if empty (it was created just for this file) - go func(path string) { - dir := filepath.Dir(path) - _ = os.Remove(dir) - }(item.NzbPath) - } else { - itemsToAdd = append(itemsToAdd, item) - } + // Build relative_path for the canonical migration row. + canonicalRelPath := "" + if item.Category != nil { + canonicalRelPath = *item.Category + } + if isSeasonPack && meta.DavItemName != "" { + // Season pack: each DavItem maps to a specific episode file. + // Use "file:" prefix so LookupFinalPath can compute the + // episode-specific path from the season directory. 
+ canonicalRelPath = fileRelPath(meta.DavItemName) + } + if _, err := n.migrationRecorder.UpsertMigration(ctx, "nzbdav", nzbdavID, canonicalRelPath); err != nil { + n.log.ErrorContext(ctx, "Failed to upsert migration", "nzbdav_id", nzbdavID, "error", err) } - } else { - itemsToAdd = batch - } - if duplicates > 0 { - n.log.InfoContext(ctx, "Skipped duplicate items", "count", duplicates) - n.mu.Lock() - n.info.Skipped += duplicates - n.mu.Unlock() + // Register migration rows for alias DavItem IDs so the Phase 2 + // symlink rewriter can resolve every episode rclonelink. + for _, alias := range meta.NzbdavAliases { + aliasRelPath := canonicalRelPath // default: same as canonical (duplicate case) + if isSeasonPack && alias.Name != "" { + aliasRelPath = fileRelPath(alias.Name) + } + if _, err := n.migrationRecorder.UpsertMigration(ctx, "nzbdav", alias.ID, aliasRelPath); err != nil { + n.log.ErrorContext(ctx, "Failed to upsert alias migration", "alias_id", alias.ID, "error", err) + } + } } - // 4. Add unique items to queue - if len(itemsToAdd) > 0 { - if err := n.batchAdder.AddBatchToQueue(ctx, itemsToAdd); err != nil { - n.log.ErrorContext(ctx, "Failed to add batch to queue", "count", len(itemsToAdd), "error", err) - n.mu.Lock() - n.info.Failed += len(itemsToAdd) - n.mu.Unlock() - } else { - n.mu.Lock() - n.info.Added += len(itemsToAdd) - n.mu.Unlock() + // Collect all external IDs for this item so we can link them after insertion. + var externalIDs []string + if nzbdavID != "" { + externalIDs = append(externalIDs, nzbdavID) + for _, a := range meta.NzbdavAliases { + externalIDs = append(externalIDs, a.ID) } } - batch = nil // Reset batch - } - } + // Strip nzbdav_* keys from the queue item metadata — they live in + // import_migrations now. Keep extracted_files if present. 
+ stripNzbdavKeysFromMetadata(item) - for { - select { - case item, ok := <-batchChan: - if !ok { - // Channel closed, drain remaining batch - insertBatch() - return - } batch = append(batch, item) - if len(batch) >= 100 { // Batch size + batchExternalIDs = append(batchExternalIDs, externalIDs) + if len(batch) >= 100 { insertBatch() } case <-ticker.C: @@ -394,10 +476,18 @@ func (n *NzbDavImporter) createNzbFileAndPrepareItem(ctx context.Context, res *n priority := database.QueuePriorityNormal - // Store original ID and extracted files in metadata + // Store original ID, optional alias IDs, and extracted files in metadata. + // nzbdav_dav_item_name: the DavItem.Name for the canonical (episode filename for season packs). + // nzbdav_aliases: other DavItems sharing the same blob (non-empty for season packs). metaMap := map[string]any{ "nzbdav_id": res.ID, } + if res.DavItemName != "" { + metaMap["nzbdav_dav_item_name"] = res.DavItemName + } + if len(res.AliasDavItems) > 0 { + metaMap["nzbdav_aliases"] = res.AliasDavItems + } if len(res.ExtractedFiles) > 0 { metaMap["extracted_files"] = res.ExtractedFiles } @@ -407,15 +497,18 @@ func (n *NzbDavImporter) createNzbFileAndPrepareItem(ctx context.Context, res *n // Prepare item struct. RelativePath is left nil so the import mirrors the // nzbdav folder structure under Category without an extra user-supplied prefix. + // SkipArrNotification is true because nzbdav imports are migration jobs — ARR + // scans should not be triggered for each individual item. 
item := &database.ImportQueueItem{ - NzbPath: nzbPath, - Category: &targetCategory, - Priority: priority, - Status: database.QueueStatusPending, - RetryCount: 0, - MaxRetries: 3, - CreatedAt: time.Now(), - Metadata: &metaJSON, + NzbPath: nzbPath, + Category: &targetCategory, + Priority: priority, + Status: database.QueueStatusPending, + RetryCount: 0, + MaxRetries: 3, + CreatedAt: time.Now(), + Metadata: &metaJSON, + SkipArrNotification: true, } return item, nil diff --git a/internal/importer/service.go b/internal/importer/service.go index a6847dc10..4c7f93f61 100644 --- a/internal/importer/service.go +++ b/internal/importer/service.go @@ -93,17 +93,39 @@ func (a *queueAdapterForScanner) IsFileProcessed(filePath string, scanRoot strin return isFileAlreadyProcessed(a.metadataService, filePath, scanRoot) } -// batchQueueAdapterForImporter adapts database repository for scanner.BatchQueueAdder interface +// batchQueueAdapterForImporter adapts database repository for scanner.BatchQueueAdder and +// scanner.MigrationRecorder interfaces. 
type batchQueueAdapterForImporter struct { - repo *database.QueueRepository + repo *database.QueueRepository + migrationRepo *database.ImportMigrationRepository } func (a *batchQueueAdapterForImporter) AddBatchToQueue(ctx context.Context, items []*database.ImportQueueItem) error { return a.repo.AddBatchToQueue(ctx, items) } -func (a *batchQueueAdapterForImporter) FilterExistingNzbdavIds(ctx context.Context, ids []string) ([]string, error) { - return a.repo.FilterExistingNzbdavIds(ctx, ids) +func (a *batchQueueAdapterForImporter) UpsertMigration(ctx context.Context, source, externalID, relativePath string) (int64, error) { + return a.migrationRepo.Upsert(ctx, &database.ImportMigration{ + Source: source, + ExternalID: externalID, + RelativePath: relativePath, + Status: database.ImportMigrationStatusPending, + }) +} + +func (a *batchQueueAdapterForImporter) IsMigrationCompleted(ctx context.Context, source, externalID string) (bool, error) { + row, err := a.migrationRepo.LookupByExternalID(ctx, source, externalID) + if err != nil { + return false, err + } + if row == nil { + return false, nil + } + return row.Status == database.ImportMigrationStatusImported || row.Status == database.ImportMigrationStatusSymlinksMigrated, nil +} + +func (a *batchQueueAdapterForImporter) LinkQueueItemID(ctx context.Context, source string, externalIDs []string, queueItemID int64) error { + return a.migrationRepo.LinkQueueItemID(ctx, source, externalIDs, queueItemID) } // isFileAlreadyProcessed checks if a file has already been processed by checking metadata @@ -237,9 +259,10 @@ func NewService(config ServiceConfig, metadataService *metadata.MetadataService, // Create adapter for NZBDav imports importerAdapter := &batchQueueAdapterForImporter{ - repo: database.Repository, + repo: database.Repository, + migrationRepo: database.MigrationRepo, } - service.nzbdavImporter = scanner.NewNzbDavImporter(importerAdapter) + service.nzbdavImporter = scanner.NewNzbDavImporter(importerAdapter, 
importerAdapter) // Create directory watcher (Service implements WatchQueueAdder) service.watcher = scanner.NewWatcher(service, configGetter) @@ -984,6 +1007,15 @@ func (s *Service) handleProcessingSuccess(ctx context.Context, item *database.Im return err } + // Update import_migrations row if this was a nzbdav migration import + if s.database.MigrationRepo != nil { + if err := s.database.MigrationRepo.MarkImported(ctx, item.ID, resultingPath); err != nil { + // Non-fatal: log but don't fail + s.log.WarnContext(ctx, "Failed to mark import_migration as imported", + "queue_id", item.ID, "error", err) + } + } + // Notify completion and clear progress tracking if s.broadcaster != nil { s.broadcaster.NotifyComplete(int(item.ID), "completed") @@ -1069,6 +1101,14 @@ func (s *Service) handleProcessingFailure(ctx context.Context, item *database.Im "file", item.NzbPath) } + // Update import_migrations row if this was a nzbdav migration import + if s.database.MigrationRepo != nil { + if err := s.database.MigrationRepo.MarkFailed(ctx, item.ID, errorMessage); err != nil { + s.log.WarnContext(ctx, "Failed to mark import_migration as failed", + "queue_id", item.ID, "error", err) + } + } + // Notify failure and clear progress tracking if s.broadcaster != nil { s.broadcaster.NotifyComplete(int(item.ID), "failed") diff --git a/internal/metadata/service.go b/internal/metadata/service.go index 474335544..4060d6de8 100644 --- a/internal/metadata/service.go +++ b/internal/metadata/service.go @@ -8,7 +8,6 @@ import ( "log/slog" "os" "path/filepath" - "runtime" "strings" "time" @@ -113,18 +112,6 @@ func (ms *MetadataService) WriteFileMetadata(virtualPath string, metadata *metap return fmt.Errorf("failed to rename metadata file: %w", err) } - // Handle ID sidecar file - idPath := metadataPath + ".id" - if nzbdavId != "" { - if err := os.WriteFile(idPath, []byte(nzbdavId), 0644); err != nil { - // Log error but don't fail the operation - slog.WarnContext(context.Background(), "Failed to 
write ID sidecar file", "path", idPath, "error", err) - } - } else { - // Clean up existing ID file if present - _ = os.Remove(idPath) - } - metadata.NzbdavId = nzbdavId // Restore for in-memory use // Update only the lightweight cache; the full proto (with SegmentData) is @@ -395,11 +382,6 @@ func (ms *MetadataService) DeleteFileMetadataWithSourceNzb(ctx context.Context, } } - // Remove .ids/ symlink before deletion - if idData, readErr := os.ReadFile(metadataPath + ".id"); readErr == nil && len(idData) > 0 { - _ = ms.RemoveIDSymlink(string(idData)) - } - // Delete the metadata file err := os.Remove(metadataPath) if err != nil && !os.IsNotExist(err) { @@ -501,56 +483,6 @@ func (ms *MetadataService) RenameFileMetadata(oldVirtualPath, newVirtualPath str return nil } -// UpdateIDSymlink creates or updates an ID-based symlink in the .ids/ sharded directory. -// On Windows, symlinks are not supported and this operation is skipped gracefully. -func (ms *MetadataService) UpdateIDSymlink(nzbdavID, virtualPath string) error { - if runtime.GOOS == "windows" { - return nil // Symlinks not supported on Windows; ID-based lookup via symlinks is skipped - } - - id := strings.ToLower(nzbdavID) - if len(id) < 5 { - return nil // Invalid ID for sharding - } - - shardPath := filepath.Join(".ids", string(id[0]), string(id[1]), string(id[2]), string(id[3]), string(id[4])) - fullShardDir := filepath.Join(ms.rootPath, shardPath) - - if err := os.MkdirAll(fullShardDir, 0755); err != nil { - return err - } - - targetMetaPath := ms.GetMetadataFilePath(virtualPath) - linkPath := filepath.Join(fullShardDir, id+".meta") - - // Remove existing symlink if present - os.Remove(linkPath) - - // Create relative symlink - relTarget, err := filepath.Rel(fullShardDir, targetMetaPath) - if err != nil { - return os.Symlink(targetMetaPath, linkPath) - } - - return os.Symlink(relTarget, linkPath) -} - -// RemoveIDSymlink removes an ID-based symlink from the .ids/ sharded directory. 
-func (ms *MetadataService) RemoveIDSymlink(nzbdavID string) error { - id := strings.ToLower(nzbdavID) - if len(id) < 5 { - return nil - } - - shardPath := filepath.Join(".ids", string(id[0]), string(id[1]), string(id[2]), string(id[3]), string(id[4])) - linkPath := filepath.Join(ms.rootPath, shardPath, id+".meta") - - if err := os.Remove(linkPath); err != nil && !os.IsNotExist(err) { - return err - } - return nil -} - // WalkDirectoryFiles walks a metadata directory and calls fn for each file's virtual path and metadata. func (ms *MetadataService) WalkDirectoryFiles(virtualPath string, fn func(fileVirtualPath string, meta *metapb.FileMetadata) error) error { metadataDir := filepath.Join(ms.rootPath, virtualPath) @@ -753,11 +685,6 @@ func (ms *MetadataService) MoveToCorrupted(ctx context.Context, virtualPath stri targetPath := filepath.Join(targetDir, truncatedFilename+".meta") - // Remove .ids/ symlink before moving to corrupted - if idData, readErr := os.ReadFile(metadataPath + ".id"); readErr == nil && len(idData) > 0 { - _ = ms.RemoveIDSymlink(string(idData)) - } - // Move the .meta file if err := os.Rename(metadataPath, targetPath); err != nil { slog.WarnContext(ctx, "Failed to move corrupted metadata, trying copy fallback", "error", err) diff --git a/internal/metadata/service_test.go b/internal/metadata/service_test.go index 88fc6dac6..3f684e2ee 100644 --- a/internal/metadata/service_test.go +++ b/internal/metadata/service_test.go @@ -12,65 +12,26 @@ import ( "github.com/stretchr/testify/require" ) -// setupMetaWithIDSymlink creates a MetadataService at t.TempDir(), writes metadata + -// .id sidecar + .ids/ symlink via UpdateIDSymlink. Returns the service and root path. 
-func setupMetaWithIDSymlink(t *testing.T, virtualPath, nzbdavID string) (*MetadataService, string) { - t.Helper() +func TestDeleteFileMetadataWithSourceNzb_RemovesMetadata(t *testing.T) { root := t.TempDir() ms := NewMetadataService(root) + virtualPath := filepath.Join("movies", "test_movie.mkv") + meta := ms.CreateFileMetadata( 1024, "test.nzb", metapb.FileStatus_FILE_STATUS_HEALTHY, - nil, metapb.Encryption_NONE, "", "", nil, nil, 0, nil, nzbdavID, + nil, metapb.Encryption_NONE, "", "", nil, nil, 0, nil, "abcde12345", ) require.NoError(t, ms.WriteFileMetadata(virtualPath, meta)) - // WriteFileMetadata writes the .id sidecar when nzbdavID != "". - // Now create the .ids/ symlink. - require.NoError(t, ms.UpdateIDSymlink(nzbdavID, virtualPath)) - - // Verify symlink was created metaPath := ms.GetMetadataFilePath(virtualPath) require.FileExists(t, metaPath) - require.FileExists(t, metaPath+".id") - - return ms, root -} - -// idSymlinkPath returns the expected .ids/ symlink path for a given nzbdavID. 
-func idSymlinkPath(root, nzbdavID string) string { - id := nzbdavID - return filepath.Join(root, ".ids", string(id[0]), string(id[1]), string(id[2]), string(id[3]), string(id[4]), id+".meta") -} - -func TestDeleteFileMetadataWithSourceNzb_RemovesIDSymlink(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("symlinks not supported on Windows") - } - - virtualPath := filepath.Join("movies", "test_movie.mkv") - nzbdavID := "abcde12345" - - ms, root := setupMetaWithIDSymlink(t, virtualPath, nzbdavID) - - // Verify the symlink exists before deletion - linkPath := idSymlinkPath(root, nzbdavID) - _, err := os.Lstat(linkPath) - require.NoError(t, err, "symlink should exist before delete") - // Delete metadata ctx := context.Background() require.NoError(t, ms.DeleteFileMetadataWithSourceNzb(ctx, virtualPath, false)) - // Verify .ids/ symlink was removed - _, err = os.Lstat(linkPath) - assert.True(t, os.IsNotExist(err), "symlink should be removed after delete") - - // Verify .meta and .id files are also gone - metaPath := ms.GetMetadataFilePath(virtualPath) assert.NoFileExists(t, metaPath) - assert.NoFileExists(t, metaPath+".id") } func TestDeleteFileMetadataWithSourceNzb_NoIDSidecar_NoError(t *testing.T) { @@ -79,7 +40,6 @@ func TestDeleteFileMetadataWithSourceNzb_NoIDSidecar_NoError(t *testing.T) { virtualPath := filepath.Join("movies", "no_id_movie.mkv") - // Write metadata without nzbdavID (no .id sidecar) meta := ms.CreateFileMetadata( 512, "test.nzb", metapb.FileStatus_FILE_STATUS_HEALTHY, nil, metapb.Encryption_NONE, "", "", nil, nil, 0, nil, "", @@ -90,31 +50,24 @@ func TestDeleteFileMetadataWithSourceNzb_NoIDSidecar_NoError(t *testing.T) { err := ms.DeleteFileMetadataWithSourceNzb(ctx, virtualPath, false) assert.NoError(t, err, "delete should succeed even without .id sidecar") - // Meta file gone assert.NoFileExists(t, ms.GetMetadataFilePath(virtualPath)) } -func TestMoveToCorrupted_RemovesIDSymlink(t *testing.T) { - if runtime.GOOS == "windows" { - 
t.Skip("symlinks not supported on Windows") - } +func TestMoveToCorrupted_MovesMetadata(t *testing.T) { + root := t.TempDir() + ms := NewMetadataService(root) virtualPath := filepath.Join("movies", "corrupted_movie.mkv") - nzbdavID := "fghij67890" - ms, root := setupMetaWithIDSymlink(t, virtualPath, nzbdavID) - - linkPath := idSymlinkPath(root, nzbdavID) - _, err := os.Lstat(linkPath) - require.NoError(t, err, "symlink should exist before move") + meta := ms.CreateFileMetadata( + 1024, "test.nzb", metapb.FileStatus_FILE_STATUS_HEALTHY, + nil, metapb.Encryption_NONE, "", "", nil, nil, 0, nil, "fghij67890", + ) + require.NoError(t, ms.WriteFileMetadata(virtualPath, meta)) ctx := context.Background() require.NoError(t, ms.MoveToCorrupted(ctx, virtualPath)) - // Symlink removed - _, err = os.Lstat(linkPath) - assert.True(t, os.IsNotExist(err), "symlink should be removed after move to corrupted") - // Original location gone assert.NoFileExists(t, ms.GetMetadataFilePath(virtualPath)) @@ -131,7 +84,7 @@ func TestCleanupOrphanedIDSymlinks(t *testing.T) { root := t.TempDir() ms := NewMetadataService(root) - // Create a valid metadata + symlink + // Create a valid metadata file and manually plant a valid .ids/ symlink for it validPath := filepath.Join("movies", "valid.mkv") validID := "valid12345" meta := ms.CreateFileMetadata( @@ -139,7 +92,13 @@ func TestCleanupOrphanedIDSymlinks(t *testing.T) { nil, metapb.Encryption_NONE, "", "", nil, nil, 0, nil, validID, ) require.NoError(t, ms.WriteFileMetadata(validPath, meta)) - require.NoError(t, ms.UpdateIDSymlink(validID, validPath)) + + // Manually create a valid .ids/ symlink pointing at the .meta file + validMetaPath := ms.GetMetadataFilePath(validPath) + validShardDir := filepath.Join(root, ".ids", "v", "a", "l", "i", "d") + require.NoError(t, os.MkdirAll(validShardDir, 0755)) + validLink := filepath.Join(validShardDir, validID+".meta") + require.NoError(t, os.Symlink(validMetaPath, validLink)) // Create a broken symlink 
(target does not exist) brokenID := "broke12345" @@ -158,7 +117,6 @@ func TestCleanupOrphanedIDSymlinks(t *testing.T) { assert.True(t, os.IsNotExist(err), "broken symlink should be removed") // Valid symlink still present - validLink := idSymlinkPath(root, validID) _, err = os.Lstat(validLink) assert.NoError(t, err, "valid symlink should still exist") } diff --git a/internal/nzbdav/parser.go b/internal/nzbdav/parser.go index 12930ef80..5934d72fe 100644 --- a/internal/nzbdav/parser.go +++ b/internal/nzbdav/parser.go @@ -184,11 +184,17 @@ func (t *davTree) extractedFilesUnder(releaseID string) []ExtractedFileInfo { return out } +// blobRow holds one row from the parseBlobs query. +type blobRow struct { + id, fileName, davName, releasePath, blobId string +} + func (p *Parser) parseBlobs(db *sql.DB, tree *davTree, out chan<- *ParsedNzb, errChan chan<- error) { rows, err := db.Query(` SELECT d.Id, n.FileName, + COALESCE(d.Name, '') as DavName, COALESCE(d.Path, '/') as ReleasePath, d.NzbBlobId FROM DavItems d @@ -204,20 +210,50 @@ func (p *Parser) parseBlobs(db *sql.DB, tree *davTree, out chan<- *ParsedNzb, er } defer rows.Close() - count := 0 + // Pass 1: collect all rows grouped by blobId, preserving first-seen order. + var blobOrder []string + rowsByBlob := make(map[string][]blobRow) for rows.Next() { - var id, fileName, releasePath, blobId string - if err := rows.Scan(&id, &fileName, &releasePath, &blobId); err != nil { + var r blobRow + if err := rows.Scan(&r.id, &r.fileName, &r.davName, &r.releasePath, &r.blobId); err != nil { slog.ErrorContext(context.Background(), "Failed to scan blob row", "error", err) continue } + if _, seen := rowsByBlob[r.blobId]; !seen { + blobOrder = append(blobOrder, r.blobId) + } + rowsByBlob[r.blobId] = append(rowsByBlob[r.blobId], r) + } + + // Pass 2: emit one ParsedNzb per blob group. 
+ // The first row in each group becomes the canonical item; additional rows + // become ParsedNzbAlias entries so the scanner can register migration rows + // for every DavItem ID that shares the blob. + count := 0 + for _, blobId := range blobOrder { + group := rowsByBlob[blobId] + canonical := group[0] if len(blobId) < 4 { slog.WarnContext(context.Background(), "Invalid blob ID", "id", blobId) continue } - blobPath := filepath.Join(p.blobsPath, blobId[0:2], blobId[2:4], blobId) + release := tree.releaseFor(canonical.id) + releaseName := strings.TrimSuffix(canonical.fileName, ".nzb") + releaseParentPath := canonical.releasePath + releaseID := canonical.id + if release != nil { + releaseID = release.ID + if release.Name != "" { + releaseName = release.Name + } + releaseParentPath = release.Path + } + parentPath := trimLastSegment(releaseParentPath) + category, relPath := p.splitPath(parentPath) + + blobPath := filepath.Join(p.blobsPath, blobId[0:2], blobId[2:4], blobId) blobFile, err := os.Open(blobPath) if err != nil { slog.ErrorContext(context.Background(), "Failed to open blob file", "path", blobPath, "error", err) @@ -250,31 +286,23 @@ func (p *Parser) parseBlobs(db *sql.DB, tree *davTree, out chan<- *ParsedNzb, er pw.Close() }() - // The SubType=203 item is the virtual "output" file. Its parent folder - // is the release directory in nzbdav's tree; we want AltMount's mount - // layout to match that directory layout exactly. - release := tree.releaseFor(id) - releaseName := strings.TrimSuffix(fileName, ".nzb") - releaseParentPath := releasePath - releaseID := id - if release != nil { - releaseID = release.ID - if release.Name != "" { - releaseName = release.Name - } - releaseParentPath = release.Path + // Build alias list from remaining rows. Rows with the same DavName as the + // canonical are duplicates (nzbdav nested-folder bug); rows with distinct + // DavNames represent individual episode files within a season-pack blob. 
+ var aliases []ParsedNzbAlias + for _, r := range group[1:] { + aliases = append(aliases, ParsedNzbAlias{ID: r.id, Name: r.davName}) } - // Strip the release folder name to get its parent path. - parentPath := trimLastSegment(releaseParentPath) - category, relPath := p.splitPath(parentPath) out <- &ParsedNzb{ - ID: id, + ID: canonical.id, Name: releaseName, Category: category, RelPath: relPath, Content: pr, ExtractedFiles: tree.extractedFilesUnder(releaseID), + DavItemName: canonical.davName, + AliasDavItems: aliases, } count++ } diff --git a/internal/nzbdav/parser_test.go b/internal/nzbdav/parser_test.go index 910acab89..4d65b1fb4 100644 --- a/internal/nzbdav/parser_test.go +++ b/internal/nzbdav/parser_test.go @@ -320,6 +320,51 @@ func TestParser_Parse_Blobs_PreservesArbitraryFolderStructure(t *testing.T) { assert.Equal(t, "uncategorized", got[0].RelPath) } +// TestParser_Parse_Blobs_DeduplicateNestedSubType203 reproduces the real-world +// nzbdav bug where a release with many subfiles produces two SubType=203 +// DavItems (e.g. RUNE release): one as a direct child of the release folder, +// and a second nested under the first. Without deduplication both would be +// emitted as separate ParsedNzb values, causing a double import. 
+func TestParser_Parse_Blobs_DeduplicateNestedSubType203(t *testing.T) { + tmpDir := t.TempDir() + blobsDir := filepath.Join(tmpDir, "blobs") + dbPath := filepath.Join(tmpDir, "dedup.db") + db, err := sql.Open("sqlite3", dbPath) + require.NoError(t, err) + defer db.Close() + + _, err = db.Exec(` + CREATE TABLE DavItems ( + Id TEXT PRIMARY KEY, ParentId TEXT, Name TEXT, FileSize INTEGER, + Path TEXT, NzbBlobId TEXT, SubType INTEGER + ); + CREATE TABLE NzbNames (Id TEXT PRIMARY KEY, FileName TEXT); + `) + require.NoError(t, err) + + blobId := "aabbccddee001122" + writeZstdBlob(t, filepath.Join(blobsDir, "aa", "bb", blobId), []byte("")) + + // Two SubType=203 rows for the same release/blob — the second is nested + // under the first, mirroring the malformed nzbdav DB structure. + _, err = db.Exec(` + INSERT INTO NzbNames (Id, FileName) VALUES ('aabbccddee001122', 'Big.Release.nzb'); + INSERT INTO DavItems (Id, ParentId, Name, Path, NzbBlobId, SubType) VALUES + ('root', NULL, '/', '/', NULL, 1), + ('uncat', 'root', 'uncategorized', '/uncategorized', NULL, 1), + ('folder', 'uncat', 'Big.Release', '/uncategorized/Big.Release', NULL, 1), + ('item1', 'folder', 'Big.Release', '/uncategorized/Big.Release/Big.Release', 'aabbccddee001122', 203), + ('item2', 'item1', 'Big.Release', '/uncategorized/Big.Release/Big.Release/Big.Release','aabbccddee001122', 203); + `) + require.NoError(t, err) + + out, errChan := NewParser(dbPath, blobsDir).Parse() + got := collect(t, out, errChan) + + require.Len(t, got, 1, "expected exactly one ParsedNzb despite two SubType=203 rows for the same release") + assert.Equal(t, "Big.Release", got[0].Name) +} + func TestParser_Parse_Blobs_WithExtracted(t *testing.T) { tmpDir := t.TempDir() blobsDir := filepath.Join(tmpDir, "blobs") diff --git a/internal/nzbdav/types.go b/internal/nzbdav/types.go index 584bb83b6..7a00df8f6 100644 --- a/internal/nzbdav/types.go +++ b/internal/nzbdav/types.go @@ -4,6 +4,14 @@ import ( "io" ) +// ParsedNzbAlias represents 
a DavItem that shares the same NZB blob as the +// canonical ParsedNzb but refers to a different virtual output file (typically +// a different episode within the same season-pack NZB). +type ParsedNzbAlias struct { + ID string // DavItem.Id + Name string // DavItem.Name (episode filename) +} + type ParsedNzb struct { ID string Category string @@ -11,6 +19,13 @@ type ParsedNzb struct { RelPath string Content io.Reader ExtractedFiles []ExtractedFileInfo + // DavItemName is the DavItem.Name for the canonical DavItem (may be empty for + // legacy parseLegacy items or when Name is unavailable in the DB). + DavItemName string + // AliasDavItems contains other DavItems that share the same NZB blob. + // Non-empty only for multi-file season-pack blobs where each DavItem + // represents a distinct episode file. + AliasDavItems []ParsedNzbAlias } type ExtractedFileInfo struct { diff --git a/internal/nzbfilesystem/metadata_remote_file.go b/internal/nzbfilesystem/metadata_remote_file.go index 7be658955..c3cc508c0 100644 --- a/internal/nzbfilesystem/metadata_remote_file.go +++ b/internal/nzbfilesystem/metadata_remote_file.go @@ -384,16 +384,6 @@ func (mrf *MetadataRemoteFile) RenameFile(ctx context.Context, oldName, newName } } - // Update ID symlinks for all files with NzbdavId under the renamed directory - _ = mrf.metadataService.WalkDirectoryFiles(normalizedNew, func(fileVirtualPath string, meta *metapb.FileMetadata) error { - if meta.NzbdavId != "" { - if err := mrf.metadataService.UpdateIDSymlink(meta.NzbdavId, fileVirtualPath); err != nil { - slog.WarnContext(ctx, "Failed to update ID symlink after directory rename", "id", meta.NzbdavId, "path", fileVirtualPath, "error", err) - } - } - return nil - }) - return true, nil } @@ -404,24 +394,11 @@ func (mrf *MetadataRemoteFile) RenameFile(ctx context.Context, oldName, newName return false, nil } - // Read metadata first to get NzbdavId before rename - fileMeta, err := mrf.metadataService.ReadFileMetadata(normalizedOld) - if 
err != nil { - return false, fmt.Errorf("failed to read old metadata: %w", err) - } - // Use atomic rename instead of read-write-delete if err := mrf.metadataService.RenameFileMetadata(normalizedOld, normalizedNew); err != nil { return false, fmt.Errorf("failed to rename metadata: %w", err) } - // Update ID symlink if file has a NzbdavId - if fileMeta != nil && fileMeta.NzbdavId != "" { - if err := mrf.metadataService.UpdateIDSymlink(fileMeta.NzbdavId, normalizedNew); err != nil { - slog.WarnContext(ctx, "Failed to update ID symlink during MOVE", "id", fileMeta.NzbdavId, "error", err) - } - } - // Update health records and resolve pending repairs if mrf.healthRepository != nil { if err := mrf.healthRepository.RenameHealthRecord(ctx, normalizedOld, normalizedNew); err != nil {

{/* Icon */} + + {/* Icon */} Name Size Modified