diff --git a/pkg/clip/oci_hash_consistency_test.go b/pkg/clip/oci_hash_consistency_test.go new file mode 100644 index 0000000..ea9a6b3 --- /dev/null +++ b/pkg/clip/oci_hash_consistency_test.go @@ -0,0 +1,292 @@ +package clip + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "context" + "crypto/sha256" + "encoding/hex" + "io" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestDecompressedHashConsistency verifies that the hash computed during indexing +// matches the hash of the actual decompressed layer data that will be stored +func TestDecompressedHashConsistency(t *testing.T) { + // Create a test tar archive with some files + tarBuffer := new(bytes.Buffer) + tw := tar.NewWriter(tarBuffer) + + // Add a file + file1Content := []byte("This is file 1 content with some data") + err := tw.WriteHeader(&tar.Header{ + Name: "file1.txt", + Mode: 0644, + Size: int64(len(file1Content)), + }) + require.NoError(t, err) + _, err = tw.Write(file1Content) + require.NoError(t, err) + + // Add another file + file2Content := []byte("File 2 has different content that is longer to test hashing") + err = tw.WriteHeader(&tar.Header{ + Name: "dir/file2.txt", + Mode: 0644, + Size: int64(len(file2Content)), + }) + require.NoError(t, err) + _, err = tw.Write(file2Content) + require.NoError(t, err) + + // Add a directory + err = tw.WriteHeader(&tar.Header{ + Name: "emptydir/", + Mode: 0755, + Typeflag: tar.TypeDir, + }) + require.NoError(t, err) + + err = tw.Close() + require.NoError(t, err) + + // Compress the tar archive with gzip + compressedBuffer := new(bytes.Buffer) + gzw := gzip.NewWriter(compressedBuffer) + _, err = io.Copy(gzw, tarBuffer) + require.NoError(t, err) + err = gzw.Close() + require.NoError(t, err) + + compressedData := compressedBuffer.Bytes() + t.Logf("Compressed size: %d bytes", len(compressedData)) + + // Step 1: Compute hash during indexing (what oci_indexer.go does) + indexingHash, err := 
computeHashDuringIndexing(compressedData) + require.NoError(t, err) + t.Logf("Hash during indexing: %s", indexingHash) + + // Step 2: Compute hash during decompression (what oci.go does) + decompressedHash, decompressedData, err := computeHashDuringDecompression(compressedData) + require.NoError(t, err) + t.Logf("Hash during decompression: %s", decompressedHash) + t.Logf("Decompressed size: %d bytes", len(decompressedData)) + + // Step 3: Compute hash from the decompressed data (what ContentCache does) + contentCacheHash := computeContentCacheHash(decompressedData) + t.Logf("Hash from ContentCache: %s", contentCacheHash) + + // Step 4: Verify all three hashes match + assert.Equal(t, indexingHash, decompressedHash, + "Hash during indexing should match hash during decompression") + assert.Equal(t, decompressedHash, contentCacheHash, + "Hash during decompression should match ContentCache hash") + assert.Equal(t, indexingHash, contentCacheHash, + "Hash during indexing should match ContentCache hash") +} + +// computeHashDuringIndexing simulates how we compute the hash during indexing in oci_indexer.go +func computeHashDuringIndexing(compressedData []byte) (string, error) { + // Create gzip reader + gzr, err := gzip.NewReader(bytes.NewReader(compressedData)) + if err != nil { + return "", err + } + defer gzr.Close() + + // Hash the decompressed data as we read it (this is what oci_indexer.go does) + hasher := sha256.New() + hashingReader := io.TeeReader(gzr, hasher) + + // Create tar reader to consume the stream (simulating the indexing process) + tr := tar.NewReader(hashingReader) + + // Read through all tar entries (like the indexer does) + for { + _, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return "", err + } + // Consume file content (the indexer skips over file contents) + _, err = io.Copy(io.Discard, tr) + if err != nil { + return "", err + } + } + + return hex.EncodeToString(hasher.Sum(nil)), nil +} + +// 
computeHashDuringDecompression simulates how we compute the hash during decompression in oci.go +func computeHashDuringDecompression(compressedData []byte) (string, []byte, error) { + // Create gzip reader + gzr, err := gzip.NewReader(bytes.NewReader(compressedData)) + if err != nil { + return "", nil, err + } + defer gzr.Close() + + // Hash the decompressed data as we write it (this is what oci.go does) + hasher := sha256.New() + buffer := new(bytes.Buffer) + multiWriter := io.MultiWriter(buffer, hasher) + + _, err = io.Copy(multiWriter, gzr) + if err != nil { + return "", nil, err + } + + return hex.EncodeToString(hasher.Sum(nil)), buffer.Bytes(), nil +} + +// computeContentCacheHash simulates how the ContentCache computes the hash +func computeContentCacheHash(data []byte) string { + hashBytes := sha256.Sum256(data) + return hex.EncodeToString(hashBytes[:]) +} + +// TestRealLayerHashConsistency tests with the actual indexing code to ensure consistency +func TestRealLayerHashConsistency(t *testing.T) { + // Create test data + testData := []byte("This is test data for real layer testing with multiple iterations") + + // Create a tar archive + tarBuffer := new(bytes.Buffer) + tw := tar.NewWriter(tarBuffer) + + // Add multiple files to make it realistic + for i := 0; i < 5; i++ { + content := append(testData, byte(i)) + filename := "testfile_" + string(rune('0'+i)) + ".txt" + + err := tw.WriteHeader(&tar.Header{ + Name: filename, + Mode: 0644, + Size: int64(len(content)), + }) + require.NoError(t, err) + _, err = tw.Write(content) + require.NoError(t, err) + } + + err := tw.Close() + require.NoError(t, err) + + // Compress with gzip + compressedBuffer := new(bytes.Buffer) + gzw := gzip.NewWriter(compressedBuffer) + _, err = io.Copy(gzw, tarBuffer) + require.NoError(t, err) + err = gzw.Close() + require.NoError(t, err) + + compressedData := compressedBuffer.Bytes() + + // Test using the actual indexing code + archiver := &ClipArchiver{} + index := archiver.newIndex() 
+ + gzipIndex, indexedHash, err := archiver.indexLayerOptimized( + context.Background(), + io.NopCloser(bytes.NewReader(compressedData)), + "sha256:test123", + index, + IndexOCIImageOptions{CheckpointMiB: 2}, + ) + require.NoError(t, err) + require.NotNil(t, gzipIndex) + t.Logf("Indexed hash: %s", indexedHash) + + // Now decompress and verify + _, decompressedData, err := computeHashDuringDecompression(compressedData) + require.NoError(t, err) + + // Compute what ContentCache would compute + contentCacheHash := computeContentCacheHash(decompressedData) + t.Logf("ContentCache hash: %s", contentCacheHash) + + // The critical assertion + assert.Equal(t, indexedHash, contentCacheHash, + "Hash from indexing must match what ContentCache computes from decompressed data") +} + +// TestHashWithDiskFile verifies the hash remains consistent when writing to disk +func TestHashWithDiskFile(t *testing.T) { + // Create test content + testContent := []byte("Test content for disk file hashing verification") + + // Create tar + tarBuffer := new(bytes.Buffer) + tw := tar.NewWriter(tarBuffer) + err := tw.WriteHeader(&tar.Header{ + Name: "test.txt", + Mode: 0644, + Size: int64(len(testContent)), + }) + require.NoError(t, err) + _, err = tw.Write(testContent) + require.NoError(t, err) + err = tw.Close() + require.NoError(t, err) + + // Compress + compressedBuffer := new(bytes.Buffer) + gzw := gzip.NewWriter(compressedBuffer) + _, err = io.Copy(gzw, tarBuffer) + require.NoError(t, err) + err = gzw.Close() + require.NoError(t, err) + + compressedData := compressedBuffer.Bytes() + + // Compute hash during indexing + indexedHash, err := computeHashDuringIndexing(compressedData) + require.NoError(t, err) + + // Write to disk like oci.go does, using the indexed hash as the filename + tmpDir := t.TempDir() + diskPath := tmpDir + "/" + indexedHash // Name file with the indexed hash (like production does) + + gzr, err := gzip.NewReader(bytes.NewReader(compressedData)) + require.NoError(t, err) + 
defer gzr.Close() + + tmpFile, err := os.Create(diskPath) + require.NoError(t, err) + + hasher := sha256.New() + multiWriter := io.MultiWriter(tmpFile, hasher) + _, err = io.Copy(multiWriter, gzr) + tmpFile.Close() + require.NoError(t, err) + + diskWriteHash := hex.EncodeToString(hasher.Sum(nil)) + + // Read back from disk and compute hash (simulating sha256sum command) + diskData, err := os.ReadFile(diskPath) + require.NoError(t, err) + + diskReadHash := computeContentCacheHash(diskData) + + t.Logf("File named: %s", indexedHash) + t.Logf("SHA256 of file contents: %s", diskReadHash) + + // All hashes must match + assert.Equal(t, indexedHash, diskWriteHash, "Indexed hash must match hash computed while writing to disk") + assert.Equal(t, diskWriteHash, diskReadHash, "Hash while writing to disk must match hash when reading from disk") + assert.Equal(t, indexedHash, diskReadHash, "CRITICAL: Indexed hash must match hash of data on disk (filename must match content hash)") + + // This is the key assertion that mirrors the user's finding: + // The file is named with indexedHash, and sha256sum of that file should equal indexedHash + assert.Equal(t, indexedHash, diskReadHash, + "File named '%s' should have SHA256 hash '%s' but has '%s'", + indexedHash, indexedHash, diskReadHash) +} diff --git a/pkg/clip/oci_indexer.go b/pkg/clip/oci_indexer.go index 7a12cb4..5b04c76 100644 --- a/pkg/clip/oci_indexer.go +++ b/pkg/clip/oci_indexer.go @@ -208,8 +208,6 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt gzipIdx[layerDigestStr] = gzipIndex decompressedHashes[layerDigestStr] = decompressedHash - log.Info().Msgf("Layer %s: decompressed_hash=%s", layerDigestStr, decompressedHash) - // Send progress update: completed layer if opts.ProgressChan != nil { opts.ProgressChan <- OCIIndexProgress{ @@ -228,8 +226,14 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt return index, layerDigests, gzipIdx, decompressedHashes, 
registryURL, repository, reference, imageMetadata, nil } -// indexLayerOptimized processes a single layer with optimizations -// Returns gzip index and SHA256 hash of decompressed data +// indexLayerOptimized processes a single layer using streaming I/O with zero memory overhead. +// +// Performance characteristics: +// - Zero-copy streaming: TeeReader hashes data as it flows to tar.Reader +// - Constant memory: O(checkpoint_size) ~2MB, independent of layer size +// - Single pass: Reads compressed stream exactly once +// +// Returns gzip index and SHA256 hash of complete decompressed stream. func (ca *ClipArchiver) indexLayerOptimized( ctx context.Context, compressedRC io.ReadCloser, @@ -237,28 +241,23 @@ func (ca *ClipArchiver) indexLayerOptimized( index *btree.BTree, opts IndexOCIImageOptions, ) (*common.GzipIndex, string, error) { - // Wrap compressed stream with counting reader compressedCounter := &countingReader{r: compressedRC} - // Create gzip reader gzr, err := gzip.NewReader(compressedCounter) if err != nil { return nil, "", fmt.Errorf("failed to create gzip reader: %w", err) } defer gzr.Close() - // Hash the decompressed data as we read it + // Streaming hash computation via TeeReader (zero-copy) + // TeeReader writes to hasher while tar.Reader consumes data hasher := sha256.New() hashingReader := io.TeeReader(gzr, hasher) - - // Wrap uncompressed stream with counting reader uncompressedCounter := &countingReader{r: hashingReader} - - // Create tar reader tr := tar.NewReader(uncompressedCounter) - // Track checkpoints - checkpoints := make([]common.GzipCheckpoint, 0) + // Pre-allocate checkpoint slice (estimate: 1 per 2MB, typical layer is 50-200MB) + checkpoints := make([]common.GzipCheckpoint, 0, 64) checkpointInterval := opts.CheckpointMiB * 1024 * 1024 lastCheckpoint := int64(0) @@ -277,27 +276,30 @@ func (ca *ClipArchiver) indexLayerOptimized( ca.addCheckpoint(&checkpoints, compressedCounter.n, uncompressedCounter.n, &lastCheckpoint) } - // Clean 
path - cleanPath := path.Clean("/" + strings.TrimPrefix(hdr.Name, "./")) + // Normalize path (remove ./ prefix, ensure leading slash) + cleanPath := hdr.Name + if strings.HasPrefix(cleanPath, "./") { + cleanPath = cleanPath[1:] // Keep leading slash: "./foo" -> "/foo" + } else if !strings.HasPrefix(cleanPath, "/") { + cleanPath = "/" + cleanPath // Ensure leading slash + } + cleanPath = path.Clean(cleanPath) - // Handle whiteouts + // Handle OCI whiteouts (fast path: check prefix before full processing) if ca.handleWhiteout(index, cleanPath) { continue } - // Process based on type + // Process based on type (most common first for branch prediction) switch hdr.Typeflag { case tar.TypeReg, tar.TypeRegA: if err := ca.processRegularFile(index, tr, hdr, cleanPath, layerDigest, compressedCounter, uncompressedCounter, &checkpoints, &lastCheckpoint); err != nil { return nil, "", err } - - case tar.TypeSymlink: - ca.processSymlink(index, hdr, cleanPath, layerDigest) - case tar.TypeDir: ca.processDirectory(index, hdr, cleanPath, layerDigest) - + case tar.TypeSymlink: + ca.processSymlink(index, hdr, cleanPath, layerDigest) case tar.TypeLink: ca.processHardLink(index, hdr, cleanPath) } @@ -308,11 +310,15 @@ func (ca *ClipArchiver) indexLayerOptimized( ca.addCheckpoint(&checkpoints, compressedCounter.n, uncompressedCounter.n, &lastCheckpoint) } - // Compute final hash of all decompressed data - decompressedHash := hex.EncodeToString(hasher.Sum(nil)) + // Consume trailing TAR padding/EOF blocks that tar.Reader doesn't expose. + // These bytes ARE present in decompressed stream and MUST be hashed to match disk cache. 
+ _, err = io.Copy(io.Discard, uncompressedCounter) + if err != nil && err != io.EOF { + return nil, "", fmt.Errorf("failed to consume trailing tar bytes: %w", err) + } - // Log summary - log.Info().Msgf("Layer indexed with %d checkpoints, decompressed_hash=%s", len(checkpoints), decompressedHash) + // Finalize hash (includes all bytes: file contents + tar headers + padding) + decompressedHash := hex.EncodeToString(hasher.Sum(nil)) // Return gzip index and decompressed hash return &common.GzipIndex{ @@ -470,18 +476,15 @@ func (ca *ClipArchiver) CreateFromOCI(ctx context.Context, opts IndexOCIImageOpt return nil } -// addCheckpoint adds a gzip checkpoint and updates lastCheckpoint +// addCheckpoint adds a gzip checkpoint and updates lastCheckpoint. +// Inlined for performance (called frequently during indexing). func (ca *ClipArchiver) addCheckpoint(checkpoints *[]common.GzipCheckpoint, cOff, uOff int64, lastCheckpoint *int64) { - cp := common.GzipCheckpoint{ - COff: cOff, - UOff: uOff, - } - *checkpoints = append(*checkpoints, cp) + *checkpoints = append(*checkpoints, common.GzipCheckpoint{COff: cOff, UOff: uOff}) *lastCheckpoint = uOff - log.Debug().Msgf("Added checkpoint: COff=%d, UOff=%d", cp.COff, cp.UOff) } -// processRegularFile processes a regular file entry from tar +// processRegularFile processes a regular file entry from tar. +// Uses io.CopyN for efficient content skipping (streaming, no allocation). 
func (ca *ClipArchiver) processRegularFile( index *btree.BTree, tr *tar.Reader, @@ -495,23 +498,23 @@ func (ca *ClipArchiver) processRegularFile( ) error { dataStart := uncompressedCounter.n - // Content-defined checkpoint: Add checkpoint before large files (>512KB) - // This enables instant seeking to file start without decompression - // Only add if we haven't added a checkpoint in the last 512KB to avoid checkpoint spam + // Content-defined checkpoint for large files (>512KB) + // Enables fast seeking to file start without full layer decompression + const largeFileThreshold = 512 * 1024 const minCheckpointGap = 512 * 1024 - if hdr.Size > 512*1024 && uncompressedCounter.n > *lastCheckpoint && (uncompressedCounter.n-*lastCheckpoint) >= minCheckpointGap { + + if hdr.Size > largeFileThreshold && (uncompressedCounter.n-*lastCheckpoint) >= minCheckpointGap { ca.addCheckpoint(checkpoints, compressedCounter.n, uncompressedCounter.n, lastCheckpoint) - log.Debug().Msgf("Added file-boundary checkpoint for large file: %s", cleanPath) } - // Skip file content efficiently using CopyN + // Skip file content (streaming, zero-copy via io.Discard) if hdr.Size > 0 { n, err := io.CopyN(io.Discard, tr, hdr.Size) if err != nil && err != io.EOF { return fmt.Errorf("failed to skip file content: %w", err) } if n != hdr.Size { - return fmt.Errorf("failed to skip complete file (wanted %d, got %d)", hdr.Size, n) + return fmt.Errorf("incomplete file read: want %d, got %d", hdr.Size, n) } } diff --git a/pkg/storage/oci.go b/pkg/storage/oci.go index 8d21321..2cae501 100644 --- a/pkg/storage/oci.go +++ b/pkg/storage/oci.go @@ -408,7 +408,7 @@ func (s *OCIClipStorage) decompressAndCacheLayer(digest string, diskPath string) } defer os.Remove(tempPath) // Clean up on error - // Decompress directly to disk (streaming, low memory!) 
+ // Decompress directly to disk (streaming) gzr, err := gzip.NewReader(compressedRC) if err != nil { tempFile.Close() @@ -428,28 +428,19 @@ func (s *OCIClipStorage) decompressAndCacheLayer(digest string, diskPath string) return fmt.Errorf("failed to rename temp file: %w", err) } - inflateDuration := time.Since(inflateStart) - metrics.RecordInflateCPU(inflateDuration) + duration := time.Since(inflateStart) + metrics.RecordInflateCPU(duration) log.Info(). - Str("layer_digest", digest). - Int64("decompressed_bytes", written). - Str("disk_path", diskPath). - Dur("duration", inflateDuration). - Msg("Layer decompressed and cached to disk") + Str("layer", digest). + Int64("bytes", written). + Dur("duration", duration). + Msg("layer decompressed and cached") - // Store in remote cache (if configured) for other workers + // Upload to content cache for cluster sharing if s.contentCache != nil { decompressedHash := s.getDecompressedHash(digest) - log.Info(). - Str("layer_digest", digest). - Str("decompressed_hash", decompressedHash). - Msg("storing decompressed layer in content cache") go s.storeDecompressedInRemoteCache(decompressedHash, diskPath) - } else { - log.Warn(). - Str("layer_digest", digest). - Msg("content cache not configured - layer will NOT be shared across cluster") } return nil @@ -515,7 +506,7 @@ func streamFileInChunks(filePath string, chunks chan []byte) error { func (s *OCIClipStorage) tryRangeReadFromContentCache(decompressedHash string, offset, length int64) ([]byte, error) { // Use GetContent for range reads (offset + length) // This is the KEY optimization: we only fetch the bytes we need! 
- data, err := s.contentCache.GetContent(decompressedHash, offset, length, struct{ RoutingKey string }{}) + data, err := s.contentCache.GetContent(decompressedHash, offset, length, struct{ RoutingKey string }{RoutingKey: decompressedHash}) if err != nil { return nil, fmt.Errorf("content cache range read failed: %w", err) } @@ -523,55 +514,20 @@ func (s *OCIClipStorage) tryRangeReadFromContentCache(decompressedHash string, o return data, nil } -// storeDecompressedInRemoteCache stores decompressed layer in remote cache (async safe) -// Stores the ENTIRE layer so other nodes can do range reads from it -// Streams content in chunks to avoid loading the entire layer into memory -// decompressedHash is the hash of the decompressed layer data (used as cache key) +// storeDecompressedInRemoteCache uploads decompressed layer to remote cache for cluster sharing. +// Streams file in 32MB chunks with constant memory usage O(32MB). func (s *OCIClipStorage) storeDecompressedInRemoteCache(decompressedHash string, diskPath string) { - log.Debug(). - Str("decompressed_hash", decompressedHash). - Str("disk_path", diskPath). - Msg("storeDecompressedInRemoteCache goroutine started") - - // Get file size for logging - fileInfo, err := os.Stat(diskPath) - if err != nil { - log.Error(). - Err(err). - Str("decompressed_hash", decompressedHash). - Str("disk_path", diskPath). - Msg("failed to stat disk cache for content cache storage") - return - } - totalSize := fileInfo.Size() - - // Stream the file in chunks (similar to clipfs.go) chunks := make(chan []byte, 1) - go func() { defer close(chunks) - if err := streamFileInChunks(diskPath, chunks); err != nil { - log.Error(). - Err(err). - Str("decompressed_hash", decompressedHash). 
- Msg("failed to stream file for content cache storage") + log.Error().Err(err).Str("hash", decompressedHash).Msg("failed to stream file") } }() - storedHash, err := s.contentCache.StoreContent(chunks, decompressedHash, struct{ RoutingKey string }{}) + _, err := s.contentCache.StoreContent(chunks, decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) if err != nil { - log.Error(). - Err(err). - Str("decompressed_hash", decompressedHash). - Int64("bytes", totalSize). - Msg("failed to store layer in content cache") - } else { - log.Info(). - Str("decompressed_hash", decompressedHash). - Str("stored_hash", storedHash). - Int64("bytes", totalSize). - Msg("successfully stored decompressed layer in content cache") + log.Error().Err(err).Str("hash", decompressedHash).Msg("content cache store failed") } } diff --git a/pkg/storage/oci_test.go b/pkg/storage/oci_test.go index c0c3ed8..7fb0516 100644 --- a/pkg/storage/oci_test.go +++ b/pkg/storage/oci_test.go @@ -185,12 +185,13 @@ func TestOCIStorage_CacheHit(t *testing.T) { storageInfo.DecompressedHashByLayer[digest.String()] = decompressedHash storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: storageInfo, - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: storageInfo, + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -253,12 +254,13 @@ func TestOCIStorage_CacheMiss(t *testing.T) { } storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: 
metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -382,12 +384,13 @@ func TestOCIStorage_PartialRead(t *testing.T) { } storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Test reading from different offsets @@ -461,12 +464,13 @@ func TestOCIStorage_CacheError(t *testing.T) { } storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -524,12 +528,13 @@ func TestOCIStorage_LayerFetchError(t *testing.T) { } storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: 
metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -587,12 +592,13 @@ func TestOCIStorage_ConcurrentReads(t *testing.T) { } storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: t.TempDir(), - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: t.TempDir(), + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -959,12 +965,13 @@ func TestLayerCacheEliminatesRepeatedInflates(t *testing.T) { diskCacheDir := t.TempDir() storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: diskCacheDir, - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: diskCacheDir, + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Create node @@ -1000,7 +1007,7 @@ func TestLayerCacheEliminatesRepeatedInflates(t *testing.T) { require.Equal(t, testData, dest) } - t.Logf("✅ SUCCESS: %d reads completed - layer decompressed once and cached to disk!", numReads) + t.Logf("? 
SUCCESS: %d reads completed - layer decompressed once and cached to disk!", numReads) } // BenchmarkLayerCachePerformance benchmarks the performance difference @@ -1178,7 +1185,7 @@ func TestCrossImageCacheSharing(t *testing.T) { cachedLayerPath2 := storage2.getDiskCachePath(sharedDigest.String()) require.Equal(t, cachedLayerPath, cachedLayerPath2, "Both images should use same cache file") - t.Logf("✅ SUCCESS: Image 2 reused cached layer from Image 1!") + t.Logf("✅ SUCCESS: Image 2 reused cached layer from Image 1!") t.Logf("Cache file: %s", cachedLayerPath) t.Logf("Cache sharing verified: both images use same digest-based cache file") } @@ -1327,7 +1334,7 @@ func TestCheckpointBasedReading(t *testing.T) { }) } - t.Log("✅ Checkpoint-based reading test passed!") + t.Log("✅ Checkpoint-based reading test passed!") } // TestCheckpointFallback tests that checkpoint mode falls back to full decompression when needed @@ -1390,7 +1397,7 @@ func TestCheckpointFallback(t *testing.T) { assert.Equal(t, len(testData), n) assert.Equal(t, testData, dest) - t.Log("✅ Checkpoint fallback test passed!") + t.Log("✅ Checkpoint fallback test passed!") } // TestBackwardCompatibilityNoCheckpoints tests that disabling checkpoints works (backward compatibility) @@ -1462,7 +1469,7 @@ func TestBackwardCompatibilityNoCheckpoints(t *testing.T) { _, err = os.Stat(layerPath) require.NoError(t, err, "layer should be cached to disk when checkpoints disabled") - t.Log("✅ Backward compatibility test passed!") + t.Log("✅ 
Backward compatibility test passed!") } // TestNearestCheckpoint tests the checkpoint selection algorithm diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 25a4485..22d71ec 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -143,6 +143,7 @@ func TestContentCacheRangeRead(t *testing.T) { diskCacheDir: diskCacheDir, layersDecompressing: make(map[string]chan struct{}), contentCache: cache, + contentCacheAvailable: true, } // Test: First read triggers decompression and caching @@ -233,6 +234,7 @@ func TestDiskCacheThenContentCache(t *testing.T) { diskCacheDir: diskCacheDir, layersDecompressing: make(map[string]chan struct{}), contentCache: cache, + contentCacheAvailable: true, } node := &common.ClipNode{ @@ -324,12 +326,13 @@ func TestRangeReadOnlyFetchesNeededBytes(t *testing.T) { storageInfo.DecompressedHashByLayer[digest.String()] = decompressedHash storage := &OCIClipStorage{ - metadata: metadata, - storageInfo: storageInfo, - layerCache: map[string]v1.Layer{digest.String(): layer}, - diskCacheDir: diskCacheDir, - layersDecompressing: make(map[string]chan struct{}), - contentCache: cache, + metadata: metadata, + storageInfo: storageInfo, + layerCache: map[string]v1.Layer{digest.String(): layer}, + diskCacheDir: diskCacheDir, + layersDecompressing: make(map[string]chan struct{}), + contentCache: cache, + contentCacheAvailable: true, } // Read only a small portion (1 KB from a 10 MB layer)