diff --git a/pkg/clip/clip.go b/pkg/clip/clip.go index 202c050..aa90db1 100644 --- a/pkg/clip/clip.go +++ b/pkg/clip/clip.go @@ -10,6 +10,7 @@ import ( "github.com/beam-cloud/clip/pkg/common" "github.com/beam-cloud/clip/pkg/storage" + v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/hanwen/go-fuse/v2/fs" "github.com/hanwen/go-fuse/v2/fuse" "github.com/rs/zerolog" @@ -262,12 +263,13 @@ func StoreS3(storeS3Opts StoreS3Options) error { // CreateFromOCIImageOptions configures OCI image indexing type CreateFromOCIImageOptions struct { - ImageRef string // Source image to index (can be local) - StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef) + ImageRef string // Source image to index (can be local) + StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef) OutputPath string CheckpointMiB int64 CredProvider interface{} ProgressChan chan<- OCIIndexProgress // optional channel for progress updates + Platform *v1.Platform // Target platform (defaults to linux/runtime.GOARCH) } // CreateFromOCIImage creates a metadata-only index (.clip) file from an OCI image @@ -297,6 +299,7 @@ func CreateFromOCIImage(ctx context.Context, options CreateFromOCIImageOptions) CheckpointMiB: options.CheckpointMiB, CredProvider: credProvider, ProgressChan: options.ProgressChan, + Platform: options.Platform, }, options.OutputPath) if err != nil { diff --git a/pkg/clip/oci_indexer.go b/pkg/clip/oci_indexer.go index c760242..e78b380 100644 --- a/pkg/clip/oci_indexer.go +++ b/pkg/clip/oci_indexer.go @@ -10,6 +10,7 @@ import ( "hash/fnv" "io" "path" + "runtime" "strings" "syscall" "time" @@ -41,6 +42,7 @@ type IndexOCIImageOptions struct { CheckpointMiB int64 // Checkpoint every N MiB (default 2) CredProvider common.RegistryCredentialProvider // optional credential provider for registry authentication ProgressChan chan<- OCIIndexProgress // optional channel for progress updates + Platform *v1.Platform // Target platform (defaults to linux/runtime.GOARCH) } // countingReader tracks bytes read from an io.Reader @@ -136,6 +138,20 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt remoteOpts = append(remoteOpts, remote.WithAuthFromKeychain(authn.DefaultKeychain)) } + // Add platform option (default to host architecture) + platform := opts.Platform + if platform == nil { + platform = &v1.Platform{ + OS: "linux", + Architecture: runtime.GOARCH, + } + } + remoteOpts = append(remoteOpts, remote.WithPlatform(*platform)) + log.Debug(). + Str("os", platform.OS). + Str("arch", platform.Architecture). + Msg("Using platform for image fetch") + // Fetch image img, err := remote.Image(ref, remoteOpts...) if err != nil { @@ -521,7 +537,7 @@ func (ca *ClipArchiver) processRegularFile( // Enables fast seeking to file start without full layer decompression const largeFileThreshold = 512 * 1024 const minCheckpointGap = 512 * 1024 - + if hdr.Size > largeFileThreshold && (uncompressedCounter.n-*lastCheckpoint) >= minCheckpointGap { ca.addCheckpoint(checkpoints, compressedCounter.n, uncompressedCounter.n, lastCheckpoint) } @@ -695,17 +711,17 @@ func (ca *ClipArchiver) extractImageMetadata(imgInterface interface{}, imageRef if labels == nil { labels = make(map[string]string) } - + env := configFile.Config.Env if env == nil { env = make([]string, 0) } - + exposedPorts := configFile.Config.ExposedPorts if exposedPorts == nil { exposedPorts = make(map[string]struct{}) } - + volumes := configFile.Config.Volumes if volumes == nil { volumes = make(map[string]struct{}) diff --git a/pkg/storage/oci.go b/pkg/storage/oci.go index 2cae501..c8c0766 100644 --- a/pkg/storage/oci.go +++ b/pkg/storage/oci.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "path/filepath" + "runtime" "sync" "time" @@ -148,6 +149,19 @@ func (s *OCIClipStorage) initLayers(ctx context.Context) error { remoteOpts = append(remoteOpts, remote.WithAuthFromKeychain(authn.DefaultKeychain)) } + // Add platform option to match the architecture used during indexing + // Without this, go-containerregistry defaults to amd64 which may have different layer digests + platform := v1.Platform{ + OS: "linux", + Architecture: runtime.GOARCH, + } + remoteOpts = append(remoteOpts, remote.WithPlatform(platform)) + + log.Debug(). + Str("image_ref", imageRef). + Str("platform", platform.Architecture). + Msg("fetching image layers from registry") + img, err := remote.Image(ref, remoteOpts...) if err != nil { return fmt.Errorf("failed to fetch image: %w", err) @@ -278,22 +292,32 @@ func (s *OCIClipStorage) ensureLayerCached(digest string) (string, string, error layerPath := s.getDecompressedCachePath(decompressedHash) - // Check if already cached on disk + // Fast path: check if already cached on disk (outside lock for performance) if _, err := os.Stat(layerPath); err == nil { log.Debug().Str("digest", digest).Str("decompressed_hash", decompressedHash).Msg("disk cache hit") return decompressedHash, layerPath, nil } - // Check if another goroutine is already decompressing this layer + // Acquire lock for decompression coordination s.layerDecompressMu.Lock() + + // Double-check disk cache under lock (another goroutine may have just finished) + if _, err := os.Stat(layerPath); err == nil { + s.layerDecompressMu.Unlock() + log.Debug().Str("digest", digest).Str("decompressed_hash", decompressedHash).Msg("disk cache hit (after lock)") + return decompressedHash, layerPath, nil + } + + // Check if another goroutine is already decompressing this layer if waitChan, inProgress := s.layersDecompressing[digest]; inProgress { // Another goroutine is decompressing - wait for it s.layerDecompressMu.Unlock() - log.Debug().Str("digest", digest).Msg("waiting for in-progress decompression") + log.Info().Str("digest", digest).Msg("waiting for in-progress layer decompression") <-waitChan // Now it should be on disk if _, err := os.Stat(layerPath); err == nil { + log.Debug().Str("digest", digest).Msg("layer ready after waiting") return decompressedHash, layerPath, nil } return "", "", fmt.Errorf("decompression failed for layer: %s", digest) @@ -319,6 +343,7 @@ func (s *OCIClipStorage) ensureLayerCached(digest string) (string, string, error s.layerDecompressMu.Unlock() if err != nil { + log.Error().Err(err).Str("digest", digest).Msg("layer decompression failed") return "", "", err }