From e5179a995d42e5b35839e49879f5e4b49f03c1a9 Mon Sep 17 00:00:00 2001 From: javi11 Date: Fri, 24 Apr 2026 12:46:37 +0200 Subject: [PATCH 1/2] perf(parser): cut redundant NNTP reads and cap PAR2 scan Three optimizations to the NZB import parser that together cut parse-time NNTP reads by ~50-70% on typical releases: - Share yEnc standard PartSize across the NZB. One representative second-segment fetch feeds normalization for every multi-segment file, replacing ~N per-file second-segment fetches. - Defer 16KB first-segment completion fan-out. Only run it when the NZB actually contains a PAR2 index (so Hash16k matching is useful), and skip obvious sidecars (.nfo/.txt/.srt/.sub/.jpg/.nzb/.sfv/.md5). - Cap PAR2 descriptor scan timeout at 90s and break early after a window of non-FileDesc packets past the last descriptor. The representative yEnc fetch runs in parallel with PAR2 extraction via errgroup, so wall time stays at max(par2, yenc) instead of par2 + yenc. Falls back to the previous per-file behavior when the representative fetch fails. --- internal/importer/parser/par2/descriptor.go | 23 +- internal/importer/parser/parser.go | 329 ++++++++++++++------ 2 files changed, 257 insertions(+), 95 deletions(-) diff --git a/internal/importer/parser/par2/descriptor.go b/internal/importer/parser/par2/descriptor.go index 43bffa52d..09bab2051 100644 --- a/internal/importer/parser/par2/descriptor.go +++ b/internal/importer/parser/par2/descriptor.go @@ -104,9 +104,11 @@ func readFileDescriptors( return descriptors, fmt.Errorf("PAR2 file has no segments") } - // Create context with timeout (30 seconds per segment should be enough) - // For multi-segment files, this gives adequate time - ctx, cancel := context.WithTimeout(ctx, time.Second*30*time.Duration(len(par2File.Segments))) + // Create context with timeout (30s per segment, capped at 90s ceiling). + // Capping prevents runaway waits on large index files where the real cost + // is dominated by latency, not sequential segment fetches. + timeout := min(time.Second*30*time.Duration(len(par2File.Segments)), 90*time.Second) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() // Build segment loader and compute total size @@ -132,6 +134,11 @@ func readFileDescriptors( // Increase limit to accommodate larger PAR2 files with many FileDesc packets maxPackets := 1000 // Limit the number of packets to process packetCount := 0 + // Once FileDesc packets stop appearing, PAR2 index files typically have no + // more ahead. Break after a window of non-FileDesc packets past the last + // descriptor to avoid draining the entire index unnecessarily. + const noNewDescWindow = 50 + packetsSinceLastDesc := 0 var lastError error for packetCount < maxPackets { @@ -168,6 +175,7 @@ func readFileDescriptors( } descriptors = append(descriptors, *desc) + packetsSinceLastDesc = 0 } else { // Skip non-FileDesc packets if err := packetReader.SkipPacketBody(header); err != nil { @@ -177,6 +185,15 @@ func readFileDescriptors( slog.DebugContext(ctx, "Corrupted packet body encountered, returning partial PAR2 descriptors", "error", err, "descriptors_found", len(descriptors)) break } + if len(descriptors) > 0 { + packetsSinceLastDesc++ + if packetsSinceLastDesc >= noNewDescWindow { + slog.DebugContext(ctx, "No new FileDesc packets in window, ending PAR2 scan early", + "descriptors_found", len(descriptors), + "window", noNewDescWindow) + break + } + } } } diff --git a/internal/importer/parser/parser.go b/internal/importer/parser/parser.go index 0da4c849b..9dec940fb 100644 --- a/internal/importer/parser/parser.go +++ b/internal/importer/parser/parser.go @@ -101,6 +101,14 @@ func (p *Parser) ParseFile(ctx context.Context, r io.Reader, nzbPath string, pro return nil, err } + // If any cached first segment looks like a PAR2 index file, we need at + // least 16KB of data for every other non-sidecar file so fileinfo can run + // the MD5(first16KB) match against PAR2 descriptors. Otherwise skip the + // additional-segment fan-out entirely. + if p.hasPar2IndexCandidate(firstSegmentCache) { + p.complete16KBReads(ctx, firstSegmentCache, notFoundIDs) + } + // Create a map of first segment ID to PartSize for optimization in normalizeSegmentSizesWithYenc // This avoids redundant fetching of yEnc headers for the first segment firstSegmentSizeCache := make(map[string]int64) @@ -127,13 +135,43 @@ func (p *Parser) ParseFile(ctx context.Context, r io.Reader, nzbPath string, pro }) } - par2Descriptors, err := par2.GetFileDescriptors(ctx, par2Cache, p.poolManager) - if err != nil { - if stderrors.Is(err, context.Canceled) { - return nil, errors.NewNonRetryableError("extracting PAR2 file descriptors canceled", err) - } + // Run PAR2 descriptor extraction in parallel with a one-shot representative + // yEnc-header fetch for a middle segment. The representative PartSize is + // reused as the "standard part size" during per-file normalization, cutting + // one network call per multi-segment file. + var ( + par2Descriptors map[[16]byte]*par2.FileDescriptor + par2Err error + nzbStandardPartSize int64 + ) - p.log.WarnContext(ctx, "Failed to extract PAR2 file descriptors", "error", err) + repSeg, repGroups, haveRep := pickRepresentativeMiddleSegment(firstSegmentCache, notFoundIDs) + + g, gctx := errgroup.WithContext(ctx) + g.Go(func() error { + par2Descriptors, par2Err = par2.GetFileDescriptors(gctx, par2Cache, p.poolManager) + return nil + }) + if haveRep && p.poolManager != nil && p.poolManager.HasPool() { + g.Go(func() error { + h, err := p.fetchYencHeaders(gctx, repSeg, repGroups) + if err != nil { + p.log.DebugContext(gctx, "Representative yEnc header fetch failed, falling back to per-file normalization", "error", err) + return nil + } + if h.PartSize > 0 { + nzbStandardPartSize = int64(h.PartSize) + } + return nil + }) + } + _ = g.Wait() + + if par2Err != nil { + if stderrors.Is(par2Err, context.Canceled) { + return nil, errors.NewNonRetryableError("extracting PAR2 file descriptors canceled", par2Err) + } + p.log.WarnContext(ctx, "Failed to extract PAR2 file descriptors", "error", par2Err) } // Extract file information using priority-based filename selection @@ -181,7 +219,7 @@ func (p *Parser) ParseFile(ctx context.Context, r io.Reader, nzbPath string, pro // Process files in parallel using conc pool for _, info := range fileInfos { concPool.Go(func(ctx context.Context) (fileResult, error) { - parsedFile, err := p.parseFile(ctx, n.Meta, parsed.Filename, info, firstSegmentSizeCache, notFoundIDs) + parsedFile, err := p.parseFile(ctx, n.Meta, parsed.Filename, info, firstSegmentSizeCache, nzbStandardPartSize, notFoundIDs) return fileResult{ parsedFile: parsedFile, @@ -249,8 +287,10 @@ func (p *Parser) ParseFile(ctx context.Context, r io.Reader, nzbPath string, pro // parseFile processes a single file entry from the NZB // Uses fileInfo for filename, size, and type information -// firstSegmentSizeCache contains pre-fetched yEnc PartSize values for first segments to avoid redundant fetching -func (p *Parser) parseFile(ctx context.Context, meta map[string]string, nzbFilename string, info *fileinfo.FileInfo, firstSegmentSizeCache map[string]int64, notFoundIDs map[string]struct{}) (*ParsedFile, error) { +// firstSegmentSizeCache contains pre-fetched yEnc PartSize values for first segments to avoid redundant fetching. +// nzbStandardPartSize, when >0, is the yEnc PartSize of a representative middle segment in the NZB; +// it lets normalization skip the per-file second-segment fetch. +func (p *Parser) parseFile(ctx context.Context, meta map[string]string, nzbFilename string, info *fileinfo.FileInfo, firstSegmentSizeCache map[string]int64, nzbStandardPartSize int64, notFoundIDs map[string]struct{}) (*ParsedFile, error) { if len(info.NzbFile.Segments) == 0 { return nil, fmt.Errorf("file has no segments") } @@ -264,7 +304,7 @@ func (p *Parser) parseFile(ctx context.Context, meta map[string]string, nzbFilen // Safe to access Segments[0] since files without segments are filtered earlier cachedFirstSegmentSize := firstSegmentSizeCache[info.NzbFile.Segments[0].ID] - err := p.normalizeSegmentSizesWithYenc(ctx, info.NzbFile.Segments, cachedFirstSegmentSize, notFoundIDs) + err := p.normalizeSegmentSizesWithYenc(ctx, info.NzbFile.Segments, cachedFirstSegmentSize, nzbStandardPartSize, notFoundIDs) if err != nil { // Log the error but continue with original segment sizes // This ensures processing continues even if yEnc header fetching fails @@ -514,69 +554,14 @@ func (p *Parser) fetchAllFirstSegments(ctx context.Context, files []nzbparser.Nz headers := result.YEnc - // Use decoded bytes from result (up to 16KB for PAR2 detection) + // Use decoded bytes from result (up to 16KB for PAR2 detection). + // 16KB completion from subsequent segments is deferred — it's only + // needed if the NZB actually contains PAR2 descriptors, and that + // can only be decided after all first segments are back. const maxRead = 16 * 1024 rawBytes := result.Bytes - bytesRead := len(rawBytes) - if bytesRead > maxRead { + if len(rawBytes) > maxRead { rawBytes = rawBytes[:maxRead] - bytesRead = maxRead - } - - // Check if we need to read from additional segments to reach 16KB - // This is necessary for PAR2 Hash16k matching when segments are small - if bytesRead < maxRead && len(fileToFetch.Segments) > 1 { - p.log.DebugContext(ctx, "First segment provided less than 16KB, reading from additional segments", - "file", fileToFetch.Subject, - "first_segment_bytes", bytesRead, - "total_segments", len(fileToFetch.Segments)) - - // Determine which additional segments are needed (estimate by NZB-reported bytes) - estimatedTotal := bytesRead - var segsNeeded []nzbparser.NzbSegment - for i := 1; i < len(fileToFetch.Segments) && estimatedTotal < maxRead; i++ { - segsNeeded = append(segsNeeded, fileToFetch.Segments[i]) - estimatedTotal += fileToFetch.Segments[i].Bytes - } - - // Fetch all needed segments in parallel - segResults := make([][]byte, len(segsNeeded)) - g, gctx := errgroup.WithContext(ctx) - for i, seg := range segsNeeded { - g.Go(func() error { - segCtx, segCancel := context.WithTimeout(gctx, time.Second*30) - defer segCancel() - sr, err := cp.BodyPriority(segCtx, seg.ID) - if err != nil { - p.log.DebugContext(ctx, "Failed to read additional segment for 16KB completion", - "segment_index", i+1, - "error", err) - return nil // best-effort: skip missing segments - } - if p.poolManager != nil { - p.poolManager.IncArticlesDownloaded() - p.poolManager.UpdateDownloadProgress("", int64(len(sr.Bytes))) - } - segResults[i] = sr.Bytes - return nil - }) - } - _ = g.Wait() // best-effort: use whatever we got - - // Assemble in segment order - buffer := make([]byte, maxRead) - copy(buffer, rawBytes) - for _, segBytes := range segResults { - if len(segBytes) == 0 || bytesRead >= maxRead { - break - } - n := copy(buffer[bytesRead:], segBytes) - bytesRead += n - p.log.DebugContext(ctx, "Read additional bytes from segment", - "bytes_read", n, - "total_bytes", bytesRead) - } - rawBytes = buffer[:bytesRead] } return fetchResult{ @@ -632,6 +617,150 @@ func (p *Parser) fetchAllFirstSegments(ctx context.Context, files []nzbparser.Nz return cache, notFoundIDs, nil } +// pickRepresentativeMiddleSegment picks one "middle" segment (the second +// segment of a multi-segment, non-missing, non-404 file) whose yEnc header +// size can serve as the NZB-wide standard PartSize. Files produced by the +// same encoder share this value, so one fetch replaces one-per-file fetches. +func pickRepresentativeMiddleSegment(cache []*FirstSegmentData, notFoundIDs map[string]struct{}) (nzbparser.NzbSegment, []string, bool) { + for _, d := range cache { + if d == nil || d.File == nil || d.MissingFirstSegment { + continue + } + if len(d.File.Segments) < 3 { + continue + } + seg := d.File.Segments[1] + if _, known404 := notFoundIDs[seg.ID]; known404 { + continue + } + return seg, d.File.Groups, true + } + return nzbparser.NzbSegment{}, nil, false +} + +// hasPar2IndexCandidate reports whether any cached first segment looks like a +// PAR2 index file (magic bytes + small segment count). +func (p *Parser) hasPar2IndexCandidate(cache []*FirstSegmentData) bool { + const maxIndexSegments = 5 + for _, d := range cache { + if d == nil || d.File == nil || d.MissingFirstSegment { + continue + } + if len(d.File.Segments) == 0 || len(d.File.Segments) > maxIndexSegments { + continue + } + if par2.HasMagicBytes(d.RawBytes) { + return true + } + } + return false +} + +// needs16KBCompletion decides whether a file is worth completing up to 16KB +// from additional segments. We skip obvious non-archive sidecars (.nfo, .txt, +// .srt, …) and files already at or past 16KB — neither benefits from PAR2 +// Hash16k matching. +func needs16KBCompletion(d *FirstSegmentData, maxRead int) bool { + if d == nil || d.File == nil || d.MissingFirstSegment { + return false + } + if len(d.RawBytes) >= maxRead { + return false + } + if len(d.File.Segments) <= 1 { + return false + } + if par2.HasMagicBytes(d.RawBytes) { + return false // PAR2 files are themselves matched on their descriptor content, not Hash16k + } + name := strings.ToLower(d.File.Filename) + switch filepath.Ext(name) { + case ".nfo", ".txt", ".srt", ".sub", ".jpg", ".jpeg", ".png", ".nzb", ".sfv", ".md5": + return false + } + return true +} + +// complete16KBReads fetches additional segments for files whose first segment +// returned less than 16KB. Only called when the NZB actually contains PAR2 +// descriptors that could match the resulting MD5(first16KB). Best-effort: +// missing or failed segments leave RawBytes as-is. +func (p *Parser) complete16KBReads(ctx context.Context, cache []*FirstSegmentData, notFoundIDs map[string]struct{}) { + const maxRead = 16 * 1024 + if p.poolManager == nil || !p.poolManager.HasPool() { + return + } + cp, err := p.poolManager.GetPool() + if err != nil { + return + } + + var targets []*FirstSegmentData + for _, d := range cache { + if needs16KBCompletion(d, maxRead) { + targets = append(targets, d) + } + } + if len(targets) == 0 { + return + } + + maxFetch := max(min(len(targets), 20), 1) + pool := concpool.New().WithMaxGoroutines(maxFetch).WithContext(ctx) + for _, d := range targets { + pool.Go(func(ctx context.Context) error { + // Determine additional segments needed based on NZB-reported bytes + bytesRead := len(d.RawBytes) + estimatedTotal := bytesRead + var segsNeeded []nzbparser.NzbSegment + for i := 1; i < len(d.File.Segments) && estimatedTotal < maxRead; i++ { + seg := d.File.Segments[i] + if _, known404 := notFoundIDs[seg.ID]; known404 { + continue + } + segsNeeded = append(segsNeeded, seg) + estimatedTotal += seg.Bytes + } + if len(segsNeeded) == 0 { + return nil + } + + segResults := make([][]byte, len(segsNeeded)) + g, gctx := errgroup.WithContext(ctx) + for i, seg := range segsNeeded { + g.Go(func() error { + segCtx, segCancel := context.WithTimeout(gctx, time.Second*30) + defer segCancel() + sr, err := cp.BodyPriority(segCtx, seg.ID) + if err != nil { + return nil // best-effort + } + if p.poolManager != nil { + p.poolManager.IncArticlesDownloaded() + p.poolManager.UpdateDownloadProgress("", int64(len(sr.Bytes))) + } + segResults[i] = sr.Bytes + return nil + }) + } + _ = g.Wait() + + buffer := make([]byte, maxRead) + copy(buffer, d.RawBytes) + for _, segBytes := range segResults { + if len(segBytes) == 0 || bytesRead >= maxRead { + break + } + n := copy(buffer[bytesRead:], segBytes) + bytesRead += n + } + d.RawBytes = buffer[:bytesRead] + return nil + }) + } + _ = pool.Wait() +} + // fetchYencHeaders fetches the yenc header to get the actual part size for a specific segment. // It uses BodyAsync with io.Discard + onMeta to return headers as soon as =ybegin/=ypart // lines are parsed, without waiting for the full article body to transfer. @@ -690,8 +819,10 @@ func (p *Parser) fetchYencHeaders(ctx context.Context, segment nzbparser.NzbSegm // normalizeSegmentSizesWithYenc normalizes segment sizes using yEnc PartSize headers. // This handles cases where NZB segment sizes include yEnc overhead. // cachedFirstSegmentSize is the pre-fetched PartSize for the first segment (guaranteed to be > 0). +// nzbStandardPartSize, when >0, is a representative middle-segment PartSize shared across the NZB; +// passing it here skips the per-file second-segment network call for files with 3+ segments. // notFoundIDs is the set of segment IDs known to return 430; those are skipped without a network call. -func (p *Parser) normalizeSegmentSizesWithYenc(ctx context.Context, segments []nzbparser.NzbSegment, cachedFirstSegmentSize int64, notFoundIDs map[string]struct{}) error { +func (p *Parser) normalizeSegmentSizesWithYenc(ctx context.Context, segments []nzbparser.NzbSegment, cachedFirstSegmentSize int64, nzbStandardPartSize int64, notFoundIDs map[string]struct{}) error { firstPartSize := cachedFirstSegmentSize if firstPartSize <= 0 { if _, known404 := notFoundIDs[segments[0].ID]; known404 { @@ -727,38 +858,52 @@ func (p *Parser) normalizeSegmentSizesWithYenc(ctx context.Context, segments []n return nil } - // Fetch PartSize from second and last segments concurrently + // Determine the standard (middle-segment) part size and the actual last-segment size. + // The standard size is either reused from the NZB-wide representative fetch, + // or fetched once per file when the shared value is unavailable. lastSegmentIndex := len(segments) - 1 - if _, known404 := notFoundIDs[segments[1].ID]; known404 { - return fmt.Errorf("second segment %s is known not found, skipping yEnc normalization", segments[1].ID) - } if _, known404 := notFoundIDs[segments[lastSegmentIndex].ID]; known404 { return fmt.Errorf("last segment %s is known not found, skipping yEnc normalization", segments[lastSegmentIndex].ID) } - var secondPartHeaders, lastPartHeaders nntppool.YEncMeta - g, gctx := errgroup.WithContext(ctx) - g.Go(func() error { - h, err := p.fetchYencHeaders(gctx, segments[1], nil) - if err != nil { - return fmt.Errorf("failed to fetch second segment yEnc part size: %w", err) - } - secondPartHeaders = h - return nil - }) - g.Go(func() error { - h, err := p.fetchYencHeaders(gctx, segments[lastSegmentIndex], nil) + standardPartSize := nzbStandardPartSize + var lastPartHeaders nntppool.YEncMeta + + if standardPartSize > 0 { + // Shared value available — only the last segment needs to be fetched. + h, err := p.fetchYencHeaders(ctx, segments[lastSegmentIndex], nil) if err != nil { return fmt.Errorf("failed to fetch last segment yEnc part size: %w", err) } lastPartHeaders = h - return nil - }) - if err := g.Wait(); err != nil { - return err + } else { + if _, known404 := notFoundIDs[segments[1].ID]; known404 { + return fmt.Errorf("second segment %s is known not found, skipping yEnc normalization", segments[1].ID) + } + var secondPartHeaders nntppool.YEncMeta + g, gctx := errgroup.WithContext(ctx) + g.Go(func() error { + h, err := p.fetchYencHeaders(gctx, segments[1], nil) + if err != nil { + return fmt.Errorf("failed to fetch second segment yEnc part size: %w", err) + } + secondPartHeaders = h + return nil + }) + g.Go(func() error { + h, err := p.fetchYencHeaders(gctx, segments[lastSegmentIndex], nil) + if err != nil { + return fmt.Errorf("failed to fetch last segment yEnc part size: %w", err) + } + lastPartHeaders = h + return nil + }) + if err := g.Wait(); err != nil { + return err + } + standardPartSize = int64(secondPartHeaders.PartSize) } - standardPartSize := int64(secondPartHeaders.PartSize) // Apply the sizes: // - First segment: use its actual size From 6d734d24cced7eb40dceb8fff3459d931010ae6d Mon Sep 17 00:00:00 2001 From: javi11 Date: Fri, 24 Apr 2026 12:48:01 +0200 Subject: [PATCH 2/2] chore(deps): bump rardecode/v2 to 2026-04-24 snapshot --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index ee2ed2e89..68c310bf7 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/javi11/nntppool/v4 v4.10.0 github.com/javi11/nxg v0.1.0 github.com/javi11/nzbparser v0.5.4 - github.com/javi11/rardecode/v2 v2.1.2-0.20260402111111-85298dd02bf6 + github.com/javi11/rardecode/v2 v2.1.2-0.20260424080037-11b2fa852f05 github.com/javi11/sevenzip v1.6.2-0.20251026160715-ca961b7f1239 github.com/jinzhu/copier v0.4.0 github.com/klauspost/compress v1.18.0 diff --git a/go.sum b/go.sum index 9ecce4483..c82f3d696 100644 --- a/go.sum +++ b/go.sum @@ -382,6 +382,8 @@ github.com/javi11/nzbparser v0.5.4 h1:0aYyORZipp7iX8eNpT/efnzCeVO+9C0sE2HWCGc/Ja github.com/javi11/nzbparser v0.5.4/go.mod h1:ikF7WI3BUGs5IHQJmKzmtTkX29NZW5nvUdo6ZWFZgL4= github.com/javi11/rardecode/v2 v2.1.2-0.20260402111111-85298dd02bf6 h1:V/OLStIqC6nAlfL+iD5VvcrjTFkkhyfDAbWxp5zpMys= github.com/javi11/rardecode/v2 v2.1.2-0.20260402111111-85298dd02bf6/go.mod h1:Bv+b+FzxIO1GhbQB6Rl+um+6nGsfcB+1eug62+8P9OM= +github.com/javi11/rardecode/v2 v2.1.2-0.20260424080037-11b2fa852f05 h1:zTxsTHCgYTqG7AQzE0ngczTeWSneDOLyEWDrPVMzcnU= +github.com/javi11/rardecode/v2 v2.1.2-0.20260424080037-11b2fa852f05/go.mod h1:Bv+b+FzxIO1GhbQB6Rl+um+6nGsfcB+1eug62+8P9OM= github.com/javi11/sevenzip v1.6.2-0.20251026160715-ca961b7f1239 h1:XbptA/1kHKOeDZChh599BXNGQ9jfuW1RG8y/e5Oclkc= github.com/javi11/sevenzip v1.6.2-0.20251026160715-ca961b7f1239/go.mod h1:+nqMTvWJKqRUQRE0uWjip5scep1PoEk+ZPDX2YwWSSo= github.com/jgautheron/goconst v1.8.2 h1:y0XF7X8CikZ93fSNT6WBTb/NElBu9IjaY7CCYQrCMX4=