Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/clip/clip.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/beam-cloud/clip/pkg/common"
"github.com/beam-cloud/clip/pkg/storage"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -262,12 +263,13 @@ func StoreS3(storeS3Opts StoreS3Options) error {

// CreateFromOCIImageOptions configures OCI image indexing
type CreateFromOCIImageOptions struct {
ImageRef string // Source image to index (can be local)
StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef)
ImageRef string // Source image to index (can be local)
StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef)
OutputPath string
CheckpointMiB int64
CredProvider interface{}
ProgressChan chan<- OCIIndexProgress // optional channel for progress updates
Platform *v1.Platform // Target platform (defaults to linux/runtime.GOARCH)
}

// CreateFromOCIImage creates a metadata-only index (.clip) file from an OCI image
Expand Down Expand Up @@ -297,6 +299,7 @@ func CreateFromOCIImage(ctx context.Context, options CreateFromOCIImageOptions)
CheckpointMiB: options.CheckpointMiB,
CredProvider: credProvider,
ProgressChan: options.ProgressChan,
Platform: options.Platform,
}, options.OutputPath)

if err != nil {
Expand Down
24 changes: 20 additions & 4 deletions pkg/clip/oci_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"hash/fnv"
"io"
"path"
"runtime"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -41,6 +42,7 @@ type IndexOCIImageOptions struct {
CheckpointMiB int64 // Checkpoint every N MiB (default 2)
CredProvider common.RegistryCredentialProvider // optional credential provider for registry authentication
ProgressChan chan<- OCIIndexProgress // optional channel for progress updates
Platform *v1.Platform // Target platform (defaults to linux/runtime.GOARCH)
}

// countingReader tracks bytes read from an io.Reader
Expand Down Expand Up @@ -136,6 +138,20 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt
remoteOpts = append(remoteOpts, remote.WithAuthFromKeychain(authn.DefaultKeychain))
}

// Add platform option (default to host architecture)
platform := opts.Platform
if platform == nil {
platform = &v1.Platform{
OS: "linux",
Architecture: runtime.GOARCH,
}
}
remoteOpts = append(remoteOpts, remote.WithPlatform(*platform))
log.Debug().
Str("os", platform.OS).
Str("arch", platform.Architecture).
Msg("Using platform for image fetch")

// Fetch image
img, err := remote.Image(ref, remoteOpts...)
if err != nil {
Expand Down Expand Up @@ -521,7 +537,7 @@ func (ca *ClipArchiver) processRegularFile(
// Enables fast seeking to file start without full layer decompression
const largeFileThreshold = 512 * 1024
const minCheckpointGap = 512 * 1024

if hdr.Size > largeFileThreshold && (uncompressedCounter.n-*lastCheckpoint) >= minCheckpointGap {
ca.addCheckpoint(checkpoints, compressedCounter.n, uncompressedCounter.n, lastCheckpoint)
}
Expand Down Expand Up @@ -695,17 +711,17 @@ func (ca *ClipArchiver) extractImageMetadata(imgInterface interface{}, imageRef
if labels == nil {
labels = make(map[string]string)
}

env := configFile.Config.Env
if env == nil {
env = make([]string, 0)
}

exposedPorts := configFile.Config.ExposedPorts
if exposedPorts == nil {
exposedPorts = make(map[string]struct{})
}

volumes := configFile.Config.Volumes
if volumes == nil {
volumes = make(map[string]struct{})
Expand Down
31 changes: 28 additions & 3 deletions pkg/storage/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"os"
"path/filepath"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -148,6 +149,19 @@ func (s *OCIClipStorage) initLayers(ctx context.Context) error {
remoteOpts = append(remoteOpts, remote.WithAuthFromKeychain(authn.DefaultKeychain))
}

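// Pin the platform to linux/<host architecture>, which is the indexer's default when no Platform is supplied.
// Without this, go-containerregistry defaults to amd64, which may have different layer digests.
// NOTE(review): the indexer also accepts an explicit Platform override; layers fetched here may not match an index built for a non-default platform — confirm this is handled elsewhere.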
platform := v1.Platform{
OS: "linux",
Architecture: runtime.GOARCH,
}
remoteOpts = append(remoteOpts, remote.WithPlatform(platform))

log.Debug().
Str("image_ref", imageRef).
Str("platform", platform.Architecture).
Msg("fetching image layers from registry")

img, err := remote.Image(ref, remoteOpts...)
if err != nil {
return fmt.Errorf("failed to fetch image: %w", err)
Expand Down Expand Up @@ -278,22 +292,32 @@ func (s *OCIClipStorage) ensureLayerCached(digest string) (string, string, error

layerPath := s.getDecompressedCachePath(decompressedHash)

// Check if already cached on disk
// Fast path: check if already cached on disk (outside lock for performance)
if _, err := os.Stat(layerPath); err == nil {
log.Debug().Str("digest", digest).Str("decompressed_hash", decompressedHash).Msg("disk cache hit")
return decompressedHash, layerPath, nil
}

// Check if another goroutine is already decompressing this layer
// Acquire lock for decompression coordination
s.layerDecompressMu.Lock()

// Double-check disk cache under lock (another goroutine may have just finished)
if _, err := os.Stat(layerPath); err == nil {
s.layerDecompressMu.Unlock()
log.Debug().Str("digest", digest).Str("decompressed_hash", decompressedHash).Msg("disk cache hit (after lock)")
return decompressedHash, layerPath, nil
}

// Check if another goroutine is already decompressing this layer
if waitChan, inProgress := s.layersDecompressing[digest]; inProgress {
// Another goroutine is decompressing - wait for it
s.layerDecompressMu.Unlock()
log.Debug().Str("digest", digest).Msg("waiting for in-progress decompression")
log.Info().Str("digest", digest).Msg("waiting for in-progress layer decompression")
<-waitChan

// Now it should be on disk
if _, err := os.Stat(layerPath); err == nil {
log.Debug().Str("digest", digest).Msg("layer ready after waiting")
return decompressedHash, layerPath, nil
}
return "", "", fmt.Errorf("decompression failed for layer: %s", digest)
Expand All @@ -319,6 +343,7 @@ func (s *OCIClipStorage) ensureLayerCached(digest string) (string, string, error
s.layerDecompressMu.Unlock()

if err != nil {
log.Error().Err(err).Str("digest", digest).Msg("layer decompression failed")
return "", "", err
}

Expand Down