Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions cmd/cachewd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/block/cachew/internal/httputil"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/metadatadb"
"github.com/block/cachew/internal/metrics"
"github.com/block/cachew/internal/opa"
"github.com/block/cachew/internal/reaper"
Expand Down Expand Up @@ -74,23 +75,23 @@ func main() {

// Start initialising
tokenManagerProvider := githubapp.NewTokenManagerProvider(globalConfig.GithubAppConfigs, logger)
managerProvider := gitclone.NewManagerProvider(ctx, globalConfig.GitCloneConfig, func() (gitclone.CredentialProvider, error) {
gitManagerProvider := gitclone.NewManagerProvider(ctx, globalConfig.GitCloneConfig, func() (gitclone.CredentialProvider, error) {
return tokenManagerProvider()
})
s3ClientProvider := s3client.NewClientProvider(ctx, globalConfig.S3Config)

schedulerProvider := jobscheduler.NewProvider(ctx, globalConfig.SchedulerConfig)

cr, sr := newRegistries(schedulerProvider, managerProvider, tokenManagerProvider, s3ClientProvider)
cr, mr, sr := newRegistries(schedulerProvider, gitManagerProvider, tokenManagerProvider, s3ClientProvider)

// Commands
switch { //nolint:gocritic
case cli.Schema:
printSchema(kctx, cr, sr)
printSchema(kctx, cr, mr, sr)
return
}

mux, err := newMux(ctx, cr, sr, providersConfigHCL, envars)
mux, err := newMux(ctx, cr, mr, sr, providersConfigHCL, envars)
fatalIfError(ctx, logger, err, "Failed to load config")

metricsClient, err := metrics.New(ctx, globalConfig.MetricsConfig)
Expand All @@ -114,12 +115,25 @@ func main() {
fatalIfError(ctx, logger, err, "Server stopped")
}

func newRegistries(scheduler jobscheduler.Provider, cloneManagerProvider gitclone.ManagerProvider, tokenManagerProvider githubapp.TokenManagerProvider, s3ClientProvider s3client.ClientProvider) (*cache.Registry, *strategy.Registry) {
func newRegistries(
scheduler jobscheduler.Provider,
cloneManagerProvider gitclone.ManagerProvider,
tokenManagerProvider githubapp.TokenManagerProvider,
s3ClientProvider s3client.ClientProvider,
) (
*cache.Registry,
*metadatadb.Registry,
*strategy.Registry,
) {
cr := cache.NewRegistry()
cache.RegisterMemory(cr)
cache.RegisterDisk(cr)
cache.RegisterS3(cr, s3ClientProvider)

mr := metadatadb.NewRegistry()
metadatadb.RegisterMemory(mr)
metadatadb.RegisterS3(mr, s3ClientProvider)

sr := strategy.NewRegistry()
strategy.RegisterAPIV1(sr)
strategy.RegisterArtifactory(sr)
Expand All @@ -130,11 +144,11 @@ func newRegistries(scheduler jobscheduler.Provider, cloneManagerProvider gitclon
git.Register(sr, scheduler, cloneManagerProvider, tokenManagerProvider)
gomod.Register(sr, cloneManagerProvider)

return cr, sr
return cr, mr, sr
}

func printSchema(kctx *kong.Context, cr *cache.Registry, sr *strategy.Registry) {
schema := config.Schema[GlobalConfig](cr, sr)
func printSchema(kctx *kong.Context, cr *cache.Registry, mr *metadatadb.Registry, sr *strategy.Registry) {
schema := config.Schema[GlobalConfig](cr, mr, sr)
text, err := hcl.MarshalAST(schema)
kctx.FatalIfErrorf(err)

Expand All @@ -146,7 +160,7 @@ func printSchema(kctx *kong.Context, cr *cache.Registry, sr *strategy.Registry)
}
}

func newMux(ctx context.Context, cr *cache.Registry, sr *strategy.Registry, providersConfigHCL *hcl.AST, vars map[string]string) (http.Handler, error) {
func newMux(ctx context.Context, cr *cache.Registry, mr *metadatadb.Registry, sr *strategy.Registry, providersConfigHCL *hcl.AST, vars map[string]string) (http.Handler, error) {
mux := http.NewServeMux()

mux.HandleFunc("GET /_liveness", func(w http.ResponseWriter, _ *http.Request) {
Expand Down Expand Up @@ -179,7 +193,7 @@ func newMux(ctx context.Context, cr *cache.Registry, sr *strategy.Registry, prov
http.DefaultServeMux.ServeHTTP(w, r)
}))

handler, err := config.Load(ctx, cr, sr, providersConfigHCL, mux, vars)
handler, _, err := config.Load(ctx, cr, mr, sr, providersConfigHCL, mux, vars)
if err != nil {
return nil, errors.Errorf("load config: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/cache/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (r *Registry) Schema() *hcl.AST {
Labels: append([]string{entry.schema.Name}, entry.schema.Labels...),
Body: entry.schema.Body,
Comments: entry.schema.Comments,
Repeated: true,
}
ast.Entries = append(ast.Entries, wrapped)
}
Expand Down
110 changes: 72 additions & 38 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/metadatadb"
"github.com/block/cachew/internal/strategy"
_ "github.com/block/cachew/internal/strategy/git" // Register git strategy
_ "github.com/block/cachew/internal/strategy/gomod" // Register gomod strategy
Expand All @@ -39,14 +40,15 @@ func (l *loggingMux) HandleFunc(pattern string, handler func(http.ResponseWriter
var _ strategy.Mux = (*loggingMux)(nil)

// Schema returns the configuration file schema.
func Schema[GlobalConfig any](cr *cache.Registry, sr *strategy.Registry) *hcl.AST {
func Schema[GlobalConfig any](cr *cache.Registry, mr *metadatadb.Registry, sr *strategy.Registry) *hcl.AST {
globalSchema, err := hcl.Schema(new(GlobalConfig))
if err != nil {
panic(err)
}
return &hcl.AST{
Entries: append(globalSchema.Entries, append(sr.Schema().Entries, cr.Schema().Entries...)...),
}
globalSchema.Entries = append(globalSchema.Entries, sr.Schema().Entries...)
globalSchema.Entries = append(globalSchema.Entries, cr.Schema().Entries...)
globalSchema.Entries = append(globalSchema.Entries, mr.Schema().Entries...)
return globalSchema
}

// Split configuration into global config and provider-specific config.
Expand Down Expand Up @@ -103,58 +105,90 @@ func unwrapBlock(block *hcl.Block) (name string, inner *hcl.Block, err error) {
return block.Labels[0], inner, nil
}

type classifiedBlocks struct {
caches []*hcl.Block
metadata *hcl.Block
strategies []*hcl.Block
}

func classifyBlocks(ast *hcl.AST) (*classifiedBlocks, error) {
result := &classifiedBlocks{
// Always enable the default API strategy
strategies: []*hcl.Block{{Name: "apiv1"}},
}
for _, node := range ast.Entries {
block, ok := node.(*hcl.Block)
if !ok {
return nil, errors.Errorf("%s: attributes are not allowed", node.Position())
}
switch block.Name {
case "cache":
result.caches = append(result.caches, block)
case "metadata":
if result.metadata != nil {
return nil, errors.Errorf("%s: only one metadata block is allowed", block.Pos)
}
result.metadata = block
case "strategy":
_, inner, err := unwrapBlock(block)
if err != nil {
return nil, err
}
result.strategies = append(result.strategies, inner)
default:
return nil, errors.Errorf("%s: unknown block %q (expected \"cache\", \"metadata\", or \"strategy\")", block.Pos, block.Name)
}
}
return result, nil
}

// Load HCL configuration and use that to construct the cache backend, and proxy strategies.
// It returns an http.Handler that wraps mux — any loaded strategies that implement
// strategy.Interceptor are applied as middleware before ServeMux route matching, so
// that they can inspect r.RequestURI rather than the path-only r.URL.Path.
func Load(
ctx context.Context,
cr *cache.Registry,
mr *metadatadb.Registry,
sr *strategy.Registry,
ast *hcl.AST,
mux *http.ServeMux,
vars map[string]string,
) (http.Handler, error) {
) (http.Handler, metadatadb.Backend, error) {
logger := logging.FromContext(ctx)
expandVars(ast, vars)

strategyCandidates := []*hcl.Block{
// Always enable the default API strategy
{Name: "apiv1"},
classified, err := classifyBlocks(ast)
if err != nil {
return nil, nil, err
}

// First pass: collect cache backends and strategy candidates from prefixed blocks.
var caches []cache.Cache
for _, node := range ast.Entries {
block, ok := node.(*hcl.Block)
if !ok {
return nil, errors.Errorf("%s: attributes are not allowed", node.Position())
for _, block := range classified.caches {
name, inner, err := unwrapBlock(block)
if err != nil {
return nil, nil, err
}
switch block.Name {
case "cache":
name, inner, err := unwrapBlock(block)
if err != nil {
return nil, err
}
c, err := cr.Create(ctx, name, inner, vars)
if err != nil {
return nil, errors.Errorf("%s: %w", block.Pos, err)
}
caches = append(caches, c)

case "strategy":
_, inner, err := unwrapBlock(block)
if err != nil {
return nil, err
}
strategyCandidates = append(strategyCandidates, inner)

default:
return nil, errors.Errorf("%s: unknown block %q (expected \"cache\" or \"strategy\")", block.Pos, block.Name)
c, err := cr.Create(ctx, name, inner, vars)
if err != nil {
return nil, nil, errors.Errorf("%s: %w", block.Pos, err)
}
caches = append(caches, c)
}
if len(caches) == 0 {
return nil, errors.Errorf("%s: expected at least one cache backend", ast.Pos)
return nil, nil, errors.Errorf("%s: expected at least one cache backend", ast.Pos)
}

var metadata metadatadb.Backend
if classified.metadata != nil {
name, inner, err := unwrapBlock(classified.metadata)
if err != nil {
return nil, nil, err
}
metadata, err = mr.Create(ctx, name, inner, vars)
if err != nil {
return nil, nil, errors.Errorf("%s: %w", classified.metadata.Pos, err)
}
}

cache := cache.MaybeNewTiered(ctx, caches)
Expand All @@ -165,13 +199,13 @@ func Load(
// Collect strategies that implement Interceptor separately — they need
// to run before ServeMux route matching, not as mux routes.
var interceptors []strategy.Interceptor
for _, block := range strategyCandidates {
for _, block := range classified.strategies {
name := block.Name
slogger := logger.With("strategy", name)
mlog := &loggingMux{logger: slogger, mux: mux}
s, err := sr.Create(ctx, name, block, cache, mlog, vars)
if err != nil {
return nil, errors.Errorf("%s: %w", block.Pos, err)
return nil, nil, errors.Errorf("%s: %w", block.Pos, err)
}
if interceptor, ok := s.(strategy.Interceptor); ok {
interceptors = append(interceptors, interceptor)
Expand All @@ -184,7 +218,7 @@ func Load(
for i := len(interceptors) - 1; i >= 0; i-- {
h = interceptors[i].Intercept(h)
}
return h, nil
return h, metadata, nil
}

// ExpandVars expands environment variable references in HCL strings and heredocs.
Expand Down
12 changes: 12 additions & 0 deletions internal/metadatadb/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ import (
"github.com/alecthomas/errors"
)

// RegisterMemory registers the in-memory metadata backend.
func RegisterMemory(r *Registry) {
Register(r, "memory", "In-memory metadata store for testing and single-instance deployments",
func(_ context.Context, _ MemoryConfig) (*MemoryBackend, error) {
return NewMemoryBackend(), nil
},
)
}

// MemoryConfig is the configuration for the in-memory metadata backend.
type MemoryConfig struct{}

// MemoryBackend is an in-memory Backend for testing and single-instance
// deployments. Ops are applied directly — there is no sync or persistence.
type MemoryBackend struct {
Expand Down
88 changes: 88 additions & 0 deletions internal/metadatadb/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package metadatadb

import (
"context"
"os"

"github.com/alecthomas/errors"
"github.com/alecthomas/hcl/v2"
)

// ErrNotFound is returned when a metadata backend is not found.
var ErrNotFound = errors.New("metadata backend not found")

type registryEntry struct {
schema *hcl.Block
factory func(ctx context.Context, config *hcl.Block, vars map[string]string) (Backend, error)
}

// Registry holds registered metadata backend factories.
type Registry struct {
registry map[string]registryEntry
}

// NewRegistry creates a new metadata backend registry.
func NewRegistry() *Registry {
return &Registry{
registry: make(map[string]registryEntry),
}
}

// Factory is a function that creates a new Backend from the given hcl-tagged configuration struct.
type Factory[Config any, B Backend] func(ctx context.Context, config Config) (B, error)

// Register a metadata backend factory function.
func Register[Config any, B Backend](r *Registry, id, description string, factory Factory[Config, B]) {
var c Config
schema, err := hcl.BlockSchema(id, &c)
if err != nil {
panic(err)
}
block := schema.Entries[0].(*hcl.Block) //nolint:errcheck // This seems spurious
block.Comments = hcl.CommentList{description}
r.registry[id] = registryEntry{
schema: block,
factory: func(ctx context.Context, config *hcl.Block, vars map[string]string) (Backend, error) {
var cfg Config
transformer := func(defaultValue string) string {
return os.Expand(defaultValue, func(key string) string { return vars[key] })
}
if err := hcl.UnmarshalBlock(config, &cfg, hcl.WithDefaultTransformer(transformer)); err != nil {
return nil, errors.WithStack(err)
}
return factory(ctx, cfg)
},
}
}

// Schema returns the schema for all registered metadata backends.
func (r *Registry) Schema() *hcl.AST {
ast := &hcl.AST{}
for _, entry := range r.registry {
wrapped := &hcl.Block{
Name: "metadata",
Labels: append([]string{entry.schema.Name}, entry.schema.Labels...),
Body: entry.schema.Body,
Comments: entry.schema.Comments,
}
ast.Entries = append(ast.Entries, wrapped)
}
return ast
}

// Exists returns true if a backend with the given name is registered.
func (r *Registry) Exists(name string) bool {
_, ok := r.registry[name]
return ok
}

// Create a new Backend from the given name and configuration.
//
// Returns ErrNotFound if the backend is not found.
func (r *Registry) Create(ctx context.Context, name string, config *hcl.Block, vars map[string]string) (Backend, error) {
entry, ok := r.registry[name]
if !ok {
return nil, errors.Errorf("%s: %w", name, ErrNotFound)
}
return errors.WithStack2(entry.factory(ctx, config, vars))
}
Loading