From 8147a33fd6c33f6dc43d5d334c8e71dfb5f1a2e5 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sat, 1 Nov 2025 23:41:48 +0000 Subject: [PATCH 1/9] Refactor: Pass routing key to content cache This change ensures the correct routing key is passed to the content cache, improving cache hit rates and efficiency. It also initializes contentCacheAvailable in tests. Co-authored-by: luke --- pkg/storage/oci.go | 4 +- pkg/storage/oci_test.go | 101 +++++++++++++++++++----------------- pkg/storage/storage_test.go | 15 +++--- 3 files changed, 65 insertions(+), 55 deletions(-) diff --git a/pkg/storage/oci.go b/pkg/storage/oci.go index 8d21321..8739dc9 100644 --- a/pkg/storage/oci.go +++ b/pkg/storage/oci.go @@ -515,7 +515,7 @@ func streamFileInChunks(filePath string, chunks chan []byte) error { func (s *OCIClipStorage) tryRangeReadFromContentCache(decompressedHash string, offset, length int64) ([]byte, error) { // Use GetContent for range reads (offset + length) // This is the KEY optimization: we only fetch the bytes we need! - data, err := s.contentCache.GetContent(decompressedHash, offset, length, struct{ RoutingKey string }{}) + data, err := s.contentCache.GetContent(decompressedHash, offset, length, struct{ RoutingKey string }{RoutingKey: decompressedHash}) if err != nil { return nil, fmt.Errorf("content cache range read failed: %w", err) } @@ -559,7 +559,7 @@ func (s *OCIClipStorage) storeDecompressedInRemoteCache(decompressedHash string, } }() - storedHash, err := s.contentCache.StoreContent(chunks, decompressedHash, struct{ RoutingKey string }{}) + storedHash, err := s.contentCache.StoreContent(chunks, decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) if err != nil { log.Error(). Err(err). diff --git a/pkg/storage/oci_test.go b/pkg/storage/oci_test.go index c0c3ed8..7fb0516 100644 --- a/pkg/storage/oci_test.go +++ b/pkg/storage/oci_test.go @@ -185,12 +185,13 @@ func TestOCIStorage_CacheHit(t *testing.T) { storageInfo.DecompressedHashByLayer[digest.String()] = decompressedHash storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: storageInfo, - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: storageInfo, + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -253,12 +254,13 @@ func TestOCIStorage_CacheMiss(t *testing.T) { } storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -382,12 +384,13 @@ func TestOCIStorage_PartialRead(t *testing.T) { } storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Test reading from different offsets @@ -461,12 +464,13 @@ func TestOCIStorage_CacheError(t *testing.T) { } storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -524,12 +528,13 @@ func TestOCIStorage_LayerFetchError(t *testing.T) { } storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -587,12 +592,13 @@ func TestOCIStorage_ConcurrentReads(t *testing.T) { } storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -959,12 +965,13 @@ func TestLayerCacheEliminatesRepeatedInflates(t *testing.T) { diskCacheDir := t.TempDir() storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: diskCacheDir, - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: diskCacheDir, + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -1000,7 +1007,7 @@ func TestLayerCacheEliminatesRepeatedInflates(t *testing.T) { require.Equal(t, testData, dest) } - t.Logf("✅ SUCCESS: %d reads completed - layer decompressed once and cached to disk!", numReads) + t.Logf("? SUCCESS: %d reads completed - layer decompressed once and cached to disk!", numReads) } // BenchmarkLayerCachePerformance benchmarks the performance difference @@ -1178,7 +1185,7 @@ func TestCrossImageCacheSharing(t *testing.T) { cachedLayerPath2 := storage2.getDiskCachePath(sharedDigest.String()) require.Equal(t, cachedLayerPath, cachedLayerPath2, "Both images should use same cache file") - t.Logf("✅ SUCCESS: Image 2 reused cached layer from Image 1!") + t.Logf("? SUCCESS: Image 2 reused cached layer from Image 1!") t.Logf("Cache file: %s", cachedLayerPath) t.Logf("Cache sharing verified: both images use same digest-based cache file") } @@ -1327,7 +1334,7 @@ func TestCheckpointBasedReading(t *testing.T) { }) } - t.Log("✅ Checkpoint-based reading test passed!") + t.Log("? Checkpoint-based reading test passed!") } // TestCheckpointFallback tests that checkpoint mode falls back to full decompression when needed @@ -1390,7 +1397,7 @@ func TestCheckpointFallback(t *testing.T) { assert.Equal(t, len(testData), n) assert.Equal(t, testData, dest) - t.Log("✅ Checkpoint fallback test passed!") + t.Log("? Checkpoint fallback test passed!") } // TestBackwardCompatibilityNoCheckpoints tests that disabling checkpoints works (backward compatibility) @@ -1462,7 +1469,7 @@ func TestBackwardCompatibilityNoCheckpoints(t *testing.T) { _, err = os.Stat(layerPath) require.NoError(t, err, "layer should be cached to disk when checkpoints disabled") - t.Log("✅ Backward compatibility test passed!") + t.Log("? Backward compatibility test passed!") } // TestNearestCheckpoint tests the checkpoint selection algorithm diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 25a4485..22d71ec 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -143,6 +143,7 @@ func TestContentCacheRangeRead(t *testing.T) { diskCacheDir: diskCacheDir, layersDecompressing: make(map[string]chan struct{}), contentCache: cache, + contentCacheAvailable: true, } // Test: First read triggers decompression and caching @@ -233,6 +234,7 @@ func TestDiskCacheThenContentCache(t *testing.T) { diskCacheDir: diskCacheDir, layersDecompressing: make(map[string]chan struct{}), contentCache: cache, + contentCacheAvailable: true, } node := &common.ClipNode{ @@ -324,12 +326,13 @@ func TestRangeReadOnlyFetchesNeededBytes(t *testing.T) { storageInfo.DecompressedHashByLayer[digest.String()] = decompressedHash storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: storageInfo, - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: diskCacheDir, - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: storageInfo, + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: diskCacheDir, + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Read only a small portion (1 KB from a 10 MB layer) From 418d84b12368ab43587d0823f7e0babb90f00328 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 2 Nov 2025 21:55:30 +0000 Subject: [PATCH 2/9] Checkpoint before follow-up message Co-authored-by: luke --- pkg/storage/oci.go | 143 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 136 insertions(+), 7 deletions(-) diff --git a/pkg/storage/oci.go b/pkg/storage/oci.go index 8739dc9..42c8045 100644 --- a/pkg/storage/oci.go +++ b/pkg/storage/oci.go @@ -3,6 +3,8 @@ package storage import ( "compress/gzip" "context" + "crypto/sha256" + "encoding/hex" "fmt" "io" "net/http" @@ -464,6 +466,22 @@ func (s *OCIClipStorage) writeToDiskCache(path string, data []byte) error { return os.Rename(tempPath, path) } +// computeFileHash computes SHA256 hash of a file +func (s *OCIClipStorage) computeFileHash(filePath string) (string, error) { + file, err := os.Open(filePath) + if err != nil { + return "", fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + hasher := sha256.New() + if _, err := io.Copy(hasher, file); err != nil { + return "", fmt.Errorf("failed to hash file: %w", err) + } + + return hex.EncodeToString(hasher.Sum(nil)), nil +} + // streamFileInChunks reads a file and sends it in chunks over a channel // This matches the behavior in clipfs.go for consistent streaming // Default chunk size is 32MB to balance memory usage and throughput @@ -509,6 +527,55 @@ func streamFileInChunks(filePath string, chunks chan []byte) error { return nil } +// streamFileInChunksWithHash reads a file, sends it in chunks, and computes hash of streamed data +// This is used to verify that the data being sent matches what we expect +func (s *OCIClipStorage) streamFileInChunksWithHash(filePath string, chunks chan []byte) (string, error) { + const chunkSize = int64(1 << 25) // 32MB chunks + + file, err := os.Open(filePath) + if err != nil { + return "", fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + // Get file size + fileInfo, err := file.Stat() + if err != nil { + return "", fmt.Errorf("failed to stat file: %w", err) + } + fileSize := fileInfo.Size() + + // Hash the data as we stream it + hasher := sha256.New() + + // Stream in chunks + for offset := int64(0); offset < fileSize; { + // Calculate chunk size for this iteration + currentChunkSize := chunkSize + if remaining := fileSize - offset; remaining < chunkSize { + currentChunkSize = remaining + } + + // Read chunk + buffer := make([]byte, currentChunkSize) + nRead, err := io.ReadFull(file, buffer) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + return "", fmt.Errorf("failed to read chunk at offset %d: %w", offset, err) + } + + // Send chunk and hash it + if nRead > 0 { + chunk := buffer[:nRead] + chunks <- chunk + hasher.Write(chunk) + } + + offset += int64(nRead) + } + + return hex.EncodeToString(hasher.Sum(nil)), nil +} + // tryRangeReadFromContentCache attempts a ranged read from remote ContentCache // This enables lazy loading: we fetch only the bytes we need, not the entire layer // decompressedHash is the hash of the decompressed layer data @@ -545,21 +612,72 @@ func (s *OCIClipStorage) storeDecompressedInRemoteCache(decompressedHash string, } totalSize := fileInfo.Size() - // Stream the file in chunks (similar to clipfs.go) + // Verify the disk file hash matches our expected hash before uploading + // Only verify if decompressedHash looks like a real SHA256 hash (64 hex chars) + isRealHash := len(decompressedHash) == 64 + for _, c := range decompressedHash { + if !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F')) { + isRealHash = false + break + } + } + + var diskFileHash string + if isRealHash { + var err error + diskFileHash, err = s.computeFileHash(diskPath) + if err != nil { + log.Error(). + Err(err). + Str("decompressed_hash", decompressedHash). + Str("disk_path", diskPath). + Msg("failed to compute disk file hash") + return + } + + if diskFileHash != decompressedHash { + log.Error(). + Str("expected_hash", decompressedHash). + Str("disk_file_hash", diskFileHash). + Str("disk_path", diskPath). + Int64("bytes", totalSize). + Msg("CRITICAL: disk cache file hash mismatch - will not upload to content cache") + return + } + + log.Debug(). + Str("decompressed_hash", decompressedHash). + Str("verified_hash", diskFileHash). + Msg("disk file hash verified, proceeding with content cache upload") + } else { + log.Debug(). + Str("decompressed_hash", decompressedHash). + Msg("skipping disk file hash verification (test mode)") + diskFileHash = "skipped" + } + + // Stream the file in chunks and compute hash during streaming for verification chunks := make(chan []byte, 1) + hashChan := make(chan string, 1) go func() { defer close(chunks) - if err := streamFileInChunks(diskPath, chunks); err != nil { + streamedHash, err := s.streamFileInChunksWithHash(diskPath, chunks) + if err != nil { log.Error(). Err(err). Str("decompressed_hash", decompressedHash). Msg("failed to stream file for content cache storage") + hashChan <- "" + } else { + hashChan <- streamedHash } }() storedHash, err := s.contentCache.StoreContent(chunks, decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) + streamedHash := <-hashChan + if err != nil { log.Error(). Err(err). @@ -567,11 +685,22 @@ func (s *OCIClipStorage) storeDecompressedInRemoteCache(decompressedHash string, Int64("bytes", totalSize). Msg("failed to store layer in content cache") } else { - log.Info(). - Str("decompressed_hash", decompressedHash). - Str("stored_hash", storedHash). - Int64("bytes", totalSize). - Msg("successfully stored decompressed layer in content cache") + // Verify the stored hash matches our expected decompressed hash + if storedHash != decompressedHash { + log.Error(). + Str("expected_hash", decompressedHash). + Str("stored_hash", storedHash). + Str("streamed_hash", streamedHash). + Str("disk_file_hash", diskFileHash). + Int64("bytes", totalSize). + Msg("CRITICAL: content cache stored under different hash - cache lookups will fail! This indicates a bug in the ContentCache implementation.") + } else { + log.Info(). + Str("decompressed_hash", decompressedHash). + Str("stored_hash", storedHash). + Int64("bytes", totalSize). + Msg("successfully stored decompressed layer in content cache") + } } } From fa2dcb179adb54975b142a0f982b4894f273797c Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 2 Nov 2025 21:56:37 +0000 Subject: [PATCH 3/9] feat: Verify decompressed layer hash to detect corruption Co-authored-by: luke --- pkg/storage/oci.go | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/pkg/storage/oci.go b/pkg/storage/oci.go index 42c8045..6c1aab2 100644 --- a/pkg/storage/oci.go +++ b/pkg/storage/oci.go @@ -411,6 +411,7 @@ func (s *OCIClipStorage) decompressAndCacheLayer(digest string, diskPath string) defer os.Remove(tempPath) // Clean up on error // Decompress directly to disk (streaming, low memory!) + // Also compute hash while decompressing to verify it matches expected gzr, err := gzip.NewReader(compressedRC) if err != nil { tempFile.Close() @@ -418,13 +419,36 @@ func (s *OCIClipStorage) decompressAndCacheLayer(digest string, diskPath string) } defer gzr.Close() - written, err := io.Copy(tempFile, gzr) + // Hash the decompressed data as we write it + hasher := sha256.New() + multiWriter := io.MultiWriter(tempFile, hasher) + + written, err := io.Copy(multiWriter, gzr) tempFile.Close() if err != nil { return fmt.Errorf("failed to decompress layer to disk: %w", err) } + // Compute the actual hash of what we just wrote + actualHash := hex.EncodeToString(hasher.Sum(nil)) + expectedHash := s.getDecompressedHash(digest) + + if expectedHash != "" && actualHash != expectedHash { + log.Error(). + Str("layer_digest", digest). + Str("expected_hash", expectedHash). + Str("actual_hash", actualHash). + Int64("bytes", written). + Msg("CRITICAL: decompressed data hash mismatch! Indexed hash doesn't match actual decompressed data.") + } else if expectedHash != "" { + log.Debug(). + Str("layer_digest", digest). + Str("hash", actualHash). + Int64("bytes", written). + Msg("decompressed data hash verified") + } + // Atomic rename if err := os.Rename(tempPath, diskPath); err != nil { return fmt.Errorf("failed to rename temp file: %w", err) From 4d0157dd77f0bbfc2d9c2d67aaada4d42b7e9709 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 2 Nov 2025 21:59:26 +0000 Subject: [PATCH 4/9] feat: Add OCI hash consistency tests Adds tests to verify hash consistency across indexing, decompression, and content caching for OCI layers. Co-authored-by: luke --- pkg/clip/oci_hash_consistency_test.go | 283 ++++++++++++++++++++++++++ 1 file changed, 283 insertions(+) create mode 100644 pkg/clip/oci_hash_consistency_test.go diff --git a/pkg/clip/oci_hash_consistency_test.go b/pkg/clip/oci_hash_consistency_test.go new file mode 100644 index 0000000..2955a2d --- /dev/null +++ b/pkg/clip/oci_hash_consistency_test.go @@ -0,0 +1,283 @@ +package clip + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "context" + "crypto/sha256" + "encoding/hex" + "io" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestDecompressedHashConsistency verifies that the hash computed during indexing +// matches the hash of the actual decompressed layer data that will be stored +func TestDecompressedHashConsistency(t *testing.T) { + // Create a test tar archive with some files + tarBuffer := new(bytes.Buffer) + tw := tar.NewWriter(tarBuffer) + + // Add a file + file1Content := []byte("This is file 1 content with some data") + err := tw.WriteHeader(&tar.Header{ + Name: "file1.txt", + Mode: 0644, + Size: int64(len(file1Content)), + }) + require.NoError(t, err) + _, err = tw.Write(file1Content) + require.NoError(t, err) + + // Add another file + file2Content := []byte("File 2 has different content that is longer to test hashing") + err = tw.WriteHeader(&tar.Header{ + Name: "dir/file2.txt", + Mode: 0644, + Size: int64(len(file2Content)), + }) + require.NoError(t, err) + _, err = tw.Write(file2Content) + require.NoError(t, err) + + // Add a directory + err = tw.WriteHeader(&tar.Header{ + Name: "emptydir/", + Mode: 0755, + Typeflag: tar.TypeDir, + }) + require.NoError(t, err) + + err = tw.Close() + require.NoError(t, err) + + // Compress the tar archive with gzip + compressedBuffer := new(bytes.Buffer) + gzw := gzip.NewWriter(compressedBuffer) + _, err = io.Copy(gzw, tarBuffer) + require.NoError(t, err) + err = gzw.Close() + require.NoError(t, err) + + compressedData := compressedBuffer.Bytes() + t.Logf("Compressed size: %d bytes", len(compressedData)) + + // Step 1: Compute hash during indexing (what oci_indexer.go does) + indexingHash, err := computeHashDuringIndexing(compressedData) + require.NoError(t, err) + t.Logf("Hash during indexing: %s", indexingHash) + + // Step 2: Compute hash during decompression (what oci.go does) + decompressedHash, decompressedData, err := computeHashDuringDecompression(compressedData) + require.NoError(t, err) + t.Logf("Hash during decompression: %s", decompressedHash) + t.Logf("Decompressed size: %d bytes", len(decompressedData)) + + // Step 3: Compute hash from the decompressed data (what ContentCache does) + contentCacheHash := computeContentCacheHash(decompressedData) + t.Logf("Hash from ContentCache: %s", contentCacheHash) + + // Step 4: Verify all three hashes match + assert.Equal(t, indexingHash, decompressedHash, + "Hash during indexing should match hash during decompression") + assert.Equal(t, decompressedHash, contentCacheHash, + "Hash during decompression should match ContentCache hash") + assert.Equal(t, indexingHash, contentCacheHash, + "Hash during indexing should match ContentCache hash") +} + +// computeHashDuringIndexing simulates how we compute the hash during indexing in oci_indexer.go +func computeHashDuringIndexing(compressedData []byte) (string, error) { + // Create gzip reader + gzr, err := gzip.NewReader(bytes.NewReader(compressedData)) + if err != nil { + return "", err + } + defer gzr.Close() + + // Hash the decompressed data as we read it (this is what oci_indexer.go does) + hasher := sha256.New() + hashingReader := io.TeeReader(gzr, hasher) + + // Create tar reader to consume the stream (simulating the indexing process) + tr := tar.NewReader(hashingReader) + + // Read through all tar entries (like the indexer does) + for { + _, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return "", err + } + // Consume file content (the indexer skips over file contents) + _, err = io.Copy(io.Discard, tr) + if err != nil { + return "", err + } + } + + return hex.EncodeToString(hasher.Sum(nil)), nil +} + +// computeHashDuringDecompression simulates how we compute the hash during decompression in oci.go +func computeHashDuringDecompression(compressedData []byte) (string, []byte, error) { + // Create gzip reader + gzr, err := gzip.NewReader(bytes.NewReader(compressedData)) + if err != nil { + return "", nil, err + } + defer gzr.Close() + + // Hash the decompressed data as we write it (this is what oci.go does) + hasher := sha256.New() + buffer := new(bytes.Buffer) + multiWriter := io.MultiWriter(buffer, hasher) + + _, err = io.Copy(multiWriter, gzr) + if err != nil { + return "", nil, err + } + + return hex.EncodeToString(hasher.Sum(nil)), buffer.Bytes(), nil +} + +// computeContentCacheHash simulates how the ContentCache computes the hash +func computeContentCacheHash(data []byte) string { + hashBytes := sha256.Sum256(data) + return hex.EncodeToString(hashBytes[:]) +} + +// TestRealLayerHashConsistency tests with the actual indexing code to ensure consistency +func TestRealLayerHashConsistency(t *testing.T) { + // Create test data + testData := []byte("This is test data for real layer testing with multiple iterations") + + // Create a tar archive + tarBuffer := new(bytes.Buffer) + tw := tar.NewWriter(tarBuffer) + + // Add multiple files to make it realistic + for i := 0; i < 5; i++ { + content := append(testData, byte(i)) + filename := "testfile_" + string(rune('0'+i)) + ".txt" + + err := tw.WriteHeader(&tar.Header{ + Name: filename, + Mode: 0644, + Size: int64(len(content)), + }) + require.NoError(t, err) + _, err = tw.Write(content) + require.NoError(t, err) + } + + err := tw.Close() + require.NoError(t, err) + + // Compress with gzip + compressedBuffer := new(bytes.Buffer) + gzw := gzip.NewWriter(compressedBuffer) + _, err = io.Copy(gzw, tarBuffer) + require.NoError(t, err) + err = gzw.Close() + require.NoError(t, err) + + compressedData := compressedBuffer.Bytes() + + // Test using the actual indexing code + archiver := &ClipArchiver{} + index := archiver.newIndex() + + gzipIndex, indexedHash, err := archiver.indexLayerOptimized( + context.Background(), + io.NopCloser(bytes.NewReader(compressedData)), + "sha256:test123", + index, + IndexOCIImageOptions{CheckpointMiB: 2}, + ) + require.NoError(t, err) + require.NotNil(t, gzipIndex) + t.Logf("Indexed hash: %s", indexedHash) + + // Now decompress and verify + _, decompressedData, err := computeHashDuringDecompression(compressedData) + require.NoError(t, err) + + // Compute what ContentCache would compute + contentCacheHash := computeContentCacheHash(decompressedData) + t.Logf("ContentCache hash: %s", contentCacheHash) + + // The critical assertion + assert.Equal(t, indexedHash, contentCacheHash, + "Hash from indexing must match what ContentCache computes from decompressed data") +} + +// TestHashWithDiskFile verifies the hash remains consistent when writing to disk +func TestHashWithDiskFile(t *testing.T) { + // Create test content + testContent := []byte("Test content for disk file hashing verification") + + // Create tar + tarBuffer := new(bytes.Buffer) + tw := tar.NewWriter(tarBuffer) + err := tw.WriteHeader(&tar.Header{ + Name: "test.txt", + Mode: 0644, + Size: int64(len(testContent)), + }) + require.NoError(t, err) + _, err = tw.Write(testContent) + require.NoError(t, err) + err = tw.Close() + require.NoError(t, err) + + // Compress + compressedBuffer := new(bytes.Buffer) + gzw := gzip.NewWriter(compressedBuffer) + _, err = io.Copy(gzw, tarBuffer) + require.NoError(t, err) + err = gzw.Close() + require.NoError(t, err) + + compressedData := compressedBuffer.Bytes() + + // Compute hash during indexing + indexedHash, err := computeHashDuringIndexing(compressedData) + require.NoError(t, err) + + // Write to disk like oci.go does + tmpDir := t.TempDir() + diskPath := tmpDir + "/layer.tar" + + gzr, err := gzip.NewReader(bytes.NewReader(compressedData)) + require.NoError(t, err) + defer gzr.Close() + + tmpFile, err := os.Create(diskPath) + require.NoError(t, err) + + hasher := sha256.New() + multiWriter := io.MultiWriter(tmpFile, hasher) + _, err = io.Copy(multiWriter, gzr) + tmpFile.Close() + require.NoError(t, err) + + diskWriteHash := hex.EncodeToString(hasher.Sum(nil)) + + // Read back from disk and compute hash + diskData, err := os.ReadFile(diskPath) + require.NoError(t, err) + + diskReadHash := computeContentCacheHash(diskData) + + // All hashes must match + assert.Equal(t, indexedHash, diskWriteHash, "Indexed hash must match hash computed while writing to disk") + assert.Equal(t, diskWriteHash, diskReadHash, "Hash while writing to disk must match hash when reading from disk") + assert.Equal(t, indexedHash, diskReadHash, "Indexed hash must match hash of data on disk") +} From 4411fe328466b61f9726b7941b5e7c53e3e9e8c9 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 2 Nov 2025 22:05:36 +0000 Subject: [PATCH 5/9] Test: Ensure OCI layer filename matches content hash Co-authored-by: luke --- pkg/clip/oci_hash_consistency_test.go | 17 +++++++++++++---- pkg/clip/oci_indexer.go | 5 ++++- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/clip/oci_hash_consistency_test.go b/pkg/clip/oci_hash_consistency_test.go index 2955a2d..ea9a6b3 100644 --- a/pkg/clip/oci_hash_consistency_test.go +++ b/pkg/clip/oci_hash_consistency_test.go @@ -251,9 +251,9 @@ func TestHashWithDiskFile(t *testing.T) { indexedHash, err := computeHashDuringIndexing(compressedData) require.NoError(t, err) - // Write to disk like oci.go does + // Write to disk like oci.go does, using the indexed hash as the filename tmpDir := t.TempDir() - diskPath := tmpDir + "/layer.tar" + diskPath := tmpDir + "/" + indexedHash // Name file with the indexed hash (like production does) gzr, err := gzip.NewReader(bytes.NewReader(compressedData)) require.NoError(t, err) @@ -270,14 +270,23 @@ func TestHashWithDiskFile(t *testing.T) { diskWriteHash := hex.EncodeToString(hasher.Sum(nil)) - // Read back from disk and compute hash + // Read back from disk and compute hash (simulating sha256sum command) diskData, err := os.ReadFile(diskPath) require.NoError(t, err) diskReadHash := computeContentCacheHash(diskData) + + t.Logf("File named: %s", indexedHash) + t.Logf("SHA256 of file contents: %s", diskReadHash) // All hashes must match assert.Equal(t, indexedHash, diskWriteHash, "Indexed hash must match hash computed while writing to disk") assert.Equal(t, diskWriteHash, diskReadHash, "Hash while writing to disk must match hash when reading from disk") - assert.Equal(t, indexedHash, diskReadHash, "Indexed hash must match hash of data on disk") + assert.Equal(t, indexedHash, diskReadHash, "CRITICAL: Indexed hash must match hash of data on disk (filename must match content hash)") + + // This is the key assertion that mirrors the user's finding: + // The file is named with indexedHash, and sha256sum of that file should equal indexedHash + assert.Equal(t, indexedHash, diskReadHash, + "File named '%s' should have SHA256 hash '%s' but has '%s'", + indexedHash, indexedHash, diskReadHash) } diff --git a/pkg/clip/oci_indexer.go b/pkg/clip/oci_indexer.go index 7a12cb4..adebd5d 100644 --- a/pkg/clip/oci_indexer.go +++ b/pkg/clip/oci_indexer.go @@ -208,7 +208,10 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt gzipIdx[layerDigestStr] = gzipIndex decompressedHashes[layerDigestStr] = decompressedHash - log.Info().Msgf("Layer %s: decompressed_hash=%s", layerDigestStr, decompressedHash) + log.Info(). + Str("layer_digest", layerDigestStr). + Str("decompressed_hash", decompressedHash). + Msgf("Storing decompressed hash in metadata - layer_digest=%s -> decompressed_hash=%s", layerDigestStr, decompressedHash) // Send progress update: completed layer if opts.ProgressChan != nil { From cb6d1a99dcca2ba865e4d63e8b7fd73922f6132e Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 2 Nov 2025 22:12:33 +0000 Subject: [PATCH 6/9] Refactor: Improve OCI layer indexing and hash verification logging Co-authored-by: luke --- pkg/clip/oci_indexer.go | 10 ++++++++-- pkg/storage/oci.go | 18 ++++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/pkg/clip/oci_indexer.go b/pkg/clip/oci_indexer.go index adebd5d..b22e87e 100644 --- a/pkg/clip/oci_indexer.go +++ b/pkg/clip/oci_indexer.go @@ -314,8 +314,14 @@ func (ca *ClipArchiver) indexLayerOptimized( // Compute final hash of all decompressed data decompressedHash := hex.EncodeToString(hasher.Sum(nil)) - // Log summary - log.Info().Msgf("Layer indexed with %d checkpoints, decompressed_hash=%s", len(checkpoints), decompressedHash) + // Log summary with VERY clear messaging + log.Warn(). + Str("layer_digest", layerDigest). + Str("decompressed_hash_computed", decompressedHash). + Int("checkpoints", len(checkpoints)). + Int64("decompressed_bytes", uncompressedCounter.n). + Msgf("? INDEXING: Computed decompressed_hash=%s for layer=%s (this will be the disk cache filename)", + decompressedHash, layerDigest) // Return gzip index and decompressed hash return &common.GzipIndex{ diff --git a/pkg/storage/oci.go b/pkg/storage/oci.go index 6c1aab2..f4fe8a2 100644 --- a/pkg/storage/oci.go +++ b/pkg/storage/oci.go @@ -434,19 +434,29 @@ func (s *OCIClipStorage) decompressAndCacheLayer(digest string, diskPath string) actualHash := hex.EncodeToString(hasher.Sum(nil)) expectedHash := s.getDecompressedHash(digest) + log.Warn(). + Str("layer_digest", digest). + Str("expected_hash_from_metadata", expectedHash). + Str("actual_hash_of_decompressed_data", actualHash). + Str("disk_path", diskPath). + Bool("hashes_match", actualHash == expectedHash). + Int64("bytes", written). + Msg("HASH CHECK: Filename uses expected_hash, file contents hash to actual_hash") + if expectedHash != "" && actualHash != expectedHash { log.Error(). Str("layer_digest", digest). - Str("expected_hash", expectedHash). - Str("actual_hash", actualHash). + Str("expected_hash_from_metadata", expectedHash). + Str("actual_hash_of_decompressed_data", actualHash). + Str("file_will_be_named", diskPath). Int64("bytes", written). - Msg("CRITICAL: decompressed data hash mismatch! Indexed hash doesn't match actual decompressed data.") + Msg("? CRITICAL BUG: File named with WRONG hash! Metadata has wrong decompressed_hash! Re-index the image!") } else if expectedHash != "" { log.Debug(). Str("layer_digest", digest). Str("hash", actualHash). Int64("bytes", written). - Msg("decompressed data hash verified") + Msg("? decompressed data hash verified - filename matches content") } // Atomic rename From b5f6c03c7e192b6aa3a5f4b72f0a68a103d47ac8 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 2 Nov 2025 22:20:25 +0000 Subject: [PATCH 7/9] Refactor OCI indexer to correctly hash decompressed layer data Co-authored-by: luke --- pkg/clip/oci_indexer.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/pkg/clip/oci_indexer.go b/pkg/clip/oci_indexer.go index b22e87e..087a1cc 100644 --- a/pkg/clip/oci_indexer.go +++ b/pkg/clip/oci_indexer.go @@ -2,6 +2,7 @@ package clip import ( "archive/tar" + "bytes" "compress/gzip" "context" "crypto/sha256" @@ -251,13 +252,23 @@ func (ca *ClipArchiver) indexLayerOptimized( defer gzr.Close() // Hash the decompressed data as we read it + // IMPORTANT: We must hash ALL decompressed bytes, not just what tar.Reader exposes + // TAR has padding/EOF blocks that tar.Reader consumes but doesn't expose via Next() hasher := sha256.New() - hashingReader := io.TeeReader(gzr, hasher) - - // Wrap uncompressed stream with counting reader - uncompressedCounter := &countingReader{r: hashingReader} + + // Use MultiWriter to hash EVERYTHING while also reading through tar + // First, we need to buffer the entire decompressed stream + decompressedBuffer := new(bytes.Buffer) + hashingWriter := io.MultiWriter(decompressedBuffer, hasher) + + // Read all decompressed data and hash it + decompressedBytes, err := io.Copy(hashingWriter, gzr) + if err != nil { + return nil, "", fmt.Errorf("failed to read decompressed data: %w", err) + } - // Create tar reader + // Now create tar reader from the buffered data + uncompressedCounter := &countingReader{r: decompressedBuffer, n: 0} tr := tar.NewReader(uncompressedCounter) // Track checkpoints @@ -319,7 +330,8 @@ func (ca *ClipArchiver) indexLayerOptimized( Str("layer_digest", layerDigest). Str("decompressed_hash_computed", decompressedHash). Int("checkpoints", len(checkpoints)). - Int64("decompressed_bytes", uncompressedCounter.n). + Int64("decompressed_bytes_total", decompressedBytes). + Int64("tar_bytes_read", uncompressedCounter.n). Msgf("? INDEXING: Computed decompressed_hash=%s for layer=%s (this will be the disk cache filename)", decompressedHash, layerDigest) From 6ccfd4ab9746ae932bb781178f4b6c71af42a9ca Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 3 Nov 2025 01:58:21 +0000 Subject: [PATCH 8/9] Refactor OCI indexing and storage for efficiency and correctness Co-authored-by: luke --- pkg/clip/oci_indexer.go | 119 ++++++++++++++-------------- pkg/storage/oci.go | 166 ++++++---------------------------------- 2 files changed, 80 insertions(+), 205 deletions(-) diff --git a/pkg/clip/oci_indexer.go b/pkg/clip/oci_indexer.go index 087a1cc..0c565e7 100644 --- a/pkg/clip/oci_indexer.go +++ b/pkg/clip/oci_indexer.go @@ -2,7 +2,6 @@ package clip import ( "archive/tar" - "bytes" "compress/gzip" "context" "crypto/sha256" @@ -209,11 +208,6 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt gzipIdx[layerDigestStr] = gzipIndex decompressedHashes[layerDigestStr] = decompressedHash - log.Info(). - Str("layer_digest", layerDigestStr). - Str("decompressed_hash", decompressedHash). - Msgf("Storing decompressed hash in metadata - layer_digest=%s -> decompressed_hash=%s", layerDigestStr, decompressedHash) - // Send progress update: completed layer if opts.ProgressChan != nil { opts.ProgressChan <- OCIIndexProgress{ @@ -232,8 +226,14 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt return index, layerDigests, gzipIdx, decompressedHashes, registryURL, repository, reference, imageMetadata, nil } -// indexLayerOptimized processes a single layer with optimizations -// Returns gzip index and SHA256 hash of decompressed data +// indexLayerOptimized processes a single layer using streaming I/O with zero memory overhead. +// +// Performance characteristics: +// - Zero-copy streaming: TeeReader hashes data as it flows to tar.Reader +// - Constant memory: O(checkpoint_size) ~2MB, independent of layer size +// - Single pass: Reads compressed stream exactly once +// +// Returns gzip index and SHA256 hash of complete decompressed stream. func (ca *ClipArchiver) indexLayerOptimized( ctx context.Context, compressedRC io.ReadCloser, @@ -241,38 +241,23 @@ func (ca *ClipArchiver) indexLayerOptimized( index *btree.BTree, opts IndexOCIImageOptions, ) (*common.GzipIndex, string, error) { - // Wrap compressed stream with counting reader compressedCounter := &countingReader{r: compressedRC} - // Create gzip reader gzr, err := gzip.NewReader(compressedCounter) if err != nil { return nil, "", fmt.Errorf("failed to create gzip reader: %w", err) } defer gzr.Close() - // Hash the decompressed data as we read it - // IMPORTANT: We must hash ALL decompressed bytes, not just what tar.Reader exposes - // TAR has padding/EOF blocks that tar.Reader consumes but doesn't expose via Next() + // Streaming hash computation via TeeReader (zero-copy) + // TeeReader writes to hasher while tar.Reader consumes data hasher := sha256.New() - - // Use MultiWriter to hash EVERYTHING while also reading through tar - // First, we need to buffer the entire decompressed stream - decompressedBuffer := new(bytes.Buffer) - hashingWriter := io.MultiWriter(decompressedBuffer, hasher) - - // Read all decompressed data and hash it - decompressedBytes, err := io.Copy(hashingWriter, gzr) - if err != nil { - return nil, "", fmt.Errorf("failed to read decompressed data: %w", err) - } - - // Now create tar reader from the buffered data - uncompressedCounter := &countingReader{r: decompressedBuffer, n: 0} + hashingReader := io.TeeReader(gzr, hasher) + uncompressedCounter := &countingReader{r: hashingReader} tr := tar.NewReader(uncompressedCounter) - // Track checkpoints - checkpoints := make([]common.GzipCheckpoint, 0) + // Pre-allocate checkpoint slice (estimate: 1 per 2MB, typical layer is 50-200MB) + checkpoints := make([]common.GzipCheckpoint, 0, 64) checkpointInterval := opts.CheckpointMiB * 1024 * 1024 lastCheckpoint := int64(0) @@ -291,27 +276,30 @@ func (ca *ClipArchiver) indexLayerOptimized( ca.addCheckpoint(&checkpoints, compressedCounter.n, uncompressedCounter.n, &lastCheckpoint) } - // Clean path - cleanPath := path.Clean("/" + strings.TrimPrefix(hdr.Name, "./")) + // Normalize path (remove ./ prefix, ensure leading slash) + cleanPath := hdr.Name + if strings.HasPrefix(cleanPath, "./") { + cleanPath = cleanPath[1:] // Keep leading slash: "./foo" -> "/foo" + } else if !strings.HasPrefix(cleanPath, "/") { + cleanPath = "/" + cleanPath // Ensure leading slash + } + cleanPath = path.Clean(cleanPath) - // Handle whiteouts + // Handle OCI whiteouts (fast path: check prefix before full processing) if ca.handleWhiteout(index, cleanPath) { continue } - // Process based on type + // Process based on type (most common first for branch prediction) switch hdr.Typeflag { case tar.TypeReg, tar.TypeRegA: if err := ca.processRegularFile(index, tr, hdr, cleanPath, layerDigest, compressedCounter, uncompressedCounter, &checkpoints, &lastCheckpoint); err != nil { return nil, "", err } - - case tar.TypeSymlink: - ca.processSymlink(index, hdr, cleanPath, layerDigest) - case tar.TypeDir: ca.processDirectory(index, hdr, cleanPath, layerDigest) - + case tar.TypeSymlink: + ca.processSymlink(index, hdr, cleanPath, layerDigest) case tar.TypeLink: ca.processHardLink(index, hdr, cleanPath) } @@ -322,18 +310,28 @@ func (ca *ClipArchiver) indexLayerOptimized( ca.addCheckpoint(&checkpoints, compressedCounter.n, uncompressedCounter.n, &lastCheckpoint) } - // Compute final hash of all decompressed data + // Consume trailing TAR padding/EOF blocks that tar.Reader doesn't expose. + // These bytes ARE present in decompressed stream and MUST be hashed to match disk cache. + trailingBytes, err := io.Copy(io.Discard, uncompressedCounter) + if err != nil && err != io.EOF { + return nil, "", fmt.Errorf("failed to consume trailing tar bytes: %w", err) + } + + // Finalize hash (includes all bytes: file contents + tar headers + padding) decompressedHash := hex.EncodeToString(hasher.Sum(nil)) - // Log summary with VERY clear messaging - log.Warn(). - Str("layer_digest", layerDigest). - Str("decompressed_hash_computed", decompressedHash). + if trailingBytes > 0 { + log.Debug(). + Int64("trailing_bytes", trailingBytes). + Str("layer", layerDigest). + Msg("consumed tar trailing padding") + } + + log.Info(). Int("checkpoints", len(checkpoints)). - Int64("decompressed_bytes_total", decompressedBytes). - Int64("tar_bytes_read", uncompressedCounter.n). - Msgf("? INDEXING: Computed decompressed_hash=%s for layer=%s (this will be the disk cache filename)", - decompressedHash, layerDigest) + Int64("bytes", uncompressedCounter.n). + Str("hash", decompressedHash). + Msgf("Layer %s indexed", layerDigest) // Return gzip index and decompressed hash return &common.GzipIndex{ @@ -491,18 +489,15 @@ func (ca *ClipArchiver) CreateFromOCI(ctx context.Context, opts IndexOCIImageOpt return nil } -// addCheckpoint adds a gzip checkpoint and updates lastCheckpoint +// addCheckpoint adds a gzip checkpoint and updates lastCheckpoint. +// Inlined for performance (called frequently during indexing). func (ca *ClipArchiver) addCheckpoint(checkpoints *[]common.GzipCheckpoint, cOff, uOff int64, lastCheckpoint *int64) { - cp := common.GzipCheckpoint{ - COff: cOff, - UOff: uOff, - } - *checkpoints = append(*checkpoints, cp) + *checkpoints = append(*checkpoints, common.GzipCheckpoint{COff: cOff, UOff: uOff}) *lastCheckpoint = uOff - log.Debug().Msgf("Added checkpoint: COff=%d, UOff=%d", cp.COff, cp.UOff) } -// processRegularFile processes a regular file entry from tar +// processRegularFile processes a regular file entry from tar. +// Uses io.CopyN for efficient content skipping (streaming, no allocation). func (ca *ClipArchiver) processRegularFile( index *btree.BTree, tr *tar.Reader, @@ -516,23 +511,23 @@ func (ca *ClipArchiver) processRegularFile( ) error { dataStart := uncompressedCounter.n - // Content-defined checkpoint: Add checkpoint before large files (>512KB) - // This enables instant seeking to file start without decompression - // Only add if we haven't added a checkpoint in the last 512KB to avoid checkpoint spam + // Content-defined checkpoint for large files (>512KB) + // Enables fast seeking to file start without full layer decompression + const largeFileThreshold = 512 * 1024 const minCheckpointGap = 512 * 1024 - if hdr.Size > 512*1024 && uncompressedCounter.n > *lastCheckpoint && (uncompressedCounter.n-*lastCheckpoint) >= minCheckpointGap { + + if hdr.Size > largeFileThreshold && (uncompressedCounter.n-*lastCheckpoint) >= minCheckpointGap { ca.addCheckpoint(checkpoints, compressedCounter.n, uncompressedCounter.n, lastCheckpoint) - log.Debug().Msgf("Added file-boundary checkpoint for large file: %s", cleanPath) } - // Skip file content efficiently using CopyN + // Skip file content (streaming, zero-copy via io.Discard) if hdr.Size > 0 { n, err := io.CopyN(io.Discard, tr, hdr.Size) if err != nil && err != io.EOF { return fmt.Errorf("failed to skip file content: %w", err) } if n != hdr.Size { - return fmt.Errorf("failed to skip complete file (wanted %d, got %d)", hdr.Size, n) + return fmt.Errorf("incomplete file read: want %d, got %d", hdr.Size, n) } } diff --git a/pkg/storage/oci.go b/pkg/storage/oci.go index f4fe8a2..c113b97 100644 --- a/pkg/storage/oci.go +++ b/pkg/storage/oci.go @@ -430,19 +430,10 @@ func (s *OCIClipStorage) decompressAndCacheLayer(digest string, diskPath string) return fmt.Errorf("failed to decompress layer to disk: %w", err) } - // Compute the actual hash of what we just wrote + // Verify hash matches metadata (critical for cache key correctness) actualHash := hex.EncodeToString(hasher.Sum(nil)) expectedHash := s.getDecompressedHash(digest) - log.Warn(). - Str("layer_digest", digest). - Str("expected_hash_from_metadata", expectedHash). - Str("actual_hash_of_decompressed_data", actualHash). - Str("disk_path", diskPath). - Bool("hashes_match", actualHash == expectedHash). - Int64("bytes", written). - Msg("HASH CHECK: Filename uses expected_hash, file contents hash to actual_hash") - if expectedHash != "" && actualHash != expectedHash { log.Error(). Str("layer_digest", digest). @@ -561,55 +552,6 @@ func streamFileInChunks(filePath string, chunks chan []byte) error { return nil } -// streamFileInChunksWithHash reads a file, sends it in chunks, and computes hash of streamed data -// This is used to verify that the data being sent matches what we expect -func (s *OCIClipStorage) streamFileInChunksWithHash(filePath string, chunks chan []byte) (string, error) { - const chunkSize = int64(1 << 25) // 32MB chunks - - file, err := os.Open(filePath) - if err != nil { - return "", fmt.Errorf("failed to open file: %w", err) - } - defer file.Close() - - // Get file size - fileInfo, err := file.Stat() - if err != nil { - return "", fmt.Errorf("failed to stat file: %w", err) - } - fileSize := fileInfo.Size() - - // Hash the data as we stream it - hasher := sha256.New() - - // Stream in chunks - for offset := int64(0); offset < fileSize; { - // Calculate chunk size for this iteration - currentChunkSize := chunkSize - if remaining := fileSize - offset; remaining < chunkSize { - currentChunkSize = remaining - } - - // Read chunk - buffer := make([]byte, currentChunkSize) - nRead, err := io.ReadFull(file, buffer) - if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { - return "", fmt.Errorf("failed to read chunk at offset %d: %w", offset, err) - } - - // Send chunk and hash it - if nRead > 0 { - chunk := buffer[:nRead] - chunks <- chunk - hasher.Write(chunk) - } - - offset += int64(nRead) - } - - return hex.EncodeToString(hasher.Sum(nil)), nil -} - // tryRangeReadFromContentCache attempts a ranged read from remote ContentCache // This enables lazy loading: we fetch only the bytes we need, not the entire layer // decompressedHash is the hash of the decompressed layer data @@ -624,117 +566,55 @@ func (s *OCIClipStorage) tryRangeReadFromContentCache(decompressedHash string, o return data, nil } -// storeDecompressedInRemoteCache stores decompressed layer in remote cache (async safe) -// Stores the ENTIRE layer so other nodes can do range reads from it -// Streams content in chunks to avoid loading the entire layer into memory -// decompressedHash is the hash of the decompressed layer data (used as cache key) +// storeDecompressedInRemoteCache uploads decompressed layer to remote cache for cluster sharing. +// +// Performance: Streams file in 32MB chunks, constant memory O(32MB). +// Key correctness: Verifies file hash matches expected before upload. func (s *OCIClipStorage) storeDecompressedInRemoteCache(decompressedHash string, diskPath string) { - log.Debug(). - Str("decompressed_hash", decompressedHash). - Str("disk_path", diskPath). - Msg("storeDecompressedInRemoteCache goroutine started") - - // Get file size for logging fileInfo, err := os.Stat(diskPath) if err != nil { - log.Error(). - Err(err). - Str("decompressed_hash", decompressedHash). - Str("disk_path", diskPath). - Msg("failed to stat disk cache for content cache storage") + log.Error().Err(err).Str("hash", decompressedHash).Msg("disk file not found for upload") return } totalSize := fileInfo.Size() - // Verify the disk file hash matches our expected hash before uploading - // Only verify if decompressedHash looks like a real SHA256 hash (64 hex chars) - isRealHash := len(decompressedHash) == 64 - for _, c := range decompressedHash { - if !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F')) { - isRealHash = false - break - } - } - - var diskFileHash string - if isRealHash { - var err error - diskFileHash, err = s.computeFileHash(diskPath) + // Verify file hash matches expected (SHA256 only, 64 hex chars) + if len(decompressedHash) == 64 { + diskFileHash, err := s.computeFileHash(diskPath) if err != nil { - log.Error(). - Err(err). - Str("decompressed_hash", decompressedHash). - Str("disk_path", diskPath). - Msg("failed to compute disk file hash") + log.Error().Err(err).Str("hash", decompressedHash).Msg("failed to verify disk file hash") return } if diskFileHash != decompressedHash { log.Error(). - Str("expected_hash", decompressedHash). - Str("disk_file_hash", diskFileHash). - Str("disk_path", diskPath). - Int64("bytes", totalSize). - Msg("CRITICAL: disk cache file hash mismatch - will not upload to content cache") + Str("expected", decompressedHash). + Str("actual", diskFileHash). + Str("path", diskPath). + Msg("CRITICAL: disk file hash mismatch - skipping upload") return } - - log.Debug(). - Str("decompressed_hash", decompressedHash). - Str("verified_hash", diskFileHash). - Msg("disk file hash verified, proceeding with content cache upload") - } else { - log.Debug(). - Str("decompressed_hash", decompressedHash). - Msg("skipping disk file hash verification (test mode)") - diskFileHash = "skipped" } - // Stream the file in chunks and compute hash during streaming for verification + // Stream file to content cache (32MB chunks, async) chunks := make(chan []byte, 1) - hashChan := make(chan string, 1) - go func() { defer close(chunks) - - streamedHash, err := s.streamFileInChunksWithHash(diskPath, chunks) - if err != nil { - log.Error(). - Err(err). - Str("decompressed_hash", decompressedHash). - Msg("failed to stream file for content cache storage") - hashChan <- "" - } else { - hashChan <- streamedHash + if err := streamFileInChunks(diskPath, chunks); err != nil { + log.Error().Err(err).Str("hash", decompressedHash).Msg("stream failed") } }() storedHash, err := s.contentCache.StoreContent(chunks, decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) - streamedHash := <-hashChan - if err != nil { + log.Error().Err(err).Int64("bytes", totalSize).Msg("content cache store failed") + } else if storedHash != decompressedHash { log.Error(). - Err(err). - Str("decompressed_hash", decompressedHash). - Int64("bytes", totalSize). - Msg("failed to store layer in content cache") + Str("expected", decompressedHash). + Str("stored", storedHash). + Msg("CRITICAL BUG: ContentCache computed wrong hash") } else { - // Verify the stored hash matches our expected decompressed hash - if storedHash != decompressedHash { - log.Error(). - Str("expected_hash", decompressedHash). - Str("stored_hash", storedHash). - Str("streamed_hash", streamedHash). - Str("disk_file_hash", diskFileHash). - Int64("bytes", totalSize). - Msg("CRITICAL: content cache stored under different hash - cache lookups will fail! This indicates a bug in the ContentCache implementation.") - } else { - log.Info(). - Str("decompressed_hash", decompressedHash). - Str("stored_hash", storedHash). - Int64("bytes", totalSize). - Msg("successfully stored decompressed layer in content cache") - } + log.Info().Str("hash", decompressedHash).Int64("bytes", totalSize).Msg("uploaded to content cache") } } From c23a16aa7cffd4b8a1f848c27fc083a267748e32 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 3 Nov 2025 02:19:02 +0000 Subject: [PATCH 9/9] Refactor: Remove redundant hash verification and logging Co-authored-by: luke --- pkg/clip/oci_indexer.go | 15 +----- pkg/storage/oci.go | 113 +++++----------------------------------- 2 files changed, 14 insertions(+), 114 deletions(-) diff --git a/pkg/clip/oci_indexer.go b/pkg/clip/oci_indexer.go index 0c565e7..5b04c76 100644 --- a/pkg/clip/oci_indexer.go +++ b/pkg/clip/oci_indexer.go @@ -312,7 +312,7 @@ func (ca *ClipArchiver) indexLayerOptimized( // Consume trailing TAR padding/EOF blocks that tar.Reader doesn't expose. // These bytes ARE present in decompressed stream and MUST be hashed to match disk cache. - trailingBytes, err := io.Copy(io.Discard, uncompressedCounter) + _, err = io.Copy(io.Discard, uncompressedCounter) if err != nil && err != io.EOF { return nil, "", fmt.Errorf("failed to consume trailing tar bytes: %w", err) } @@ -320,19 +320,6 @@ func (ca *ClipArchiver) indexLayerOptimized( // Finalize hash (includes all bytes: file contents + tar headers + padding) decompressedHash := hex.EncodeToString(hasher.Sum(nil)) - if trailingBytes > 0 { - log.Debug(). - Int64("trailing_bytes", trailingBytes). - Str("layer", layerDigest). - Msg("consumed tar trailing padding") - } - - log.Info(). - Int("checkpoints", len(checkpoints)). - Int64("bytes", uncompressedCounter.n). - Str("hash", decompressedHash). - Msgf("Layer %s indexed", layerDigest) - // Return gzip index and decompressed hash return &common.GzipIndex{ LayerDigest: layerDigest, diff --git a/pkg/storage/oci.go b/pkg/storage/oci.go index c113b97..2cae501 100644 --- a/pkg/storage/oci.go +++ b/pkg/storage/oci.go @@ -3,8 +3,6 @@ package storage import ( "compress/gzip" "context" - "crypto/sha256" - "encoding/hex" "fmt" "io" "net/http" @@ -410,8 +408,7 @@ func (s *OCIClipStorage) decompressAndCacheLayer(digest string, diskPath string) } defer os.Remove(tempPath) // Clean up on error - // Decompress directly to disk (streaming, low memory!) - // Also compute hash while decompressing to verify it matches expected + // Decompress directly to disk (streaming) gzr, err := gzip.NewReader(compressedRC) if err != nil { tempFile.Close() @@ -419,64 +416,31 @@ func (s *OCIClipStorage) decompressAndCacheLayer(digest string, diskPath string) } defer gzr.Close() - // Hash the decompressed data as we write it - hasher := sha256.New() - multiWriter := io.MultiWriter(tempFile, hasher) - - written, err := io.Copy(multiWriter, gzr) + written, err := io.Copy(tempFile, gzr) tempFile.Close() if err != nil { return fmt.Errorf("failed to decompress layer to disk: %w", err) } - // Verify hash matches metadata (critical for cache key correctness) - actualHash := hex.EncodeToString(hasher.Sum(nil)) - expectedHash := s.getDecompressedHash(digest) - - if expectedHash != "" && actualHash != expectedHash { - log.Error(). - Str("layer_digest", digest). - Str("expected_hash_from_metadata", expectedHash). - Str("actual_hash_of_decompressed_data", actualHash). - Str("file_will_be_named", diskPath). - Int64("bytes", written). - Msg("? CRITICAL BUG: File named with WRONG hash! Metadata has wrong decompressed_hash! Re-index the image!") - } else if expectedHash != "" { - log.Debug(). - Str("layer_digest", digest). - Str("hash", actualHash). - Int64("bytes", written). - Msg("? decompressed data hash verified - filename matches content") - } - // Atomic rename if err := os.Rename(tempPath, diskPath); err != nil { return fmt.Errorf("failed to rename temp file: %w", err) } - inflateDuration := time.Since(inflateStart) - metrics.RecordInflateCPU(inflateDuration) + duration := time.Since(inflateStart) + metrics.RecordInflateCPU(duration) log.Info(). - Str("layer_digest", digest). - Int64("decompressed_bytes", written). - Str("disk_path", diskPath). - Dur("duration", inflateDuration). - Msg("Layer decompressed and cached to disk") + Str("layer", digest). + Int64("bytes", written). + Dur("duration", duration). + Msg("layer decompressed and cached") - // Store in remote cache (if configured) for other workers + // Upload to content cache for cluster sharing if s.contentCache != nil { decompressedHash := s.getDecompressedHash(digest) - log.Info(). - Str("layer_digest", digest). - Str("decompressed_hash", decompressedHash). - Msg("storing decompressed layer in content cache") go s.storeDecompressedInRemoteCache(decompressedHash, diskPath) - } else { - log.Warn(). - Str("layer_digest", digest). - Msg("content cache not configured - layer will NOT be shared across cluster") } return nil @@ -491,22 +455,6 @@ func (s *OCIClipStorage) writeToDiskCache(path string, data []byte) error { return os.Rename(tempPath, path) } -// computeFileHash computes SHA256 hash of a file -func (s *OCIClipStorage) computeFileHash(filePath string) (string, error) { - file, err := os.Open(filePath) - if err != nil { - return "", fmt.Errorf("failed to open file: %w", err) - } - defer file.Close() - - hasher := sha256.New() - if _, err := io.Copy(hasher, file); err != nil { - return "", fmt.Errorf("failed to hash file: %w", err) - } - - return hex.EncodeToString(hasher.Sum(nil)), nil -} - // streamFileInChunks reads a file and sends it in chunks over a channel // This matches the behavior in clipfs.go for consistent streaming // Default chunk size is 32MB to balance memory usage and throughput @@ -567,54 +515,19 @@ func (s *OCIClipStorage) tryRangeReadFromContentCache(decompressedHash string, o } // storeDecompressedInRemoteCache uploads decompressed layer to remote cache for cluster sharing. -// -// Performance: Streams file in 32MB chunks, constant memory O(32MB). -// Key correctness: Verifies file hash matches expected before upload. +// Streams file in 32MB chunks with constant memory usage O(32MB). func (s *OCIClipStorage) storeDecompressedInRemoteCache(decompressedHash string, diskPath string) { - fileInfo, err := os.Stat(diskPath) - if err != nil { - log.Error().Err(err).Str("hash", decompressedHash).Msg("disk file not found for upload") - return - } - totalSize := fileInfo.Size() - - // Verify file hash matches expected (SHA256 only, 64 hex chars) - if len(decompressedHash) == 64 { - diskFileHash, err := s.computeFileHash(diskPath) - if err != nil { - log.Error().Err(err).Str("hash", decompressedHash).Msg("failed to verify disk file hash") - return - } - - if diskFileHash != decompressedHash { - log.Error(). - Str("expected", decompressedHash). - Str("actual", diskFileHash). - Str("path", diskPath). - Msg("CRITICAL: disk file hash mismatch - skipping upload") - return - } - } - - // Stream file to content cache (32MB chunks, async) chunks := make(chan []byte, 1) go func() { defer close(chunks) if err := streamFileInChunks(diskPath, chunks); err != nil { - log.Error().Err(err).Str("hash", decompressedHash).Msg("stream failed") + log.Error().Err(err).Str("hash", decompressedHash).Msg("failed to stream file") } }() - storedHash, err := s.contentCache.StoreContent(chunks, decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) + _, err := s.contentCache.StoreContent(chunks, decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) if err != nil { - log.Error().Err(err).Int64("bytes", totalSize).Msg("content cache store failed") - } else if storedHash != decompressedHash { - log.Error(). - Str("expected", decompressedHash). - Str("stored", storedHash). - Msg("CRITICAL BUG: ContentCache computed wrong hash") - } else { - log.Info().Str("hash", decompressedHash).Int64("bytes", totalSize).Msg("uploaded to content cache") + log.Error().Err(err).Str("hash", decompressedHash).Msg("content cache store failed") } }