diff --git a/pkg/nixcacheindex/client.go b/pkg/nixcacheindex/client.go
index 4454bf91..249b7385 100644
--- a/pkg/nixcacheindex/client.go
+++ b/pkg/nixcacheindex/client.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"strings"
 	"sync"
 
 	"github.com/klauspost/compress/zstd"
@@ -263,20 +262,21 @@ func (c *Client) processShardResponse(shardPath string, rc io.ReadCloser, hashSt
 	// Buffer it.
 	// Note: This is an optimization point (Range requests).
 	// For now, read all.
-	var reader io.Reader = rc
-	if strings.HasSuffix(shardPath, ".zst") {
-		zstdReader, err := zstd.NewReader(rc)
-		if err != nil {
-			return DefiniteMiss, fmt.Errorf("failed to create zstd reader for %s: %w", shardPath, err)
-		}
-		defer zstdReader.Close()
+	// The body is always decoded as zstd; the ".zst" extension of shardPath
+	// is no longer consulted.
+	// RFC v2: "Shard files MUST be compressed with zstd".
 
-		reader = zstdReader
+	// Stream rc through a zstd decoder; io.ReadAll below buffers the fully
+	// decompressed shard in memory.
+	decoder, err := zstd.NewReader(rc)
+	if err != nil {
+		return DefiniteMiss, fmt.Errorf("failed to create zstd reader for %s: %w", shardPath, err)
 	}
+	defer decoder.Close()
 
-	data, err := io.ReadAll(reader)
+	data, err := io.ReadAll(decoder)
 	if err != nil {
-		return DefiniteMiss, err
+		return DefiniteMiss, fmt.Errorf("failed to read/decompress shard: %w", err)
 	}
 
 	shardReader, err := ReadShard(bytes.NewReader(data))
diff --git a/pkg/nixcacheindex/delta.go b/pkg/nixcacheindex/delta.go
new file mode 100644
index 00000000..39e31964
--- /dev/null
+++ b/pkg/nixcacheindex/delta.go
@@ -0,0 +1,271 @@
+package nixcacheindex
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"math/big"
+	"strings"
+)
+
+// DeltaOp identifies the kind of change a delta entry describes (add or delete).
+type DeltaOp int
+
+const (
+	DeltaOpAdd    DeltaOp = iota // the hash is added; serialized with a leading '+'
+	DeltaOpDelete                // the hash is removed; serialized with a leading '-'
+)
+
+// DeltaEntry is a single entry in the delta file: one operation on one hash.
+type DeltaEntry struct {
+	Op   DeltaOp // operation applied to Hash
+	Hash string  // 32-char Nix base32 hash
+}
+
+// ChecksumFile represents the checksums/ metadata file for an epoch.
+type ChecksumFile struct {
+	Epoch     int                      `json:"epoch"`
+	Algorithm string                   `json:"algorithm"` // algorithm identifier, e.g. "xxh64"
+	Shards    map[string]ShardChecksum `json:"shards"`    // keyed by shard prefix, e.g. "b6"
+}
+
+// ShardChecksum holds the verification fields recorded for one shard.
+type ShardChecksum struct {
+	Checksum  string `json:"checksum"`   // hex-encoded checksum
+	ItemCount uint64 `json:"item_count"` //nolint:tagliatelle // RFC 0195
+	SizeBytes uint64 `json:"size_bytes"` //nolint:tagliatelle // RFC 0195
+}
+
+// ParseDelta reads delta entries from r, one per line, and returns them in
+// input order.
+//
+// Every non-blank line is an operation character followed by a hash:
+//
+//	+HASH   add HASH
+//	-HASH   delete HASH
+//
+// Lines are trimmed of surrounding whitespace and blank lines are skipped.
+// A line that is not exactly HashLength+1 bytes long fails with
+// ErrInvalidHashLength, an unknown operation character fails with
+// ErrInvalidDeltaOp, and a hash that sorts (as a string) before the previous
+// one fails with ErrDeltaNotSorted; equal neighbours are accepted. Errors
+// reported by the scanner are returned unwrapped.
+func ParseDelta(r io.Reader) ([]DeltaEntry, error) {
+	var (
+		parsed []DeltaEntry
+		prev   string // hash of the most recently accepted entry
+	)
+
+	sc := bufio.NewScanner(r)
+
+	// n is the 1-based number of the physical line, blank lines included,
+	// so error messages point at the right place in the input.
+	for n := 1; sc.Scan(); n++ {
+		text := strings.TrimSpace(sc.Text())
+		if text == "" {
+			continue
+		}
+
+		if got, want := len(text), HashLength+1; got != want {
+			return nil, fmt.Errorf("%w: line %d: got %d (expected %d)", ErrInvalidHashLength, n, got, want)
+		}
+
+		// The first byte selects the operation; the rest is the hash.
+		entry := DeltaEntry{Hash: text[1:]}
+
+		switch marker := text[0]; marker {
+		case '+':
+			entry.Op = DeltaOpAdd
+		case '-':
+			entry.Op = DeltaOpDelete
+		default:
+			return nil, fmt.Errorf("%w: line %d: %q", ErrInvalidDeltaOp, n, marker)
+		}
+
+		// Only entries after the first have a predecessor to be ordered against.
+		if len(parsed) > 0 && entry.Hash < prev {
+			return nil, fmt.Errorf("%w: line %d: %s < %s", ErrDeltaNotSorted, n, entry.Hash, prev)
+		}
+
+		prev = entry.Hash
+		parsed = append(parsed, entry)
+	}
+
+	if err := sc.Err(); err != nil {
+		return nil, err
+	}
+
+	return parsed, nil
+}
+
+// WriteDelta writes delta entries to an io.Writer.
+func WriteDelta(w io.Writer, entries []DeltaEntry) error {
+	for _, e := range entries {
+		// Map the operation to its one-character marker; anything other
+		// than add/delete cannot be represented in the file format.
+		var marker string
+
+		switch e.Op {
+		case DeltaOpAdd:
+			marker = "+"
+		case DeltaOpDelete:
+			marker = "-"
+		default:
+			return fmt.Errorf("%w: %v", ErrInvalidDeltaOp, e.Op)
+		}
+
+		// Only the length of the hash is checked here, not its alphabet.
+		if n := len(e.Hash); n != HashLength {
+			return fmt.Errorf("%w: %d (expected %d)", ErrInvalidHashLength, n, HashLength)
+		}
+
+		// Entries are written one at a time, so entries preceding an
+		// invalid one have already been passed to w when the error is returned.
+		if _, err := fmt.Fprintf(w, "%s%s\n", marker, e.Hash); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// GenerateDeltas returns the operations that turn oldHashes into newHashes:
+// a delete for every hash found only in oldHashes and an add for every hash
+// found only in newHashes, in the order the merge encounters them. Hashes
+// present in both produce no entry, so the result is nil when nothing differs.
+//
+// Both inputs must be sorted in ascending order and free of duplicates; the
+// function walks them in lockstep and does not verify either property.
+func GenerateDeltas(oldHashes, newHashes []*big.Int) []DeltaEntry {
+	var ops []DeltaEntry
+
+	// record appends one operation for h, formatting the hash as text.
+	record := func(op DeltaOp, h *big.Int) {
+		ops = append(ops, DeltaEntry{
+			Op:   op,
+			Hash: FormatHash(h),
+		})
+	}
+
+	o, n := 0, 0
+
+	// Merge walk: at each step the smaller head is unique to its list.
+	for o < len(oldHashes) && n < len(newHashes) {
+		switch c := oldHashes[o].Cmp(newHashes[n]); {
+		case c < 0:
+			// Only in the old list -> deleted.
+			record(DeltaOpDelete, oldHashes[o])
+			o++
+		case c > 0:
+			// Only in the new list -> added.
+			record(DeltaOpAdd, newHashes[n])
+			n++
+		default:
+			// Present in both -> unchanged.
+			o++
+			n++
+		}
+	}
+
+	// At most one of the two tails is non-empty. Whatever is left of the
+	// old list was deleted; whatever is left of the new list was added.
+	for _, h := range oldHashes[o:] {
+		record(DeltaOpDelete, h)
+	}
+
+	for _, h := range newHashes[n:] {
+		record(DeltaOpAdd, h)
+	}
+
+	return ops
+}
+
+// ApplyDelta applies a list of delta operations to a sorted list of hashes.
+// Returns the new sorted list.
+// oldHashes must be sorted. delta must be sorted by hash.
+func ApplyDelta(oldHashes []*big.Int, delta []DeltaEntry) ([]*big.Int, error) { //nolint:cyclop
+	// The delta is applied in a single merge pass over both inputs:
+	//
+	//   - adding a hash that is absent inserts it at the cursor;
+	//   - adding a hash that is already present keeps the existing value;
+	//   - deleting a hash that is present drops it;
+	//   - deleting a hash that is absent fails with ErrHashNotFound;
+	//   - any other operation fails with ErrInvalidDeltaOp;
+	//   - a hash rejected by ParseHash fails with that error, wrapped.
+	//
+	// On failure nil is returned and the partial result is discarded. On
+	// success the result reuses the *big.Int values of oldHashes; added
+	// hashes are the values returned by ParseHash.
+	//
+	// NOTE(review): the ordering of the inputs is not verified here. Adding
+	// the same absent hash twice, for example, appends it twice. Callers
+	// are presumably expected to pass a strictly ascending delta such as
+	// the output of GenerateDeltas - confirm.
+	//
+	// Each delta entry contributes at most one element, so this capacity
+	// is an upper bound on the length of the result.
+	merged := make([]*big.Int, 0, len(oldHashes)+len(delta))
+
+	// next indexes the first old hash that has been neither copied into
+	// merged nor dropped by a delete.
+	next := 0
+
+	for idx, entry := range delta {
+		// Hashes are compared as integers, so the textual hash is decoded
+		// right before the entry is applied.
+		target, err := ParseHash(entry.Hash)
+		if err != nil {
+			return nil, fmt.Errorf("invalid hash in delta at index %d: %w", idx, err)
+		}
+
+		// Old hashes below the target are untouched by this entry: carry
+		// them over unchanged.
+		for next < len(oldHashes) && oldHashes[next].Cmp(target) < 0 {
+			merged = append(merged, oldHashes[next])
+
+			next++
+		}
+
+		// exhausted: no old hash is left to compare against.
+		// present:   the old hash at the cursor equals the target.
+		exhausted := next == len(oldHashes)
+		present := !exhausted && oldHashes[next].Cmp(target) == 0
+
+		switch entry.Op {
+		case DeltaOpAdd:
+			if present {
+				// Already there: keep the existing value and move past it.
+				merged = append(merged, oldHashes[next])
+
+				next++
+
+				continue
+			}
+
+			merged = append(merged, target)
+		case DeltaOpDelete:
+			switch {
+			case present:
+				// Dropping a hash is simply not copying it.
+				next++
+			case exhausted:
+				// The old list had already run out; the message says so.
+				return nil, fmt.Errorf(
+					"%w: delta tries to delete hash %s (at end)",
+					ErrHashNotFound,
+					entry.Hash,
+				)
+			default:
+				return nil, fmt.Errorf("%w: delta tries to delete hash %s", ErrHashNotFound, entry.Hash)
+			}
+		default:
+			return nil, fmt.Errorf("%w: %v", ErrInvalidDeltaOp, entry.Op)
+		}
+	}
+
+	// Old hashes above every delta target (all of them when delta is
+	// empty) are carried over unchanged.
+	merged = append(merged, oldHashes[next:]...)
+
+	return merged, nil
+}
diff --git a/pkg/nixcacheindex/delta_test.go b/pkg/nixcacheindex/delta_test.go
new file mode 100644
index 00000000..30f9628f
--- /dev/null
+++ b/pkg/nixcacheindex/delta_test.go
@@ -0,0 +1,202 @@
+package nixcacheindex_test
+
+import (
+	"bytes"
+	"math/big"
+	"reflect"
+	"testing"
+
+	"github.com/kalbasit/ncps/pkg/nixcacheindex"
+)
+
+// TestGenerateDeltas checks the operations computed for pairs of old/new hash lists.
+func TestGenerateDeltas(t *testing.T) {
+	t.Parallel()
+
+	// Small, distinct hash values shared by the cases below.
+	h1 := big.NewInt(1)
+	h2 := big.NewInt(2)
+	h3 := big.NewInt(3)
+	h4 := big.NewInt(4)
+
+	cases := []struct {
+		name      string
+		oldHashes []*big.Int
+		newHashes []*big.Int
+		want      []nixcacheindex.DeltaEntry
+	}{
+		{
+			name:      "Empty to Empty",
+			oldHashes: []*big.Int{},
+			newHashes: []*big.Int{},
+			want:      nil,
+		},
+		{
+			name:      "No change",
+			oldHashes: []*big.Int{h1, h2},
+			newHashes: []*big.Int{h1, h2},
+			want:      nil,
+		},
+		{
+			name:      "Additions only",
+			oldHashes: []*big.Int{h1},
+			newHashes: []*big.Int{h1, h2, h3},
+			want: []nixcacheindex.DeltaEntry{
+				{Op: nixcacheindex.DeltaOpAdd, Hash: nixcacheindex.FormatHash(h2)},
+				{Op: nixcacheindex.DeltaOpAdd, Hash: nixcacheindex.FormatHash(h3)},
+			},
+		},
+		{
+			name:      "Deletions only",
+			oldHashes: []*big.Int{h1, h2, h3},
+			newHashes: []*big.Int{h1},
+			want: []nixcacheindex.DeltaEntry{
+				{Op: nixcacheindex.DeltaOpDelete, Hash: nixcacheindex.FormatHash(h2)},
+				{Op: nixcacheindex.DeltaOpDelete, Hash: nixcacheindex.FormatHash(h3)},
+			},
+		},
+		{
+			name:      "Mixed",
+			oldHashes: []*big.Int{h1, h2},
+			newHashes: []*big.Int{h2, h3},
+			want: []nixcacheindex.DeltaEntry{
+				{Op: nixcacheindex.DeltaOpDelete, Hash: nixcacheindex.FormatHash(h1)},
+				{Op: nixcacheindex.DeltaOpAdd, Hash: nixcacheindex.FormatHash(h3)},
+			},
+		},
+		{
+			name:      "Complete Change",
+			oldHashes: []*big.Int{h1, h2},
+			newHashes: []*big.Int{h3, h4},
+			want: []nixcacheindex.DeltaEntry{
+				{Op: nixcacheindex.DeltaOpDelete, Hash: nixcacheindex.FormatHash(h1)},
+				{Op: nixcacheindex.DeltaOpDelete, Hash: nixcacheindex.FormatHash(h2)},
+				{Op: nixcacheindex.DeltaOpAdd, Hash: nixcacheindex.FormatHash(h3)},
+				{Op: nixcacheindex.DeltaOpAdd, Hash: nixcacheindex.FormatHash(h4)},
+			},
+		},
+	}
+
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			got := nixcacheindex.GenerateDeltas(tc.oldHashes, tc.newHashes)
+			if !reflect.DeepEqual(got, tc.want) {
+				t.Errorf("GenerateDeltas() = %v, want %v", got, tc.want)
+			}
+		})
+	}
+}
+
+// TestApplyDelta checks the resulting hash list, or that an error occurs, for several deltas.
+func TestApplyDelta(t *testing.T) {
+	t.Parallel()
+
+	h1 := big.NewInt(1)
+	h2 := big.NewInt(2)
+	h3 := big.NewInt(3)
+
+	cases := []struct {
+		name      string
+		oldHashes []*big.Int
+		delta     []nixcacheindex.DeltaEntry
+		want      []*big.Int
+		wantErr   bool
+	}{
+		{
+			name:      "Apply Add",
+			oldHashes: []*big.Int{h1},
+			delta: []nixcacheindex.DeltaEntry{
+				{Op: nixcacheindex.DeltaOpAdd, Hash: nixcacheindex.FormatHash(h2)},
+			},
+			want: []*big.Int{h1, h2},
+		},
+		{
+			name:      "Apply Add In Middle",
+			oldHashes: []*big.Int{h1, h3},
+			delta: []nixcacheindex.DeltaEntry{
+				{Op: nixcacheindex.DeltaOpAdd, Hash: nixcacheindex.FormatHash(h2)},
+			},
+			want: []*big.Int{h1, h2, h3},
+		},
+		{
+			name:      "Apply Delete",
+			oldHashes: []*big.Int{h1, h2},
+			delta: []nixcacheindex.DeltaEntry{
+				{Op: nixcacheindex.DeltaOpDelete, Hash: nixcacheindex.FormatHash(h2)},
+			},
+			want: []*big.Int{h1},
+		},
+		{
+			name:      "Apply Invalid Delete",
+			oldHashes: []*big.Int{h1},
+			delta: []nixcacheindex.DeltaEntry{
+				{Op: nixcacheindex.DeltaOpDelete, Hash: nixcacheindex.FormatHash(h2)},
+			},
+			wantErr: true,
+		},
+		{
+			name:      "Mixed",
+			oldHashes: []*big.Int{h1, h2},
+			delta: []nixcacheindex.DeltaEntry{
+				{Op: nixcacheindex.DeltaOpDelete, Hash: nixcacheindex.FormatHash(h1)},
+				{Op: nixcacheindex.DeltaOpAdd, Hash: nixcacheindex.FormatHash(h3)},
+			},
+			want: []*big.Int{h2, h3},
+		},
+		{
+			name:      "Apply Invalid Op",
+			oldHashes: []*big.Int{h1},
+			delta: []nixcacheindex.DeltaEntry{
+				{Op: nixcacheindex.DeltaOp(999), Hash: nixcacheindex.FormatHash(h1)},
+			},
+			wantErr: true,
+		},
+	}
+
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			got, err := nixcacheindex.ApplyDelta(tc.oldHashes, tc.delta)
+			if (err != nil) != tc.wantErr {
+				t.Errorf("ApplyDelta() error = %v, wantErr %v", err, tc.wantErr)
+
+				return
+			}
+
+			if !tc.wantErr && !reflect.DeepEqual(got, tc.want) {
+				t.Errorf("ApplyDelta() = %v, want %v", got, tc.want)
+			}
+		})
+	}
+}
+
+// TestReadWriteDelta checks that entries written by WriteDelta are read back unchanged by ParseDelta.
+func TestReadWriteDelta(t *testing.T) {
+	t.Parallel()
+
+	entries := []nixcacheindex.DeltaEntry{
+		{Op: nixcacheindex.DeltaOpDelete, Hash: nixcacheindex.FormatHash(big.NewInt(100))},
+		{Op: nixcacheindex.DeltaOpAdd, Hash: nixcacheindex.FormatHash(big.NewInt(200))},
+	}
+
+	var buf bytes.Buffer
+
+	err := nixcacheindex.WriteDelta(&buf, entries)
+	if err != nil {
+		t.Fatalf("WriteDelta failed: %v", err)
+	}
+
+	roundTripped, err := nixcacheindex.ParseDelta(&buf)
+	if err != nil {
+		t.Fatalf("ParseDelta failed: %v", err)
+	}
+
+	if !reflect.DeepEqual(entries, roundTripped) {
+		t.Errorf("Read/Write mismatch. Got %v, want %v", roundTripped, entries)
+	}
+}
diff --git a/pkg/nixcacheindex/errors.go b/pkg/nixcacheindex/errors.go
index 2301e25c..cc35046b 100644
--- a/pkg/nixcacheindex/errors.go
+++ b/pkg/nixcacheindex/errors.go
@@ -17,4 +17,10 @@ var (
 	ErrShardNotFound = errors.New("shard not found")
 	// ErrInvalidJournalOp is returned when a journal operation is invalid.
 	ErrInvalidJournalOp = errors.New("invalid journal operation")
+	// ErrInvalidDeltaOp is returned when a delta operation is invalid.
+	ErrInvalidDeltaOp = errors.New("invalid delta operation")
+	// ErrDeltaNotSorted is returned when delta entries are not sorted.
+	ErrDeltaNotSorted = errors.New("delta entries not sorted")
+	// ErrHashNotFound is returned when trying to delete a hash that is not present.
+	ErrHashNotFound = errors.New("hash not found")
 )