diff --git a/cmd/cachewd/main.go b/cmd/cachewd/main.go index 3001f37..de8d602 100644 --- a/cmd/cachewd/main.go +++ b/cmd/cachewd/main.go @@ -26,6 +26,7 @@ import ( "github.com/block/cachew/internal/httputil" "github.com/block/cachew/internal/jobscheduler" "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/metadatadb" "github.com/block/cachew/internal/metrics" "github.com/block/cachew/internal/opa" "github.com/block/cachew/internal/reaper" @@ -74,23 +75,23 @@ func main() { // Start initialising tokenManagerProvider := githubapp.NewTokenManagerProvider(globalConfig.GithubAppConfigs, logger) - managerProvider := gitclone.NewManagerProvider(ctx, globalConfig.GitCloneConfig, func() (gitclone.CredentialProvider, error) { + gitManagerProvider := gitclone.NewManagerProvider(ctx, globalConfig.GitCloneConfig, func() (gitclone.CredentialProvider, error) { return tokenManagerProvider() }) s3ClientProvider := s3client.NewClientProvider(ctx, globalConfig.S3Config) schedulerProvider := jobscheduler.NewProvider(ctx, globalConfig.SchedulerConfig) - cr, sr := newRegistries(schedulerProvider, managerProvider, tokenManagerProvider, s3ClientProvider) + cr, mr, sr := newRegistries(schedulerProvider, gitManagerProvider, tokenManagerProvider, s3ClientProvider) // Commands switch { //nolint:gocritic case cli.Schema: - printSchema(kctx, cr, sr) + printSchema(kctx, cr, mr, sr) return } - mux, err := newMux(ctx, cr, sr, providersConfigHCL, envars) + mux, err := newMux(ctx, cr, mr, sr, providersConfigHCL, envars) fatalIfError(ctx, logger, err, "Failed to load config") metricsClient, err := metrics.New(ctx, globalConfig.MetricsConfig) @@ -114,12 +115,25 @@ func main() { fatalIfError(ctx, logger, err, "Server stopped") } -func newRegistries(scheduler jobscheduler.Provider, cloneManagerProvider gitclone.ManagerProvider, tokenManagerProvider githubapp.TokenManagerProvider, s3ClientProvider s3client.ClientProvider) (*cache.Registry, *strategy.Registry) { +func newRegistries( + scheduler jobscheduler.Provider, + cloneManagerProvider gitclone.ManagerProvider, + tokenManagerProvider githubapp.TokenManagerProvider, + s3ClientProvider s3client.ClientProvider, +) ( + *cache.Registry, + *metadatadb.Registry, + *strategy.Registry, +) { cr := cache.NewRegistry() cache.RegisterMemory(cr) cache.RegisterDisk(cr) cache.RegisterS3(cr, s3ClientProvider) + mr := metadatadb.NewRegistry() + metadatadb.RegisterMemory(mr) + metadatadb.RegisterS3(mr, s3ClientProvider) + sr := strategy.NewRegistry() strategy.RegisterAPIV1(sr) strategy.RegisterArtifactory(sr) @@ -130,11 +144,11 @@ func newRegistries(scheduler jobscheduler.Provider, cloneManagerProvider gitclon git.Register(sr, scheduler, cloneManagerProvider, tokenManagerProvider) gomod.Register(sr, cloneManagerProvider) - return cr, sr + return cr, mr, sr } -func printSchema(kctx *kong.Context, cr *cache.Registry, sr *strategy.Registry) { - schema := config.Schema[GlobalConfig](cr, sr) +func printSchema(kctx *kong.Context, cr *cache.Registry, mr *metadatadb.Registry, sr *strategy.Registry) { + schema := config.Schema[GlobalConfig](cr, mr, sr) text, err := hcl.MarshalAST(schema) kctx.FatalIfErrorf(err) @@ -146,7 +160,7 @@ func printSchema(kctx *kong.Context, cr *cache.Registry, sr *strategy.Registry) } } -func newMux(ctx context.Context, cr *cache.Registry, sr *strategy.Registry, providersConfigHCL *hcl.AST, vars map[string]string) (http.Handler, error) { +func newMux(ctx context.Context, cr *cache.Registry, mr *metadatadb.Registry, sr *strategy.Registry, providersConfigHCL *hcl.AST, vars map[string]string) (http.Handler, error) { mux := http.NewServeMux() mux.HandleFunc("GET /_liveness", func(w http.ResponseWriter, _ *http.Request) { @@ -179,7 +193,7 @@ func newMux(ctx context.Context, cr *cache.Registry, sr *strategy.Registry, prov http.DefaultServeMux.ServeHTTP(w, r) })) - handler, err := config.Load(ctx, cr, sr, providersConfigHCL, mux, vars) + handler, _, err := config.Load(ctx, cr, mr, sr, providersConfigHCL, mux, vars) if err != nil { return nil, errors.Errorf("load config: %w", err) } diff --git a/internal/cache/api.go b/internal/cache/api.go index 641edd4..f7788e5 100644 --- a/internal/cache/api.go +++ b/internal/cache/api.go @@ -72,6 +72,7 @@ func (r *Registry) Schema() *hcl.AST { Labels: append([]string{entry.schema.Name}, entry.schema.Labels...), Body: entry.schema.Body, Comments: entry.schema.Comments, + Repeated: true, } ast.Entries = append(ast.Entries, wrapped) } diff --git a/internal/config/config.go b/internal/config/config.go index 02e838e..1a95f21 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,7 @@ import ( "github.com/block/cachew/internal/cache" "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/metadatadb" "github.com/block/cachew/internal/strategy" _ "github.com/block/cachew/internal/strategy/git" // Register git strategy _ "github.com/block/cachew/internal/strategy/gomod" // Register gomod strategy @@ -39,14 +40,15 @@ func (l *loggingMux) HandleFunc(pattern string, handler func(http.ResponseWriter var _ strategy.Mux = (*loggingMux)(nil) // Schema returns the configuration file schema. -func Schema[GlobalConfig any](cr *cache.Registry, sr *strategy.Registry) *hcl.AST { +func Schema[GlobalConfig any](cr *cache.Registry, mr *metadatadb.Registry, sr *strategy.Registry) *hcl.AST { globalSchema, err := hcl.Schema(new(GlobalConfig)) if err != nil { panic(err) } - return &hcl.AST{ - Entries: append(globalSchema.Entries, append(sr.Schema().Entries, cr.Schema().Entries...)...), - } + globalSchema.Entries = append(globalSchema.Entries, sr.Schema().Entries...) + globalSchema.Entries = append(globalSchema.Entries, cr.Schema().Entries...) + globalSchema.Entries = append(globalSchema.Entries, mr.Schema().Entries...) + return globalSchema } // Split configuration into global config and provider-specific config. @@ -103,6 +105,43 @@ func unwrapBlock(block *hcl.Block) (name string, inner *hcl.Block, err error) { return block.Labels[0], inner, nil } +type classifiedBlocks struct { + caches []*hcl.Block + metadata *hcl.Block + strategies []*hcl.Block +} + +func classifyBlocks(ast *hcl.AST) (*classifiedBlocks, error) { + result := &classifiedBlocks{ + // Always enable the default API strategy + strategies: []*hcl.Block{{Name: "apiv1"}}, + } + for _, node := range ast.Entries { + block, ok := node.(*hcl.Block) + if !ok { + return nil, errors.Errorf("%s: attributes are not allowed", node.Position()) + } + switch block.Name { + case "cache": + result.caches = append(result.caches, block) + case "metadata": + if result.metadata != nil { + return nil, errors.Errorf("%s: only one metadata block is allowed", block.Pos) + } + result.metadata = block + case "strategy": + _, inner, err := unwrapBlock(block) + if err != nil { + return nil, err + } + result.strategies = append(result.strategies, inner) + default: + return nil, errors.Errorf("%s: unknown block %q (expected \"cache\", \"metadata\", or \"strategy\")", block.Pos, block.Name) + } + } + return result, nil +} + // Load HCL configuration and use that to construct the cache backend, and proxy strategies. // It returns an http.Handler that wraps mux — any loaded strategies that implement // strategy.Interceptor are applied as middleware before ServeMux route matching, so @@ -110,51 +149,46 @@ func unwrapBlock(block *hcl.Block) (name string, inner *hcl.Block, err error) { func Load( ctx context.Context, cr *cache.Registry, + mr *metadatadb.Registry, sr *strategy.Registry, ast *hcl.AST, mux *http.ServeMux, vars map[string]string, -) (http.Handler, error) { +) (http.Handler, metadatadb.Backend, error) { logger := logging.FromContext(ctx) expandVars(ast, vars) - strategyCandidates := []*hcl.Block{ - // Always enable the default API strategy - {Name: "apiv1"}, + classified, err := classifyBlocks(ast) + if err != nil { + return nil, nil, err } - // First pass: collect cache backends and strategy candidates from prefixed blocks. var caches []cache.Cache - for _, node := range ast.Entries { - block, ok := node.(*hcl.Block) - if !ok { - return nil, errors.Errorf("%s: attributes are not allowed", node.Position()) + for _, block := range classified.caches { + name, inner, err := unwrapBlock(block) + if err != nil { + return nil, nil, err } - switch block.Name { - case "cache": - name, inner, err := unwrapBlock(block) - if err != nil { - return nil, err - } - c, err := cr.Create(ctx, name, inner, vars) - if err != nil { - return nil, errors.Errorf("%s: %w", block.Pos, err) - } - caches = append(caches, c) - - case "strategy": - _, inner, err := unwrapBlock(block) - if err != nil { - return nil, err - } - strategyCandidates = append(strategyCandidates, inner) - - default: - return nil, errors.Errorf("%s: unknown block %q (expected \"cache\" or \"strategy\")", block.Pos, block.Name) + c, err := cr.Create(ctx, name, inner, vars) + if err != nil { + return nil, nil, errors.Errorf("%s: %w", block.Pos, err) } + caches = append(caches, c) } if len(caches) == 0 { - return nil, errors.Errorf("%s: expected at least one cache backend", ast.Pos) + return nil, nil, errors.Errorf("%s: expected at least one cache backend", ast.Pos) + } + + var metadata metadatadb.Backend + if classified.metadata != nil { + name, inner, err := unwrapBlock(classified.metadata) + if err != nil { + return nil, nil, err + } + metadata, err = mr.Create(ctx, name, inner, vars) + if err != nil { + return nil, nil, errors.Errorf("%s: %w", classified.metadata.Pos, err) + } } cache := cache.MaybeNewTiered(ctx, caches) @@ -165,13 +199,13 @@ func Load( // Collect strategies that implement Interceptor separately — they need // to run before ServeMux route matching, not as mux routes. var interceptors []strategy.Interceptor - for _, block := range strategyCandidates { + for _, block := range classified.strategies { name := block.Name slogger := logger.With("strategy", name) mlog := &loggingMux{logger: slogger, mux: mux} s, err := sr.Create(ctx, name, block, cache, mlog, vars) if err != nil { - return nil, errors.Errorf("%s: %w", block.Pos, err) + return nil, nil, errors.Errorf("%s: %w", block.Pos, err) } if interceptor, ok := s.(strategy.Interceptor); ok { interceptors = append(interceptors, interceptor) @@ -184,7 +218,7 @@ func Load( for i := len(interceptors) - 1; i >= 0; i-- { h = interceptors[i].Intercept(h) } - return h, nil + return h, metadata, nil } // ExpandVars expands environment variable references in HCL strings and heredocs. diff --git a/internal/metadatadb/memory.go b/internal/metadatadb/memory.go index 16abe01..9bcc41b 100644 --- a/internal/metadatadb/memory.go +++ b/internal/metadatadb/memory.go @@ -8,6 +8,18 @@ import ( "github.com/alecthomas/errors" ) +// RegisterMemory registers the in-memory metadata backend. +func RegisterMemory(r *Registry) { + Register(r, "memory", "In-memory metadata store for testing and single-instance deployments", + func(_ context.Context, _ MemoryConfig) (*MemoryBackend, error) { + return NewMemoryBackend(), nil + }, + ) +} + +// MemoryConfig is the configuration for the in-memory metadata backend. +type MemoryConfig struct{} + // MemoryBackend is an in-memory Backend for testing and single-instance // deployments. Ops are applied directly — there is no sync or persistence. type MemoryBackend struct { diff --git a/internal/metadatadb/registry.go b/internal/metadatadb/registry.go new file mode 100644 index 0000000..143b0f1 --- /dev/null +++ b/internal/metadatadb/registry.go @@ -0,0 +1,88 @@ +package metadatadb + +import ( + "context" + "os" + + "github.com/alecthomas/errors" + "github.com/alecthomas/hcl/v2" +) + +// ErrNotFound is returned when a metadata backend is not found. +var ErrNotFound = errors.New("metadata backend not found") + +type registryEntry struct { + schema *hcl.Block + factory func(ctx context.Context, config *hcl.Block, vars map[string]string) (Backend, error) +} + +// Registry holds registered metadata backend factories. +type Registry struct { + registry map[string]registryEntry +} + +// NewRegistry creates a new metadata backend registry. +func NewRegistry() *Registry { + return &Registry{ + registry: make(map[string]registryEntry), + } +} + +// Factory is a function that creates a new Backend from the given hcl-tagged configuration struct. +type Factory[Config any, B Backend] func(ctx context.Context, config Config) (B, error) + +// Register a metadata backend factory function. +func Register[Config any, B Backend](r *Registry, id, description string, factory Factory[Config, B]) { + var c Config + schema, err := hcl.BlockSchema(id, &c) + if err != nil { + panic(err) + } + block := schema.Entries[0].(*hcl.Block) //nolint:errcheck // This seems spurious + block.Comments = hcl.CommentList{description} + r.registry[id] = registryEntry{ + schema: block, + factory: func(ctx context.Context, config *hcl.Block, vars map[string]string) (Backend, error) { + var cfg Config + transformer := func(defaultValue string) string { + return os.Expand(defaultValue, func(key string) string { return vars[key] }) + } + if err := hcl.UnmarshalBlock(config, &cfg, hcl.WithDefaultTransformer(transformer)); err != nil { + return nil, errors.WithStack(err) + } + return factory(ctx, cfg) + }, + } +} + +// Schema returns the schema for all registered metadata backends. +func (r *Registry) Schema() *hcl.AST { + ast := &hcl.AST{} + for _, entry := range r.registry { + wrapped := &hcl.Block{ + Name: "metadata", + Labels: append([]string{entry.schema.Name}, entry.schema.Labels...), + Body: entry.schema.Body, + Comments: entry.schema.Comments, + } + ast.Entries = append(ast.Entries, wrapped) + } + return ast +} + +// Exists returns true if a backend with the given name is registered. +func (r *Registry) Exists(name string) bool { + _, ok := r.registry[name] + return ok +} + +// Create a new Backend from the given name and configuration. +// +// Returns ErrNotFound if the backend is not found. +func (r *Registry) Create(ctx context.Context, name string, config *hcl.Block, vars map[string]string) (Backend, error) { + entry, ok := r.registry[name] + if !ok { + return nil, errors.Errorf("%s: %w", name, ErrNotFound) + } + return errors.WithStack2(entry.factory(ctx, config, vars)) +} diff --git a/internal/metadatadb/s3.go b/internal/metadatadb/s3.go index 5173010..c49de4b 100644 --- a/internal/metadatadb/s3.go +++ b/internal/metadatadb/s3.go @@ -4,22 +4,33 @@ import ( "bytes" "context" "encoding/json" - "log/slog" "net/http" "sync" "time" "github.com/alecthomas/errors" "github.com/minio/minio-go/v7" + + "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/s3client" ) +// RegisterS3 registers the S3 metadata backend. The clientProvider supplies the +// shared minio client constructed from the global s3 config block. +func RegisterS3(r *Registry, clientProvider s3client.ClientProvider) { + Register(r, "s3", "Stores metadata state in S3 with periodic sync", + func(ctx context.Context, config S3BackendConfig) (*S3Backend, error) { + return NewS3Backend(ctx, clientProvider, config) + }, + ) +} + // S3Backend stores metadata state as JSON objects in S3 with periodic sync. // Writes are applied to local state immediately and queued for the next flush. // Locking uses a separate lock object with TTL-based expiry for stale lock // recovery. The idempotence token maps to the S3 object ETag. type S3Backend struct { client *minio.Client - logger *slog.Logger bucket string prefix string lockTTL time.Duration @@ -32,15 +43,13 @@ type S3Backend struct { // S3BackendConfig configures the S3 metadata backend. type S3BackendConfig struct { - Client *minio.Client - Logger *slog.Logger - Bucket string - Prefix string - LockTTL time.Duration - SyncInterval time.Duration + Bucket string `hcl:"bucket" help:"S3 bucket name."` + Prefix string `hcl:"prefix,optional" help:"Key prefix for metadata objects." default:"_meta"` + LockTTL time.Duration `hcl:"lock-ttl,optional" help:"TTL for namespace locks." default:"30s"` + SyncInterval time.Duration `hcl:"sync-interval,optional" help:"Interval between periodic syncs." default:"30s"` } -func NewS3Backend(ctx context.Context, config S3BackendConfig) (*S3Backend, error) { +func NewS3Backend(ctx context.Context, clientProvider s3client.ClientProvider, config S3BackendConfig) (*S3Backend, error) { if config.Prefix == "" { config.Prefix = "_meta" } @@ -50,11 +59,11 @@ func NewS3Backend(ctx context.Context, config S3BackendConfig) (*S3Backend, erro if config.SyncInterval == 0 { config.SyncInterval = 30 * time.Second } - logger := config.Logger - if logger == nil { - logger = slog.Default() + client, err := clientProvider() + if err != nil { + return nil, errors.Wrap(err, "create S3 client") } - exists, err := config.Client.BucketExists(ctx, config.Bucket) + exists, err := client.BucketExists(ctx, config.Bucket) if err != nil { return nil, errors.Errorf("failed to check if bucket exists: %w", err) } @@ -64,8 +73,7 @@ func NewS3Backend(ctx context.Context, config S3BackendConfig) (*S3Backend, erro ctx, cancel := context.WithCancel(ctx) return &S3Backend{ - client: config.Client, - logger: logger, + client: client, bucket: config.Bucket, prefix: config.Prefix, lockTTL: config.LockTTL, @@ -197,7 +205,7 @@ func (s *S3Backend) lockNamespace(ctx context.Context, namespace string) error { } if err := s.tryExpireStaleLock(ctx, key); err != nil { - s.logger.WarnContext(ctx, "stale lock check failed", "key", key, "error", err) + logging.FromContext(ctx).WarnContext(ctx, "stale lock check failed", "key", key, "error", err) } else { continue } @@ -263,7 +271,7 @@ func (n *s3Namespace) doSync(ctx context.Context) error { } defer func() { if err := n.backend.unlockNamespace(ctx, n.name); err != nil { - n.backend.logger.WarnContext(ctx, "unlock failed", "namespace", n.name, "error", err) + logging.FromContext(ctx).WarnContext(ctx, "unlock failed", "namespace", n.name, "error", err) } }() } @@ -333,16 +341,17 @@ func (n *s3Namespace) restorePending(ops []Op) { func (n *s3Namespace) syncLoop() { defer close(n.done) - logger := n.backend.logger.With("namespace", n.name) + ctx := n.backend.ctx + logger := logging.FromContext(ctx).With("namespace", n.name) ticker := time.NewTicker(n.backend.syncInterval) defer ticker.Stop() for { select { - case <-n.backend.ctx.Done(): + case <-ctx.Done(): return case <-ticker.C: - if err := n.doSync(n.backend.ctx); err != nil { - logger.WarnContext(n.backend.ctx, "sync failed", "error", err) + if err := n.doSync(ctx); err != nil { + logger.WarnContext(ctx, "sync failed", "error", err) } } } diff --git a/internal/metadatadb/s3_test.go b/internal/metadatadb/s3_test.go index 34ea82d..682ea04 100644 --- a/internal/metadatadb/s3_test.go +++ b/internal/metadatadb/s3_test.go @@ -1,13 +1,17 @@ package metadatadb_test import ( + "log/slog" "testing" "time" "github.com/alecthomas/assert/v2" + "github.com/minio/minio-go/v7" + "github.com/block/cachew/internal/logging" "github.com/block/cachew/internal/metadatadb" "github.com/block/cachew/internal/metadatadb/metadatadbtest" + "github.com/block/cachew/internal/s3client" "github.com/block/cachew/internal/s3client/s3clienttest" ) @@ -16,10 +20,10 @@ func TestS3Backend(t *testing.T) { metadatadbtest.Suite(t, func(t *testing.T, n int) []metadatadb.Backend { t.Helper() + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) backends := make([]metadatadb.Backend, n) for i := range backends { - b, err := metadatadb.NewS3Backend(t.Context(), metadatadb.S3BackendConfig{ - Client: s3clienttest.Client(t), + b, err := metadatadb.NewS3Backend(ctx, s3client.ClientProvider(func() (*minio.Client, error) { return s3clienttest.Client(t), nil }), metadatadb.S3BackendConfig{ Bucket: bucket, Prefix: "_meta-" + t.Name(), LockTTL: 5 * time.Second, @@ -34,9 +38,9 @@ func TestS3Backend(t *testing.T) { func TestS3BackendSoak(t *testing.T) { bucket := s3clienttest.Start(t) + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) - b, err := metadatadb.NewS3Backend(t.Context(), metadatadb.S3BackendConfig{ - Client: s3clienttest.Client(t), + b, err := metadatadb.NewS3Backend(ctx, s3client.ClientProvider(func() (*minio.Client, error) { return s3clienttest.Client(t), nil }), metadatadb.S3BackendConfig{ Bucket: bucket, Prefix: "_meta-soak", LockTTL: 5 * time.Second, diff --git a/internal/strategy/api.go b/internal/strategy/api.go index f5ece82..6994e72 100644 --- a/internal/strategy/api.go +++ b/internal/strategy/api.go @@ -71,6 +71,7 @@ func (r *Registry) Schema() *hcl.AST { Labels: append([]string{entry.schema.Name}, entry.schema.Labels...), Body: entry.schema.Body, Comments: entry.schema.Comments, + Repeated: true, } ast.Entries = append(ast.Entries, wrapped) }