diff --git a/cmd/migrate_nzbdav/main.go b/cmd/migrate_nzbdav/main.go new file mode 100644 index 00000000..bebf2da0 --- /dev/null +++ b/cmd/migrate_nzbdav/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strings" + "encoding/json" +) + +// Simplified structures for nzbdav API +type WebDavItem struct { + Id string `json:"id"` + Name string `json:"name"` + Type int `json:"type"` // 1 = folder, 0 = file +} + +func main() { + // CONFIGURATION + apiBase := "http://localhost:8080" // Update to your nzbdav address + targetDir := "/opt/altmount/config_test/.nzbs/migrated" + os.MkdirAll(targetDir, 0755) + + // Get all items from the root + fmt.Println("Fetching releases from nzbdav API...") + items, err := fetchItems(apiBase, "/") + if err != nil { + panic(err) + } + + for _, item := range items { + if item.Type == 1 { // If it's a folder/release + fmt.Printf("Fetching NZB for: %s\n", item.Name) + err := downloadNzb(apiBase, item.Id, filepath.Join(targetDir, sanitizeFilename(item.Name)+".nzb")) + if err != nil { + fmt.Printf("Failed to download %s: %v\n", item.Name, err) + } + } + } +} + +func fetchItems(apiBase, path string) ([]WebDavItem, error) { + resp, err := http.Get(fmt.Sprintf("%s/api/list?path=%s", apiBase, path)) + if err != nil { return nil, err } + defer resp.Body.Close() + + var items []WebDavItem + json.NewDecoder(resp.Body).Decode(&items) + return items, nil +} + +func downloadNzb(apiBase, id, outputPath string) error { + // Adjust endpoint based on nzbdav API design + resp, err := http.Get(fmt.Sprintf("%s/api/download?id=%s", apiBase, id)) + if err != nil { return err } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("status code: %d", resp.StatusCode) + } + + out, _ := os.Create(outputPath) + defer out.Close() + _, err = io.Copy(out, resp.Body) + return err +} + +func sanitizeFilename(name string) string { + return strings.ReplaceAll(name, "/", "_") +} diff 
--git a/conductor/paginated-history-search.md b/conductor/paginated-history-search.md new file mode 100644 index 00000000..4c7c4e73 --- /dev/null +++ b/conductor/paginated-history-search.md @@ -0,0 +1,22 @@ +# Implementation Plan: Paginated History Search for Reliable Blocklisting + +## Objective +Implement a paginated search mechanism in `blocklistRadarrMovieFile` and `blocklistSonarrEpisodeFile` so the scanner can search through multiple history pages (e.g., up to 5,000 records) to find grab events for legacy files, ensuring the blocklist trigger works even for older releases. + +## Scope & Impact +- **Manager:** Modify `internal/arrs/scanner/manager.go` to add a pagination loop for history fetching. +- **Reliability:** This will make the blocklisting mechanism significantly more reliable for existing media files that don't yet have an associated `DownloadID` in the `file_health` table. + +## Implementation Steps +### 1. Paginated History Fetcher +- Create a helper function or logic block within `blocklistRadarrMovieFile` and `blocklistSonarrEpisodeFile` to iterate through pages. +- Use `starr.PageReq` with `PageSize=1000`. +- Continue fetching pages until the `DownloadID` (or `grabbed` event) is found or the total history limit (e.g., 5,000 records) is reached. + +### 2. Update logic +- Integrate the loop into both ARR scanners. +- Ensure the loop handles termination correctly when history pages are exhausted. + +## Verification +- Confirm that AltMount can now find the `09bcb934-b84e-4a1c-b6d0-e49045c98e47` grab event for the Abbott Elementary FLUX release when triggering a manual repair. +- Verify through logs that the scanner is now checking multiple pages. 
diff --git a/conductor/reliable-blocklist.md b/conductor/reliable-blocklist.md new file mode 100644 index 00000000..3161908b --- /dev/null +++ b/conductor/reliable-blocklist.md @@ -0,0 +1,31 @@ +# Implementation Plan: Reliable Blocklisting via DownloadID Capture + +## Objective +Enable reliable blocklisting of corrupted releases by capturing and storing the `downloadID` from ARR webhooks at import time, and using this stored ID for blocklisting (failing) the release during streaming failures, bypassing unreliable history lookups. + +## Scope & Impact +- **Database:** Update `file_health` table to store `download_id`. +- **Webhook:** Update `internal/api/arrs_handlers.go` to capture `downloadId` from incoming webhooks. +- **Health Worker:** Update repair logic to use the stored `download_id` for triggering ARR failures. + +## Implementation Steps + +### 1. Database Schema +- Modify `internal/database/models.go` to add `DownloadID` field to `FileHealth` struct. +- Add migration (SQL) to add `download_id` column to `file_health` table. + +### 2. Webhook Handler +- Update `internal/api/arrs_handlers.go`: + - Extract `downloadId` from `ArrsWebhookRequest`. + - Pass this ID to `healthRepo.AddFileToHealthCheck`. + +### 3. Health Worker Repair Logic +- Modify `internal/arrs/scanner/manager.go`: + - Update `blocklistSonarrEpisodeFile` and `blocklistRadarrMovieFile` to accept an optional pre-known `downloadID`. + - If `downloadID` is provided, use it directly (skip history search if possible, or use it to filter history). +- Update `internal/health/worker.go`: + - Ensure `triggerFileRepair` retrieves the `downloadID` from the database record. + +## Verification +- Add a test case in `internal/database/health_repository_test.go` to verify `download_id` persistence. +- Manual test: Import a file, trigger a streaming failure, and verify that AltMount calls the ARR's `FailContext` API using the correct `downloadID` even if the history record has been rotated out. 
diff --git a/internal/api/arrs_handlers.go b/internal/api/arrs_handlers.go index 5f914372..dac028d5 100644 --- a/internal/api/arrs_handlers.go +++ b/internal/api/arrs_handlers.go @@ -57,6 +57,9 @@ type ArrsWebhookRequest struct { EpisodeFile struct { Path string `json:"path"` } `json:"episodeFile"` + Download struct { + ID string `json:"id"` + } `json:"download"` DeletedFiles ArrsDeletedFiles `json:"deletedFiles,omitempty"` } @@ -449,7 +452,7 @@ func (s *Server) handleArrsWebhook(c *fiber.Ctx) error { // Add to health check (pending status) with high priority (Next) to ensure it's processed right away cfg := s.configManager.GetConfigGetter()() - err := s.healthRepo.AddFileToHealthCheckWithMetadata(c.Context(), normalizedPath, &path, cfg.GetMaxRetries(), cfg.GetMaxRepairRetries(), sourceNzb, database.HealthPriorityNext, releaseDate) + err := s.healthRepo.AddFileToHealthCheckWithMetadata(c.Context(), normalizedPath, &path, cfg.GetMaxRetries(), cfg.GetMaxRepairRetries(), sourceNzb, req.Download.ID, database.HealthPriorityNext, releaseDate) if err != nil { slog.ErrorContext(c.Context(), "Failed to add webhook file to health check", "path", normalizedPath, "error", err) } else { diff --git a/internal/api/health_handlers.go b/internal/api/health_handlers.go index a547940f..6c3b6461 100644 --- a/internal/api/health_handlers.go +++ b/internal/api/health_handlers.go @@ -439,7 +439,7 @@ func (s *Server) handleRepairHealth(c *fiber.Ctx) error { } // Trigger rescan with the resolved path - err = s.arrsService.TriggerFileRescan(ctx, pathForRescan, item.FilePath) + err = s.arrsService.TriggerFileRescan(ctx, pathForRescan, item.FilePath, "") if err != nil { // Check if this is a "no ARR instance found" error if strings.Contains(err.Error(), "no ARR instance found") { @@ -546,7 +546,7 @@ func (s *Server) handleRepairHealthBulk(c *fiber.Ctx) error { } // Trigger rescan - err = s.arrsService.TriggerFileRescan(ctx, pathForRescan, item.FilePath) + err = 
s.arrsService.TriggerFileRescan(ctx, pathForRescan, item.FilePath, "") if err != nil { failedCount++ errors[filePath] = fmt.Sprintf("Failed to trigger repair: %v", err) @@ -879,7 +879,7 @@ func (s *Server) handleAddHealthCheck(c *fiber.Ctx) error { } // Add file to health database - err := s.healthRepo.AddFileToHealthCheck(c.Context(), req.FilePath, req.LibraryPath, maxRetries, cfg.GetMaxRepairRetries(), req.SourceNzb, req.Priority) + err := s.healthRepo.AddFileToHealthCheck(c.Context(), req.FilePath, req.LibraryPath, maxRetries, cfg.GetMaxRepairRetries(), req.SourceNzb, "", req.Priority) if err != nil { return RespondInternalError(c, "Failed to add file for health check", err.Error()) } diff --git a/internal/arrs/scanner/manager.go b/internal/arrs/scanner/manager.go index f7288dca..2fe27f46 100644 --- a/internal/arrs/scanner/manager.go +++ b/internal/arrs/scanner/manager.go @@ -297,9 +297,9 @@ func (m *Manager) sonarrHasFile(ctx context.Context, client *sonarr.Sonarr, inst } // TriggerFileRescan triggers a rescan for a specific file path through the appropriate ARR instance -func (m *Manager) TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string) error { +func (m *Manager) TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string, downloadID string) error { res, err, _ := m.sf.Do(fmt.Sprintf("rescan:%s", pathForRescan), func() (interface{}, error) { - slog.InfoContext(ctx, "Triggering ARR rescan", "path", pathForRescan, "relative_path", relativePath) + slog.InfoContext(ctx, "Triggering ARR rescan", "path", pathForRescan, "relative_path", relativePath, "download_id", downloadID) // Find which ARR instance manages this file path instanceType, instanceName, err := m.findInstanceForFilePath(ctx, pathForRescan, relativePath) @@ -325,14 +325,14 @@ func (m *Manager) TriggerFileRescan(ctx context.Context, pathForRescan string, r if err != nil { return nil, fmt.Errorf("failed to create Radarr client: %w", err) } - return 
nil, m.triggerRadarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName) + return nil, m.triggerRadarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName, downloadID) case "sonarr": client, err := m.clients.GetOrCreateSonarrClient(instanceName, instanceConfig.URL, instanceConfig.APIKey) if err != nil { return nil, fmt.Errorf("failed to create Sonarr client: %w", err) } - return nil, m.triggerSonarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName) + return nil, m.triggerSonarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName, downloadID) case "lidarr", "readarr", "whisparr": // For now, we only support RefreshMonitoredDownloads for these @@ -492,11 +492,12 @@ func (m *Manager) TriggerDownloadScan(ctx context.Context, instanceType string) } // triggerRadarrRescanByPath triggers a rescan in Radarr for the given file path -func (m *Manager) triggerRadarrRescanByPath(ctx context.Context, client *radarr.Radarr, filePath, relativePath, instanceName string) error { +func (m *Manager) triggerRadarrRescanByPath(ctx context.Context, client *radarr.Radarr, filePath, relativePath, instanceName string, downloadID string) error { slog.InfoContext(ctx, "Searching Radarr for matching movie", "instance", instanceName, "file_path", filePath, - "relative_path", relativePath) + "relative_path", relativePath, + "download_id", downloadID) // Get all movies to find the one with matching file path movies, err := m.data.GetMovies(ctx, client, instanceName) @@ -573,7 +574,7 @@ func (m *Manager) triggerRadarrRescanByPath(ctx context.Context, client *radarr. 
// But we can still trigger search if targetMovie.HasFile && targetMovie.MovieFile != nil { // Try to blocklist the release associated with this file - if err := m.blocklistRadarrMovieFile(ctx, client, targetMovie.ID, targetMovie.MovieFile.ID); err != nil { + if err := m.blocklistRadarrMovieFile(ctx, client, targetMovie.ID, targetMovie.MovieFile.ID, downloadID); err != nil { slog.WarnContext(ctx, "Failed to blocklist Radarr release", "error", err) } @@ -611,7 +612,7 @@ func (m *Manager) triggerRadarrRescanByPath(ctx context.Context, client *radarr. } // triggerSonarrRescanByPath triggers a rescan in Sonarr for the given file path -func (m *Manager) triggerSonarrRescanByPath(ctx context.Context, client *sonarr.Sonarr, filePath, relativePath, instanceName string) error { +func (m *Manager) triggerSonarrRescanByPath(ctx context.Context, client *sonarr.Sonarr, filePath, relativePath, instanceName string, downloadID string) error { cfg := m.configGetter() // Get library directory from health config @@ -624,7 +625,8 @@ func (m *Manager) triggerSonarrRescanByPath(ctx context.Context, client *sonarr. "instance", instanceName, "file_path", filePath, "relative_path", relativePath, - "library_dir", libraryDir) + "library_dir", libraryDir, + "download_id", downloadID) // Get all series to find the one that contains this file path series, err := m.data.GetSeries(ctx, client, instanceName) @@ -738,7 +740,7 @@ func (m *Manager) triggerSonarrRescanByPath(ctx context.Context, client *sonarr. 
"episode_file_id", targetEpisodeFile.ID) // Try to blocklist the release associated with this file - if err := m.blocklistSonarrEpisodeFile(ctx, client, targetSeries.ID, targetEpisodeFile.ID); err != nil { + if err := m.blocklistSonarrEpisodeFile(ctx, client, targetSeries.ID, targetEpisodeFile.ID, downloadID); err != nil { slog.WarnContext(ctx, "Failed to blocklist Sonarr release", "error", err) } @@ -843,35 +845,50 @@ func (m *Manager) failSonarrQueueItemByPath(ctx context.Context, client *sonarr. } // blocklistRadarrMovieFile finds the history event for the given file and marks it as failed (blocklisting the release) -func (m *Manager) blocklistRadarrMovieFile(ctx context.Context, client *radarr.Radarr, movieID int64, fileID int64) error { - slog.DebugContext(ctx, "Attempting to find and blocklist release for movie file", "movie_id", movieID, "file_id", fileID) +func (m *Manager) blocklistRadarrMovieFile(ctx context.Context, client *radarr.Radarr, movieID int64, fileID int64, knownDownloadID string) error { + slog.DebugContext(ctx, "Attempting to find and blocklist release for movie file", "movie_id", movieID, "file_id", fileID, "known_download_id", knownDownloadID) - // Fetch history for this specific movie - req := &starr.PageReq{PageSize: 100, SortKey: "date", SortDir: starr.SortDescend} - req.Set("movieId", strconv.FormatInt(movieID, 10)) + var downloadID string = knownDownloadID + var history *radarr.History - history, err := client.GetHistoryPageContext(ctx, req) - if err != nil { - return fmt.Errorf("failed to fetch Radarr history: %w", err) - } + if downloadID == "" { + // Fetch history for this specific movie + req := &starr.PageReq{PageSize: 1000, SortKey: "date", SortDir: starr.SortDescend} + req.Set("movieId", strconv.FormatInt(movieID, 10)) + + var err error + history, err = client.GetHistoryPageContext(ctx, req) + if err != nil { + return fmt.Errorf("failed to fetch Radarr history: %w", err) + } - targetFileID := strconv.FormatInt(fileID, 10) - var 
downloadID string + targetFileID := strconv.FormatInt(fileID, 10) - // 1. Find the import event to get the downloadId - for _, record := range history.Records { - if record.Data.FileID == targetFileID && (record.EventType == "movieFileImported" || record.EventType == "downloadFolderImported") { - downloadID = record.DownloadID - break + // 1. Find the import event to get the downloadId + for _, record := range history.Records { + if record.Data.FileID == targetFileID && (record.EventType == "movieFileImported" || record.EventType == "downloadFolderImported") { + downloadID = record.DownloadID + break + } } - } - if downloadID == "" { - slog.WarnContext(ctx, "Could not find import event in Radarr history for file", "movie_id", movieID, "file_id", fileID) - return nil + if downloadID == "" { + slog.WarnContext(ctx, "Could not find import event in Radarr history for file", "movie_id", movieID, "file_id", fileID) + return nil + } } // 2. Find the original grab event using the downloadId + if history == nil { + req := &starr.PageReq{PageSize: 100, SortKey: "date", SortDir: starr.SortDescend} + req.Set("movieId", strconv.FormatInt(movieID, 10)) + var err error + history, err = client.GetHistoryPageContext(ctx, req) + if err != nil { + return fmt.Errorf("failed to fetch Radarr history: %w", err) + } + } + for _, record := range history.Records { if record.DownloadID == downloadID && record.EventType == "grabbed" { slog.InfoContext(ctx, "Found grabbed history record, marking as failed to blocklist release", @@ -888,35 +905,50 @@ func (m *Manager) blocklistRadarrMovieFile(ctx context.Context, client *radarr.R } // blocklistSonarrEpisodeFile finds the grabbed history event for the given file and marks it as failed (blocklisting the release) -func (m *Manager) blocklistSonarrEpisodeFile(ctx context.Context, client *sonarr.Sonarr, seriesID int64, fileID int64) error { - slog.DebugContext(ctx, "Attempting to find and blocklist release for episode file", "series_id", seriesID, 
"file_id", fileID) +func (m *Manager) blocklistSonarrEpisodeFile(ctx context.Context, client *sonarr.Sonarr, seriesID int64, fileID int64, knownDownloadID string) error { + slog.DebugContext(ctx, "Attempting to find and blocklist release for episode file", "series_id", seriesID, "file_id", fileID, "known_download_id", knownDownloadID) - // Fetch history for this specific series - req := &starr.PageReq{PageSize: 100, SortKey: "date", SortDir: starr.SortDescend} - req.Set("seriesId", strconv.FormatInt(seriesID, 10)) + var downloadID string = knownDownloadID + var history *sonarr.History - history, err := client.GetHistoryPageContext(ctx, req) - if err != nil { - return fmt.Errorf("failed to fetch Sonarr history: %w", err) - } + if downloadID == "" { + // Fetch history for this specific series + req := &starr.PageReq{PageSize: 1000, SortKey: "date", SortDir: starr.SortDescend} + req.Set("seriesId", strconv.FormatInt(seriesID, 10)) - targetFileID := strconv.FormatInt(fileID, 10) - var downloadID string + var err error + history, err = client.GetHistoryPageContext(ctx, req) + if err != nil { + return fmt.Errorf("failed to fetch Sonarr history: %w", err) + } - // 1. Find the import event to get the downloadId - for _, record := range history.Records { - if record.Data.FileID == targetFileID && record.EventType == "downloadFolderImported" { - downloadID = record.DownloadID - break + targetFileID := strconv.FormatInt(fileID, 10) + + // 1. 
Find the import event to get the downloadId + for _, record := range history.Records { + if record.Data.FileID == targetFileID && record.EventType == "downloadFolderImported" { + downloadID = record.DownloadID + break + } } - } - if downloadID == "" { - slog.WarnContext(ctx, "Could not find import event in Sonarr history for file", "series_id", seriesID, "file_id", fileID) - return nil + if downloadID == "" { + slog.WarnContext(ctx, "Could not find import event in Sonarr history for file", "series_id", seriesID, "file_id", fileID) + return nil + } } // 2. Find the original grab event using the downloadId + if history == nil { + req := &starr.PageReq{PageSize: 100, SortKey: "date", SortDir: starr.SortDescend} + req.Set("seriesId", strconv.FormatInt(seriesID, 10)) + var err error + history, err = client.GetHistoryPageContext(ctx, req) + if err != nil { + return fmt.Errorf("failed to fetch Sonarr history: %w", err) + } + } + for _, record := range history.Records { if record.DownloadID == downloadID && record.EventType == "grabbed" { slog.InfoContext(ctx, "Found grabbed history record, marking as failed to blocklist release", diff --git a/internal/arrs/service.go b/internal/arrs/service.go index a02c38e1..b9917413 100644 --- a/internal/arrs/service.go +++ b/internal/arrs/service.go @@ -173,8 +173,8 @@ func (s *Service) CleanupQueue(ctx context.Context) error { } // TriggerFileRescan triggers a rescan for a specific file path through the appropriate ARR instance -func (s *Service) TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string) error { - return s.scanner.TriggerFileRescan(ctx, pathForRescan, relativePath) +func (s *Service) TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string, downloadID string) error { + return s.scanner.TriggerFileRescan(ctx, pathForRescan, relativePath, downloadID) } // TriggerScanForFile finds the ARR instance managing the file and triggers a download scan on it. 
diff --git a/internal/database/health_repository.go b/internal/database/health_repository.go index 0d3de34e..3cf8bfcc 100644 --- a/internal/database/health_repository.go +++ b/internal/database/health_repository.go @@ -681,23 +681,24 @@ func (r *HealthRepository) RegisterCorruptedFile(ctx context.Context, filePath s } // AddFileToHealthCheck adds a file to the health database for checking -func (r *HealthRepository) AddFileToHealthCheck(ctx context.Context, filePath string, libraryPath *string, maxRetries int, maxRepairRetries int, sourceNzbPath *string, priority HealthPriority) error { - return r.AddFileToHealthCheckWithMetadata(ctx, filePath, libraryPath, maxRetries, maxRepairRetries, sourceNzbPath, priority, nil) +func (r *HealthRepository) AddFileToHealthCheck(ctx context.Context, filePath string, libraryPath *string, maxRetries int, maxRepairRetries int, sourceNzbPath *string, downloadID string, priority HealthPriority) error { + return r.AddFileToHealthCheckWithMetadata(ctx, filePath, libraryPath, maxRetries, maxRepairRetries, sourceNzbPath, downloadID, priority, nil) } // AddFileToHealthCheckWithMetadata adds a file to the health database for checking with metadata -func (r *HealthRepository) AddFileToHealthCheckWithMetadata(ctx context.Context, filePath string, libraryPath *string, maxRetries int, maxRepairRetries int, sourceNzbPath *string, priority HealthPriority, releaseDate *time.Time) error { +func (r *HealthRepository) AddFileToHealthCheckWithMetadata(ctx context.Context, filePath string, libraryPath *string, maxRetries int, maxRepairRetries int, sourceNzbPath *string, downloadID string, priority HealthPriority, releaseDate *time.Time) error { var releaseDateStr any = nil if releaseDate != nil { releaseDateStr = releaseDate.UTC().Format("2006-01-02 15:04:05") } query := ` - INSERT INTO file_health (file_path, library_path, status, last_checked, retry_count, max_retries, repair_retry_count, max_repair_retries, source_nzb_path, priority, release_date, 
created_at, updated_at, scheduled_check_at) - VALUES (?, ?, ?, datetime('now'), 0, ?, 0, ?, ?, ?, ?, datetime('now'), datetime('now'), datetime('now')) + INSERT INTO file_health (file_path, library_path, status, last_checked, retry_count, max_retries, repair_retry_count, max_repair_retries, source_nzb_path, download_id, priority, release_date, created_at, updated_at, scheduled_check_at) + VALUES (?, ?, ?, datetime('now'), 0, ?, 0, ?, ?, ?, ?, ?, datetime('now'), datetime('now'), datetime('now')) ON CONFLICT(file_path) DO UPDATE SET library_path = COALESCE(excluded.library_path, library_path), + download_id = COALESCE(excluded.download_id, download_id), status = excluded.status, retry_count = 0, repair_retry_count = 0, @@ -712,7 +713,7 @@ func (r *HealthRepository) AddFileToHealthCheckWithMetadata(ctx context.Context, scheduled_check_at = datetime('now') ` - _, err := r.db.ExecContext(ctx, query, filePath, libraryPath, HealthStatusPending, maxRetries, maxRepairRetries, sourceNzbPath, priority, releaseDateStr) + _, err := r.db.ExecContext(ctx, query, filePath, libraryPath, HealthStatusPending, maxRetries, maxRepairRetries, sourceNzbPath, downloadID, priority, releaseDateStr) if err != nil { return fmt.Errorf("failed to add file to health check: %w", err) diff --git a/internal/database/health_repository_test.go b/internal/database/health_repository_test.go index 7683c9ab..d8525852 100644 --- a/internal/database/health_repository_test.go +++ b/internal/database/health_repository_test.go @@ -35,7 +35,8 @@ func setupTestDB(t *testing.T) *HealthRepository { scheduled_check_at DATETIME, priority INTEGER DEFAULT 0, streaming_failure_count INTEGER DEFAULT 0, - is_masked BOOLEAN DEFAULT FALSE + is_masked BOOLEAN DEFAULT FALSE, + download_id TEXT DEFAULT '' ); `) require.NoError(t, err) @@ -306,7 +307,7 @@ func TestAddFileToHealthCheckWithMetadata_StoresLibraryPath(t *testing.T) { sourceNzb := "Dune.nzb" // Add the file - err := repo.AddFileToHealthCheckWithMetadata(ctx, 
filePath, &libraryPath, 3, 3, &sourceNzb, HealthPriorityNormal, nil) + err := repo.AddFileToHealthCheckWithMetadata(ctx, filePath, &libraryPath, 3, 3, &sourceNzb, "", HealthPriorityNormal, nil) require.NoError(t, err) // Verify it was stored diff --git a/internal/database/migrations/postgres/023_add_download_id_to_health.sql b/internal/database/migrations/postgres/023_add_download_id_to_health.sql new file mode 100644 index 00000000..f60c86c1 --- /dev/null +++ b/internal/database/migrations/postgres/023_add_download_id_to_health.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE file_health ADD COLUMN download_id TEXT NOT NULL DEFAULT ''; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE file_health DROP COLUMN download_id; +-- +goose StatementEnd diff --git a/internal/database/migrations/sqlite/023_add_download_id_to_health.sql b/internal/database/migrations/sqlite/023_add_download_id_to_health.sql new file mode 100644 index 00000000..f60c86c1 --- /dev/null +++ b/internal/database/migrations/sqlite/023_add_download_id_to_health.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE file_health ADD COLUMN download_id TEXT NOT NULL DEFAULT ''; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE file_health DROP COLUMN download_id; +-- +goose StatementEnd diff --git a/internal/database/models.go b/internal/database/models.go index be5e56ba..7495dd38 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -99,6 +99,7 @@ type FileHealth struct { RepairRetryCount int `db:"repair_retry_count"` // Repair retry count MaxRepairRetries int `db:"max_repair_retries"` // Max repair retries SourceNzbPath *string `db:"source_nzb_path"` + DownloadID string `db:"download_id"` ErrorDetails *string `db:"error_details"` // JSON error details CreatedAt time.Time `db:"created_at"` UpdatedAt time.Time `db:"updated_at"` diff --git a/internal/health/worker.go 
b/internal/health/worker.go index d273d3a4..344610ac 100644 --- a/internal/health/worker.go +++ b/internal/health/worker.go @@ -22,9 +22,9 @@ import ( "github.com/sourcegraph/conc/pool" ) -// ARRsRepairService abstracts the ARR repair operations needed by HealthWorker. +// ARRsRepairService abstracts the ARR repair operations needed by HealthWorker. type ARRsRepairService interface { - TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string) error + TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string, downloadID string) error } // WorkerStatus represents the current status of the health worker @@ -961,7 +961,7 @@ func (hw *HealthWorker) triggerFileRepair(ctx context.Context, item *database.Fi pathForRescan := hw.resolvePathForRescan(item) - err := hw.arrsService.TriggerFileRescan(ctx, pathForRescan, filePath) + err := hw.arrsService.TriggerFileRescan(ctx, pathForRescan, filePath, item.DownloadID) if err != nil { if errors.Is(err, arrs.ErrEpisodeAlreadySatisfied) || errors.Is(err, arrs.ErrPathMatchFailed) { slog.WarnContext(ctx, "File no longer tracked by ARR, removing from AltMount", @@ -1010,7 +1010,7 @@ func (hw *HealthWorker) retriggerFileRepair(ctx context.Context, item *database.
slog.InfoContext(ctx, "Re-triggering ARR rescan for file in repair", "file_path", filePath, "path_for_rescan", pathForRescan) - err := hw.arrsService.TriggerFileRescan(ctx, pathForRescan, filePath) + err := hw.arrsService.TriggerFileRescan(ctx, pathForRescan, filePath, item.DownloadID) if err != nil { if errors.Is(err, arrs.ErrEpisodeAlreadySatisfied) || errors.Is(err, arrs.ErrPathMatchFailed) { slog.WarnContext(ctx, "File no longer tracked by ARR during re-trigger, removing from AltMount", "file_path", filePath) diff --git a/internal/importer/parser/namespace_test.go b/internal/importer/parser/namespace_test.go new file mode 100644 index 00000000..c2828f30 --- /dev/null +++ b/internal/importer/parser/namespace_test.go @@ -0,0 +1,54 @@ +package parser + +import ( + "strings" + "testing" + + "github.com/javi11/nzbparser" + "github.com/stretchr/testify/assert" +) + +func TestNzbParserNamespaces(t *testing.T) { + tests := []struct { + name string + namespace string + }{ + { + name: "2003 Namespace", + namespace: "http://www.newzbin.com/DTD/2003/nzb", + }, + { + name: "1.1 Namespace", + namespace: "http://www.newzbin.com/DTD/nzb/nzb-1.1.dtd", + }, + { + name: "No Namespace", + namespace: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + nzbXML := `` + if tt.namespace != "" { + nzbXML += `` + } else { + nzbXML += `` + } + nzbXML += ` + + + alt.binaries.test + + + seg1 + + +` + r := strings.NewReader(nzbXML) + n, err := nzbparser.Parse(r) + assert.NoError(t, err) + assert.Equal(t, 1, len(n.Files), "Should find 1 file with namespace %s", tt.namespace) + }) + } +} diff --git a/internal/importer/postprocessor/health_scheduler.go b/internal/importer/postprocessor/health_scheduler.go index 7e262883..68fdb6c2 100644 --- a/internal/importer/postprocessor/health_scheduler.go +++ b/internal/importer/postprocessor/health_scheduler.go @@ -29,7 +29,7 @@ func (c *Coordinator) ScheduleHealthCheck(ctx context.Context, resultingPath str // Add/Update 
health record with high priority cfg := c.configGetter() - err = c.healthRepo.AddFileToHealthCheck(ctx, resultingPath, &resultingPath, cfg.GetMaxRetries(), cfg.GetMaxRepairRetries(), &fileMeta.SourceNzbPath, database.HealthPriorityNext) + err = c.healthRepo.AddFileToHealthCheck(ctx, resultingPath, &resultingPath, cfg.GetMaxRetries(), cfg.GetMaxRepairRetries(), &fileMeta.SourceNzbPath, "", database.HealthPriorityNext) if err != nil { slog.ErrorContext(ctx, "Failed to schedule immediate health check for imported file", "path", resultingPath, diff --git a/internal/nzbdav/deserializer_test.go b/internal/nzbdav/deserializer_test.go new file mode 100644 index 00000000..442aaea6 --- /dev/null +++ b/internal/nzbdav/deserializer_test.go @@ -0,0 +1,23 @@ +package nzbdav + +import ( + "os" + "testing" + "github.com/klauspost/compress/zstd" + "github.com/stretchr/testify/assert" +) + +func TestBlobDecompression(t *testing.T) { + blobPath := "/media/docker/data/nzbdav/blobs/4e/49/4e4944aa-0ab7-4001-9bc0-aa44ce5b1d01" + data, err := os.ReadFile(blobPath) + assert.NoError(t, err) + + zr, err := zstd.NewReader(nil) + assert.NoError(t, err) + decompressed, err := zr.DecodeAll(data, nil) + assert.NoError(t, err) + + t.Logf("Decompressed size: %d", len(decompressed)) + // Look for typical structures, e.g., strings or message ID patterns + t.Logf("First 200 bytes (hex): %x", decompressed[:200]) +} diff --git a/internal/nzbdav/scrape_test.go b/internal/nzbdav/scrape_test.go new file mode 100644 index 00000000..540dbd00 --- /dev/null +++ b/internal/nzbdav/scrape_test.go @@ -0,0 +1,62 @@ +package nzbdav + +import ( + "encoding/binary" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestScrapeMessageIds(t *testing.T) { + p := &Parser{} + + // Construct a blob similar to the one we saw + // Header: 02 10 82 28 ... 
+ // Length: 27 00 00 00 (39) + // MsgId: 750aadf553d4f97bad53285a233b53a@ngPost + msgId := "750aadf553d4f97bad53285a233b53a@ngPost" + data, _ := os.ReadFile("/tmp/walter_boys.raw") + copy(data[0:4], []byte{0x02, 0x10, 0x82, 0x28}) + binary.LittleEndian.PutUint32(data[4:8], uint32(len(msgId))) + copy(data[8:], []byte(msgId)) + + results := p.scrapeMessageIds(data) + t.Logf("Found %d IDs", len(results)) + assert.Greater(t, len(results), 0, "Should have found some IDs") + +} + +func TestGetBlobPath(t *testing.T) { + // Create a temp blobs directory + tempDir, err := os.MkdirTemp("", "blobs") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tempDir) + + blobId := "4E4944AA-0AB7-4001-9BC0-AA44CE5B1D01" + id := "4e4944aa-0ab7-4001-9bc0-aa44ce5b1d01" + + // Create the nested directory structure + blobDir := filepath.Join(tempDir, id[0:2], id[2:4]) + if err := os.MkdirAll(blobDir, 0755); err != nil { + t.Fatal(err) + } + + // Create a lowercase file on disk + blobPath := filepath.Join(blobDir, id) + if err := os.WriteFile(blobPath, []byte("test"), 0644); err != nil { + t.Fatal(err) + } + + p := &Parser{ + blobsPath: tempDir, + } + + // Should find it even with uppercase blobId + foundPath := p.getBlobPath(blobId) + assert.Equal(t, blobPath, foundPath) +} + diff --git a/internal/nzbfilesystem/types.go b/internal/nzbfilesystem/types.go index bc43f0ee..9b949d90 100644 --- a/internal/nzbfilesystem/types.go +++ b/internal/nzbfilesystem/types.go @@ -32,7 +32,7 @@ type ActiveStream struct { // ARRsRepairService abstracts the ARR repair operations needed by the filesystem. type ARRsRepairService interface { - TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string) error + TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string, downloadID string) error } // StreamTracker interface for tracking active streams