Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions cmd/cachewd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/block/cachew/internal/metrics"
"github.com/block/cachew/internal/opa"
"github.com/block/cachew/internal/reaper"
"github.com/block/cachew/internal/s3client"
"github.com/block/cachew/internal/strategy"
"github.com/block/cachew/internal/strategy/git"
"github.com/block/cachew/internal/strategy/gomod"
Expand All @@ -42,6 +43,7 @@ type GlobalConfig struct {
LoggingConfig logging.Config `hcl:"log,block"`
MetricsConfig metrics.Config `hcl:"metrics,block"`
GitCloneConfig gitclone.Config `hcl:"git-clone,block"`
S3Config s3client.Config `hcl:"s3,block,optional"`
GithubAppConfigs []githubapp.Config `hcl:"github-app,block,optional"`
OPAConfig opa.Config `hcl:"opa,block"`
}
Expand Down Expand Up @@ -75,10 +77,11 @@ func main() {
managerProvider := gitclone.NewManagerProvider(ctx, globalConfig.GitCloneConfig, func() (gitclone.CredentialProvider, error) {
return tokenManagerProvider()
})
s3ClientProvider := s3client.NewClientProvider(ctx, globalConfig.S3Config)

schedulerProvider := jobscheduler.NewProvider(ctx, globalConfig.SchedulerConfig)

cr, sr := newRegistries(schedulerProvider, managerProvider, tokenManagerProvider)
cr, sr := newRegistries(schedulerProvider, managerProvider, tokenManagerProvider, s3ClientProvider)

// Commands
switch { //nolint:gocritic
Expand Down Expand Up @@ -111,11 +114,11 @@ func main() {
kctx.FatalIfErrorf(err)
}

func newRegistries(scheduler jobscheduler.Provider, cloneManagerProvider gitclone.ManagerProvider, tokenManagerProvider githubapp.TokenManagerProvider) (*cache.Registry, *strategy.Registry) {
func newRegistries(scheduler jobscheduler.Provider, cloneManagerProvider gitclone.ManagerProvider, tokenManagerProvider githubapp.TokenManagerProvider, s3ClientProvider s3client.ClientProvider) (*cache.Registry, *strategy.Registry) {
cr := cache.NewRegistry()
cache.RegisterMemory(cr)
cache.RegisterDisk(cr)
cache.RegisterS3(cr)
cache.RegisterS3(cr, s3ClientProvider)

sr := strategy.NewRegistry()
strategy.RegisterAPIV1(sr)
Expand Down
91 changes: 22 additions & 69 deletions internal/cache/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cache
import (
"bufio"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
Expand All @@ -17,26 +16,29 @@ import (

"github.com/alecthomas/errors"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"

"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/s3client"
)

func RegisterS3(r *Registry) {
// RegisterS3 registers the S3 cache backend. The clientProvider supplies the
// shared minio client constructed from the global s3 config block.
func RegisterS3(r *Registry, clientProvider s3client.ClientProvider) {
Register(
r,
"s3",
"Caches objects in S3",
NewS3,
func(ctx context.Context, config S3Config) (*S3, error) {
return NewS3(ctx, config, clientProvider)
},
)
}

// S3Config contains cache-specific S3 settings. Connection parameters
// (endpoint, region, SSL, credentials) are provided by the global
// s3client.Config block and shared via the s3client.ClientProvider.
type S3Config struct {
Bucket string `hcl:"bucket" help:"S3 bucket name."`
Endpoint string `hcl:"endpoint,optional" help:"S3 endpoint URL (e.g., s3.amazonaws.com or localhost:9000)." default:"s3.amazonaws.com"`
Region string `hcl:"region,optional" help:"S3 region (defaults to us-west-2)." default:"us-west-2"`
UseSSL bool `hcl:"use-ssl,optional" help:"Use SSL for S3 connections (defaults to true)." default:"true"`
SkipSSLVerify bool `hcl:"skip-ssl-verify,optional" help:"Skip SSL certificate verification (defaults to false)." default:"false"`
MaxTTL time.Duration `hcl:"max-ttl,optional" help:"Maximum time-to-live for entries in the S3 cache (defaults to 1 hour)." default:"1h"`
UploadConcurrency uint `hcl:"upload-concurrency,optional" help:"Number of concurrent workers for multi-part uploads (0 = use all CPU cores, defaults to 1)." default:"1"`
UploadPartSizeMB uint `hcl:"upload-part-size-mb,optional" help:"Size of each part for multi-part uploads in megabytes (defaults to 16MB, minimum 5MB)." default:"16"`
Expand All @@ -51,19 +53,16 @@ type S3 struct {

var _ Cache = (*S3)(nil)

// NewS3 creates a new S3-based cache instance using the minio SDK.
// NewS3 creates a new S3-based cache instance.
//
// config.Endpoint and config.Bucket MUST be set.
//
// The standard AWS credential chain is used for authentication, which includes:
// 1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN)
// 2. AWS credentials file (~/.aws/credentials)
// 3. IAM role from EC2 instance metadata or ECS container credentials
// The minio client is obtained from the shared clientProvider, which is
// constructed once from the global s3 configuration block. Cache-specific
// settings (bucket, TTL, upload tuning) come from the per-instance S3Config.
//
// This [Cache] implementation stores cache entries in an S3-compatible object storage service.
// Metadata (headers and expiration time) are stored as object user metadata. The implementation
// uses the lightweight minio-go SDK to reduce overhead compared to the AWS SDK.
func NewS3(ctx context.Context, config S3Config) (*S3, error) {
func NewS3(ctx context.Context, config S3Config, clientProvider s3client.ClientProvider) (*S3, error) {
// Set defaults and validate configuration
if config.UploadConcurrency == 0 {
// #nosec G115 -- n is guaranteed >= 1. I was unable to satisfy all linters.
Expand All @@ -74,61 +73,15 @@ func NewS3(ctx context.Context, config S3Config) (*S3, error) {
return nil, errors.New("upload-part-size-mb must be at least 5MB (S3 minimum part size)")
}

logging.FromContext(ctx).InfoContext(ctx, "Constructing S3 cache", "endpoint", config.Endpoint, "bucket", config.Bucket,
"region", config.Region, "use-ssl", config.UseSSL, "max-ttl", config.MaxTTL,
"upload-concurrency", config.UploadConcurrency, "upload-part-size-mb", config.UploadPartSizeMB)

// Create default transport for credential chain
defaultTransport, err := minio.DefaultTransport(config.UseSSL)
client, err := clientProvider()
if err != nil {
return nil, errors.Errorf("failed to create default transport: %w", err)
}

// Apply SSL verification settings if needed
var transport http.RoundTripper
if config.SkipSSLVerify {
// Clone the default transport and disable SSL verification
customTransport := defaultTransport.Clone()
if customTransport.TLSClientConfig == nil {
customTransport.TLSClientConfig = &tls.Config{
MinVersion: tls.VersionTLS12,
}
} else {
customTransport.TLSClientConfig.MinVersion = tls.VersionTLS12
}
customTransport.TLSClientConfig.InsecureSkipVerify = true
transport = customTransport
defaultTransport = customTransport
return nil, errors.Errorf("failed to obtain shared S3 client: %w", err)
}

// Use AWS credential chain
creds := credentials.NewChainCredentials(
[]credentials.Provider{
&credentials.EnvAWS{}, // Check AWS environment variables
&credentials.FileAWSCredentials{}, // Check ~/.aws/credentials
&credentials.IAM{
Client: &http.Client{
Transport: defaultTransport,
},
}, // Check EC2 instance metadata or ECS container credentials
})

// Create minio client options
options := &minio.Options{
Creds: creds,
Secure: config.UseSSL,
Region: config.Region,
}

// Only set custom transport if needed (for SkipSSLVerify)
if transport != nil {
options.Transport = transport
}

client, err := minio.New(config.Endpoint, options)
if err != nil {
return nil, errors.Errorf("failed to create minio client: %w", err)
}
logging.FromContext(ctx).InfoContext(ctx, "Constructing S3 cache",
"endpoint", client.EndpointURL(), "bucket", config.Bucket,
"max-ttl", config.MaxTTL,
"upload-concurrency", config.UploadConcurrency, "upload-part-size-mb", config.UploadPartSizeMB)

// Verify bucket exists
exists, err := client.BucketExists(ctx, config.Bucket)
Expand All @@ -147,7 +100,7 @@ func NewS3(ctx context.Context, config S3Config) (*S3, error) {
}

func (s *S3) String() string {
return fmt.Sprintf("s3:%s/%s", s.config.Endpoint, s.config.Bucket)
return fmt.Sprintf("s3:%s/%s", s.client.EndpointURL().Host, s.config.Bucket)
}

func (s *S3) Close() error {
Expand Down
36 changes: 21 additions & 15 deletions internal/cache/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,30 @@ import (
"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/cache/cachetest"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/minitest"
"github.com/block/cachew/internal/s3client"
"github.com/block/cachew/internal/s3client/s3clienttest"
)

// TestS3Cache tests the S3 cache implementation using MinIO in Docker.
func TestS3Cache(t *testing.T) {
minitest.Start(t)
bucket := s3clienttest.Start(t)

cachetest.Suite(t, func(t *testing.T) cache.Cache {
s3clienttest.CleanBucket(t, bucket)
_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})

minitest.CleanBucket(t)
clientProvider := s3client.NewClientProvider(ctx, s3client.Config{
Endpoint: s3clienttest.Addr,
Region: "",
UseSSL: false,
SkipSSLVerify: false,
})

c, err := cache.NewS3(ctx, cache.S3Config{
Endpoint: minitest.Addr,
Bucket: minitest.Bucket,
Region: "",
UseSSL: false,
Bucket: bucket,
MaxTTL: 100 * time.Millisecond,
UploadPartSizeMB: 16,
})
}, clientProvider)
assert.NoError(t, err)
return c
})
Expand All @@ -41,20 +45,22 @@ func TestS3CacheSoak(t *testing.T) {
t.Skip("Skipping soak test; set SOAK_TEST=1 to run")
}

minitest.Start(t)
bucket := s3clienttest.Start(t)

_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError})

minitest.CleanBucket(t)
clientProvider := s3client.NewClientProvider(ctx, s3client.Config{
Endpoint: s3clienttest.Addr,
Region: "",
UseSSL: false,
SkipSSLVerify: false,
})

c, err := cache.NewS3(ctx, cache.S3Config{
Endpoint: minitest.Addr,
Bucket: minitest.Bucket,
Region: "",
UseSSL: false,
Bucket: bucket,
MaxTTL: 10 * time.Minute,
UploadPartSizeMB: 16,
})
}, clientProvider)
assert.NoError(t, err)
defer c.Close()

Expand Down
14 changes: 7 additions & 7 deletions internal/metadatadb/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,29 @@ import (

"github.com/block/cachew/internal/metadatadb"
"github.com/block/cachew/internal/metadatadb/metadatadbtest"
"github.com/block/cachew/internal/minitest"
"github.com/block/cachew/internal/s3client/s3clienttest"
)

func TestS3Backend(t *testing.T) {
minitest.Start(t)
bucket := s3clienttest.Start(t)

metadatadbtest.Suite(t, func(t *testing.T) metadatadb.Backend {
t.Helper()
return metadatadb.NewS3Backend(metadatadb.S3BackendConfig{
Client: minitest.Client(t),
Bucket: minitest.Bucket,
Client: s3clienttest.Client(t),
Bucket: bucket,
Prefix: "_meta-" + t.Name(),
LockTTL: 5 * time.Second,
})
})
}

func TestS3BackendSoak(t *testing.T) {
minitest.Start(t)
bucket := s3clienttest.Start(t)

metadatadbtest.Soak(t, metadatadb.NewS3Backend(metadatadb.S3BackendConfig{
Client: minitest.Client(t),
Bucket: minitest.Bucket,
Client: s3clienttest.Client(t),
Bucket: bucket,
Prefix: "_meta-soak",
LockTTL: 5 * time.Second,
}), metadatadbtest.SoakConfig{
Expand Down
Loading