Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions cmd/migrate_nzbdav/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"encoding/json"
)

// Simplified structures for nzbdav API
type WebDavItem struct {
Id string `json:"id"`
Name string `json:"name"`
Type int `json:"type"` // 1 = folder, 0 = file
}

func main() {
// CONFIGURATION
apiBase := "http://localhost:8080" // Update to your nzbdav address
targetDir := "/opt/altmount/config_test/.nzbs/migrated"
os.MkdirAll(targetDir, 0755)

// Get all items from the root
fmt.Println("Fetching releases from nzbdav API...")
items, err := fetchItems(apiBase, "/")
if err != nil {
panic(err)
}

for _, item := range items {
if item.Type == 1 { // If it's a folder/release
fmt.Printf("Fetching NZB for: %s\n", item.Name)
err := downloadNzb(apiBase, item.Id, filepath.Join(targetDir, sanitizeFilename(item.Name)+".nzb"))
if err != nil {
fmt.Printf("Failed to download %s: %v\n", item.Name, err)
}
}
}
}

func fetchItems(apiBase, path string) ([]WebDavItem, error) {
resp, err := http.Get(fmt.Sprintf("%s/api/list?path=%s", apiBase, path))
if err != nil { return nil, err }
defer resp.Body.Close()

var items []WebDavItem
json.NewDecoder(resp.Body).Decode(&items)
return items, nil
}

func downloadNzb(apiBase, id, outputPath string) error {
// Adjust endpoint based on nzbdav API design
resp, err := http.Get(fmt.Sprintf("%s/api/download?id=%s", apiBase, id))
if err != nil { return err }
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("status code: %d", resp.StatusCode)
}

out, _ := os.Create(outputPath)
defer out.Close()
_, err = io.Copy(out, resp.Body)
return err
}

func sanitizeFilename(name string) string {
return strings.ReplaceAll(name, "/", "_")
}
22 changes: 22 additions & 0 deletions conductor/paginated-history-search.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Implementation Plan: Paginated History Search for Reliable Blocklisting

## Objective
Implement a paginated search mechanism in `blocklistRadarrMovieFile` and `blocklistSonarrEpisodeFile` so the scanner can search through multiple history pages (e.g., up to 5,000 records) to find grab events for legacy files, ensuring the blocklist trigger works even for older releases.

## Scope & Impact
- **Manager:** Modify `internal/arrs/scanner/manager.go` to add a pagination loop for history fetching.
- **Reliability:** This will make the blocklisting mechanism significantly more reliable for existing media files that don't yet have an associated `DownloadID` in the `file_health` table.

## Implementation Steps
### 1. Paginated History Fetcher
- Create a helper function or logic block within `blocklistRadarrMovieFile` and `blocklistSonarrEpisodeFile` to iterate through pages.
- Use `starr.PageReq` with `PageSize=1000`.
- Continue fetching pages until the `DownloadID` (or `grabbed` event) is found or the total history limit (e.g., 5,000 records) is reached.

### 2. Update logic
- Integrate the loop into both ARR scanners.
- Ensure the loop handles termination correctly when history pages are exhausted.

## Verification
- Confirm that AltMount can now find the `09bcb934-b84e-4a1c-b6d0-e49045c98e47` grab event for the Abbott Elementary FLUX release when triggering a manual repair.
- Verify through logs that the scanner is now checking multiple pages.
31 changes: 31 additions & 0 deletions conductor/reliable-blocklist.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Implementation Plan: Reliable Blocklisting via DownloadID Capture

## Objective
Enable reliable blocklisting of corrupted releases by capturing and storing the `downloadID` from ARR webhooks at import time, and using this stored ID for blocklisting (failing) the release during streaming failures, bypassing unreliable history lookups.

## Scope & Impact
- **Database:** Update `file_health` table to store `download_id`.
- **Webhook:** Update `internal/api/arrs_handlers.go` to capture `downloadId` from incoming webhooks.
- **Health Worker:** Update repair logic to use the stored `download_id` for triggering ARR failures.

## Implementation Steps

### 1. Database Schema
- Modify `internal/database/models.go` to add `DownloadID` field to `FileHealth` struct.
- Add migration (SQL) to add `download_id` column to `file_health` table.

### 2. Webhook Handler
- Update `internal/api/arrs_handlers.go`:
- Extract `downloadId` from `ArrsWebhookRequest`.
- Pass this ID to `healthRepo.AddFileToHealthCheck`.

### 3. Health Worker Repair Logic
- Modify `internal/arrs/scanner/manager.go`:
- Update `blocklistSonarrEpisodeFile` and `blocklistRadarrMovieFile` to accept an optional pre-known `downloadID`.
- If `downloadID` is provided, use it directly (skip history search if possible, or use it to filter history).
- Update `internal/health/worker.go`:
- Ensure `triggerFileRepair` retrieves the `downloadID` from the database record.

## Verification
- Add a test case in `internal/database/health_repository_test.go` to verify `download_id` persistence.
- Manual test: Import a file, trigger a streaming failure, and verify that AltMount calls the ARR's `FailContext` API using the correct `downloadID` even if the history record has been rotated out.
5 changes: 4 additions & 1 deletion internal/api/arrs_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type ArrsWebhookRequest struct {
EpisodeFile struct {
Path string `json:"path"`
} `json:"episodeFile"`
Download struct {
ID string `json:"id"`
} `json:"download"`
DeletedFiles ArrsDeletedFiles `json:"deletedFiles,omitempty"`
}

Expand Down Expand Up @@ -449,7 +452,7 @@ func (s *Server) handleArrsWebhook(c *fiber.Ctx) error {

// Add to health check (pending status) with high priority (Next) to ensure it's processed right away
cfg := s.configManager.GetConfigGetter()()
err := s.healthRepo.AddFileToHealthCheckWithMetadata(c.Context(), normalizedPath, &path, cfg.GetMaxRetries(), cfg.GetMaxRepairRetries(), sourceNzb, database.HealthPriorityNext, releaseDate)
err := s.healthRepo.AddFileToHealthCheckWithMetadata(c.Context(), normalizedPath, &path, cfg.GetMaxRetries(), cfg.GetMaxRepairRetries(), sourceNzb, req.Download.ID, database.HealthPriorityNext, releaseDate)
if err != nil {
slog.ErrorContext(c.Context(), "Failed to add webhook file to health check", "path", normalizedPath, "error", err)
} else {
Expand Down
6 changes: 3 additions & 3 deletions internal/api/health_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func (s *Server) handleRepairHealth(c *fiber.Ctx) error {
}

// Trigger rescan with the resolved path
err = s.arrsService.TriggerFileRescan(ctx, pathForRescan, item.FilePath)
err = s.arrsService.TriggerFileRescan(ctx, pathForRescan, item.FilePath, "")
if err != nil {
// Check if this is a "no ARR instance found" error
if strings.Contains(err.Error(), "no ARR instance found") {
Expand Down Expand Up @@ -546,7 +546,7 @@ func (s *Server) handleRepairHealthBulk(c *fiber.Ctx) error {
}

// Trigger rescan
err = s.arrsService.TriggerFileRescan(ctx, pathForRescan, item.FilePath)
err = s.arrsService.TriggerFileRescan(ctx, pathForRescan, item.FilePath, "")
if err != nil {
failedCount++
errors[filePath] = fmt.Sprintf("Failed to trigger repair: %v", err)
Expand Down Expand Up @@ -879,7 +879,7 @@ func (s *Server) handleAddHealthCheck(c *fiber.Ctx) error {
}

// Add file to health database
err := s.healthRepo.AddFileToHealthCheck(c.Context(), req.FilePath, req.LibraryPath, maxRetries, cfg.GetMaxRepairRetries(), req.SourceNzb, req.Priority)
err := s.healthRepo.AddFileToHealthCheck(c.Context(), req.FilePath, req.LibraryPath, maxRetries, cfg.GetMaxRepairRetries(), req.SourceNzb, "", req.Priority)
if err != nil {
return RespondInternalError(c, "Failed to add file for health check", err.Error())
}
Expand Down
132 changes: 82 additions & 50 deletions internal/arrs/scanner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,9 @@ func (m *Manager) sonarrHasFile(ctx context.Context, client *sonarr.Sonarr, inst
}

// TriggerFileRescan triggers a rescan for a specific file path through the appropriate ARR instance
func (m *Manager) TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string) error {
func (m *Manager) TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string, downloadID string) error {
res, err, _ := m.sf.Do(fmt.Sprintf("rescan:%s", pathForRescan), func() (interface{}, error) {
slog.InfoContext(ctx, "Triggering ARR rescan", "path", pathForRescan, "relative_path", relativePath)
slog.InfoContext(ctx, "Triggering ARR rescan", "path", pathForRescan, "relative_path", relativePath, "download_id", downloadID)

// Find which ARR instance manages this file path
instanceType, instanceName, err := m.findInstanceForFilePath(ctx, pathForRescan, relativePath)
Expand All @@ -325,14 +325,14 @@ func (m *Manager) TriggerFileRescan(ctx context.Context, pathForRescan string, r
if err != nil {
return nil, fmt.Errorf("failed to create Radarr client: %w", err)
}
return nil, m.triggerRadarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName)
return nil, m.triggerRadarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName, downloadID)

case "sonarr":
client, err := m.clients.GetOrCreateSonarrClient(instanceName, instanceConfig.URL, instanceConfig.APIKey)
if err != nil {
return nil, fmt.Errorf("failed to create Sonarr client: %w", err)
}
return nil, m.triggerSonarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName)
return nil, m.triggerSonarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName, downloadID)

case "lidarr", "readarr", "whisparr":
// For now, we only support RefreshMonitoredDownloads for these
Expand Down Expand Up @@ -492,11 +492,12 @@ func (m *Manager) TriggerDownloadScan(ctx context.Context, instanceType string)
}

// triggerRadarrRescanByPath triggers a rescan in Radarr for the given file path
func (m *Manager) triggerRadarrRescanByPath(ctx context.Context, client *radarr.Radarr, filePath, relativePath, instanceName string) error {
func (m *Manager) triggerRadarrRescanByPath(ctx context.Context, client *radarr.Radarr, filePath, relativePath, instanceName string, downloadID string) error {
slog.InfoContext(ctx, "Searching Radarr for matching movie",
"instance", instanceName,
"file_path", filePath,
"relative_path", relativePath)
"relative_path", relativePath,
"download_id", downloadID)

// Get all movies to find the one with matching file path
movies, err := m.data.GetMovies(ctx, client, instanceName)
Expand Down Expand Up @@ -573,7 +574,7 @@ func (m *Manager) triggerRadarrRescanByPath(ctx context.Context, client *radarr.
// But we can still trigger search
if targetMovie.HasFile && targetMovie.MovieFile != nil {
// Try to blocklist the release associated with this file
if err := m.blocklistRadarrMovieFile(ctx, client, targetMovie.ID, targetMovie.MovieFile.ID); err != nil {
if err := m.blocklistRadarrMovieFile(ctx, client, targetMovie.ID, targetMovie.MovieFile.ID, downloadID); err != nil {
slog.WarnContext(ctx, "Failed to blocklist Radarr release", "error", err)
}

Expand Down Expand Up @@ -611,7 +612,7 @@ func (m *Manager) triggerRadarrRescanByPath(ctx context.Context, client *radarr.
}

// triggerSonarrRescanByPath triggers a rescan in Sonarr for the given file path
func (m *Manager) triggerSonarrRescanByPath(ctx context.Context, client *sonarr.Sonarr, filePath, relativePath, instanceName string) error {
func (m *Manager) triggerSonarrRescanByPath(ctx context.Context, client *sonarr.Sonarr, filePath, relativePath, instanceName string, downloadID string) error {
cfg := m.configGetter()

// Get library directory from health config
Expand All @@ -624,7 +625,8 @@ func (m *Manager) triggerSonarrRescanByPath(ctx context.Context, client *sonarr.
"instance", instanceName,
"file_path", filePath,
"relative_path", relativePath,
"library_dir", libraryDir)
"library_dir", libraryDir,
"download_id", downloadID)

// Get all series to find the one that contains this file path
series, err := m.data.GetSeries(ctx, client, instanceName)
Expand Down Expand Up @@ -738,7 +740,7 @@ func (m *Manager) triggerSonarrRescanByPath(ctx context.Context, client *sonarr.
"episode_file_id", targetEpisodeFile.ID)

// Try to blocklist the release associated with this file
if err := m.blocklistSonarrEpisodeFile(ctx, client, targetSeries.ID, targetEpisodeFile.ID); err != nil {
if err := m.blocklistSonarrEpisodeFile(ctx, client, targetSeries.ID, targetEpisodeFile.ID, downloadID); err != nil {
slog.WarnContext(ctx, "Failed to blocklist Sonarr release", "error", err)
}

Expand Down Expand Up @@ -843,35 +845,50 @@ func (m *Manager) failSonarrQueueItemByPath(ctx context.Context, client *sonarr.
}

// blocklistRadarrMovieFile finds the history event for the given file and marks it as failed (blocklisting the release)
func (m *Manager) blocklistRadarrMovieFile(ctx context.Context, client *radarr.Radarr, movieID int64, fileID int64) error {
slog.DebugContext(ctx, "Attempting to find and blocklist release for movie file", "movie_id", movieID, "file_id", fileID)
func (m *Manager) blocklistRadarrMovieFile(ctx context.Context, client *radarr.Radarr, movieID int64, fileID int64, knownDownloadID string) error {
slog.DebugContext(ctx, "Attempting to find and blocklist release for movie file", "movie_id", movieID, "file_id", fileID, "known_download_id", knownDownloadID)

// Fetch history for this specific movie
req := &starr.PageReq{PageSize: 100, SortKey: "date", SortDir: starr.SortDescend}
req.Set("movieId", strconv.FormatInt(movieID, 10))
var downloadID string = knownDownloadID
var history *radarr.History

history, err := client.GetHistoryPageContext(ctx, req)
if err != nil {
return fmt.Errorf("failed to fetch Radarr history: %w", err)
}
if downloadID == "" {
// Fetch history for this specific movie
req := &starr.PageReq{PageSize: 1000, SortKey: "date", SortDir: starr.SortDescend}
req.Set("movieId", strconv.FormatInt(movieID, 10))

var err error
history, err = client.GetHistoryPageContext(ctx, req)
if err != nil {
return fmt.Errorf("failed to fetch Radarr history: %w", err)
}

targetFileID := strconv.FormatInt(fileID, 10)
var downloadID string
targetFileID := strconv.FormatInt(fileID, 10)

// 1. Find the import event to get the downloadId
for _, record := range history.Records {
if record.Data.FileID == targetFileID && (record.EventType == "movieFileImported" || record.EventType == "downloadFolderImported") {
downloadID = record.DownloadID
break
// 1. Find the import event to get the downloadId
for _, record := range history.Records {
if record.Data.FileID == targetFileID && (record.EventType == "movieFileImported" || record.EventType == "downloadFolderImported") {
downloadID = record.DownloadID
break
}
}
}

if downloadID == "" {
slog.WarnContext(ctx, "Could not find import event in Radarr history for file", "movie_id", movieID, "file_id", fileID)
return nil
if downloadID == "" {
slog.WarnContext(ctx, "Could not find import event in Radarr history for file", "movie_id", movieID, "file_id", fileID)
return nil
}
}

// 2. Find the original grab event using the downloadId
if history == nil {
req := &starr.PageReq{PageSize: 100, SortKey: "date", SortDir: starr.SortDescend}
req.Set("movieId", strconv.FormatInt(movieID, 10))
var err error
history, err = client.GetHistoryPageContext(ctx, req)
if err != nil {
return fmt.Errorf("failed to fetch Radarr history: %w", err)
}
}

for _, record := range history.Records {
if record.DownloadID == downloadID && record.EventType == "grabbed" {
slog.InfoContext(ctx, "Found grabbed history record, marking as failed to blocklist release",
Expand All @@ -888,35 +905,50 @@ func (m *Manager) blocklistRadarrMovieFile(ctx context.Context, client *radarr.R
}

// blocklistSonarrEpisodeFile finds the grabbed history event for the given file and marks it as failed (blocklisting the release)
func (m *Manager) blocklistSonarrEpisodeFile(ctx context.Context, client *sonarr.Sonarr, seriesID int64, fileID int64) error {
slog.DebugContext(ctx, "Attempting to find and blocklist release for episode file", "series_id", seriesID, "file_id", fileID)
func (m *Manager) blocklistSonarrEpisodeFile(ctx context.Context, client *sonarr.Sonarr, seriesID int64, fileID int64, knownDownloadID string) error {
slog.DebugContext(ctx, "Attempting to find and blocklist release for episode file", "series_id", seriesID, "file_id", fileID, "known_download_id", knownDownloadID)

// Fetch history for this specific series
req := &starr.PageReq{PageSize: 100, SortKey: "date", SortDir: starr.SortDescend}
req.Set("seriesId", strconv.FormatInt(seriesID, 10))
var downloadID string = knownDownloadID
var history *sonarr.History

history, err := client.GetHistoryPageContext(ctx, req)
if err != nil {
return fmt.Errorf("failed to fetch Sonarr history: %w", err)
}
if downloadID == "" {
// Fetch history for this specific series
req := &starr.PageReq{PageSize: 1000, SortKey: "date", SortDir: starr.SortDescend}
req.Set("seriesId", strconv.FormatInt(seriesID, 10))

targetFileID := strconv.FormatInt(fileID, 10)
var downloadID string
var err error
history, err = client.GetHistoryPageContext(ctx, req)
if err != nil {
return fmt.Errorf("failed to fetch Sonarr history: %w", err)
}

// 1. Find the import event to get the downloadId
for _, record := range history.Records {
if record.Data.FileID == targetFileID && record.EventType == "downloadFolderImported" {
downloadID = record.DownloadID
break
targetFileID := strconv.FormatInt(fileID, 10)

// 1. Find the import event to get the downloadId
for _, record := range history.Records {
if record.Data.FileID == targetFileID && record.EventType == "downloadFolderImported" {
downloadID = record.DownloadID
break
}
}
}

if downloadID == "" {
slog.WarnContext(ctx, "Could not find import event in Sonarr history for file", "series_id", seriesID, "file_id", fileID)
return nil
if downloadID == "" {
slog.WarnContext(ctx, "Could not find import event in Sonarr history for file", "series_id", seriesID, "file_id", fileID)
return nil
}
}

// 2. Find the original grab event using the downloadId
if history == nil {
req := &starr.PageReq{PageSize: 100, SortKey: "date", SortDir: starr.SortDescend}
req.Set("seriesId", strconv.FormatInt(seriesID, 10))
var err error
history, err = client.GetHistoryPageContext(ctx, req)
if err != nil {
return fmt.Errorf("failed to fetch Sonarr history: %w", err)
}
}

for _, record := range history.Records {
if record.DownloadID == downloadID && record.EventType == "grabbed" {
slog.InfoContext(ctx, "Found grabbed history record, marking as failed to blocklist release",
Expand Down
Loading
Loading