From aa6b6535d8a90610df103331cb864e7933c21d24 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Mon, 12 Jan 2026 21:06:33 +1100 Subject: [PATCH] feat: add support for cache tiering This allows multiple caches to be combined such that when reading, the first cache wins. A typical configuration would tier local disk caching followed by remote caching, hitting the local disk first before going to the remote cache server. --- .golangci.yml | 4 +- internal/cache/disk.go | 1 + internal/cache/http_test.go | 6 +- internal/cache/memory.go | 5 +- internal/cache/memory_test.go | 4 +- internal/cache/tiered.go | 146 ++++++++++++++++++++++ internal/cache/tiered_test.go | 23 ++++ internal/config/config.go | 6 +- internal/strategy/handler/handler_test.go | 3 +- sfptc.hcl | 2 + 10 files changed, 190 insertions(+), 10 deletions(-) create mode 100644 internal/cache/tiered.go create mode 100644 internal/cache/tiered_test.go diff --git a/.golangci.yml b/.golangci.yml index 0adf0fa..b64d8d0 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -92,7 +92,7 @@ linters: - nilnesserr # reports that it checks for err != nil, but it returns a different nil value error (powered by nilness and nilerr) - nilnil # checks that there is no simultaneous return of nil error and an invalid value - noctx # finds sending http request without context.Context - - nonamedreturns # reports all named returns + #- nonamedreturns # reports all named returns - nosprintfhostport # checks for misuse of Sprintf to construct a host with port in a URL - perfsprint # checks that fmt.Sprintf can be replaced with a faster alternative - predeclared # finds code that shadows one of Go's predeclared identifiers @@ -363,7 +363,7 @@ linters: nakedret: # Make an issue if func has more lines of code than this setting, and it has naked returns. # Default: 30 - max-func-lines: 0 + max-func-lines: 30 nolintlint: # Exclude following linters from requiring an explanation. 
diff --git a/internal/cache/disk.go b/internal/cache/disk.go index 0de9058..1eebee0 100644 --- a/internal/cache/disk.go +++ b/internal/cache/disk.go @@ -48,6 +48,7 @@ var _ Cache = (*Disk)(nil) // evicted based on their last access time. TTLs are stored in a bbolt database. If an entry exceeds its // TTL or the default, it is evicted. The implementation is safe for concurrent use within a single Go process. func NewDisk(ctx context.Context, config DiskConfig) (*Disk, error) { + logging.FromContext(ctx).InfoContext(ctx, "Constructing disk cache", "limit-mb", config.LimitMB, "evict-interval", config.EvictInterval, "root", config.Root, "max-ttl", config.MaxTTL) // Validate config if config.Root == "" { return nil, errors.New("root directory is required") diff --git a/internal/cache/http_test.go b/internal/cache/http_test.go index 3a8d710..dccfebe 100644 --- a/internal/cache/http_test.go +++ b/internal/cache/http_test.go @@ -3,6 +3,7 @@ package cache_test import ( "context" "io" + "log/slog" "net/http" "net/http/httptest" "testing" @@ -11,10 +12,11 @@ import ( "github.com/alecthomas/assert/v2" "github.com/block/sfptc/internal/cache" + "github.com/block/sfptc/internal/logging" ) func TestCachedFetch(t *testing.T) { - ctx := context.Background() + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) assert.NoError(t, err) defer memCache.Close() @@ -58,7 +60,7 @@ func TestCachedFetch(t *testing.T) { } func TestCachedFetchNonOKStatus(t *testing.T) { - ctx := context.Background() + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) assert.NoError(t, err) defer memCache.Close() diff --git a/internal/cache/memory.go b/internal/cache/memory.go index f6e6c6a..4d315dc 100644 --- a/internal/cache/memory.go +++ b/internal/cache/memory.go @@ -11,6 +11,8 @@ 
import ( "time" "github.com/alecthomas/errors" + + "github.com/block/sfptc/internal/logging" ) func init() { @@ -35,7 +37,8 @@ type Memory struct { currentSize int64 } -func NewMemory(_ context.Context, config MemoryConfig) (*Memory, error) { +func NewMemory(ctx context.Context, config MemoryConfig) (*Memory, error) { + logging.FromContext(ctx).InfoContext(ctx, "Constructing in-memory Cache", "limit-mb", config.LimitMB, "max-ttl", config.MaxTTL) return &Memory{ config: config, entries: make(map[Key]*memoryEntry), diff --git a/internal/cache/memory_test.go b/internal/cache/memory_test.go index 953e62c..e125704 100644 --- a/internal/cache/memory_test.go +++ b/internal/cache/memory_test.go @@ -1,6 +1,7 @@ package cache_test import ( + "log/slog" "testing" "time" @@ -8,11 +9,12 @@ import ( "github.com/block/sfptc/internal/cache" "github.com/block/sfptc/internal/cache/cachetest" + "github.com/block/sfptc/internal/logging" ) func TestMemoryCache(t *testing.T) { cachetest.Suite(t, func(t *testing.T) cache.Cache { - ctx := t.Context() + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) c, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: 100 * time.Millisecond}) assert.NoError(t, err) return c diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go new file mode 100644 index 0000000..fd1938e --- /dev/null +++ b/internal/cache/tiered.go @@ -0,0 +1,146 @@ +package cache + +import ( + "context" + "io" + "net/textproto" + "os" + "strings" + "sync" + "time" + + "github.com/alecthomas/errors" + + "github.com/block/sfptc/internal/logging" +) + +// The Tiered cache combines multiple caches. +// +// It is not directly selectable from configuration, but instead is automatically used if multiple caches are +// configured. +type Tiered struct { + caches []Cache +} + +// MaybeNewTiered creates a [Tiered] cache if multiple are provided, or if there is only one it will return that cache. +// +// If no caches are passed it will panic. 
+func MaybeNewTiered(ctx context.Context, caches []Cache) Cache {
+	switch len(caches) {
+	case 0:
+		panic("Tiered cache requires at least one backing cache")
+	case 1:
+		// A lone cache is returned unwrapped, so no tiered cache is constructed and none is logged.
+		return caches[0]
+	}
+	logging.FromContext(ctx).InfoContext(ctx, "Constructing tiered cache", "tiers", len(caches))
+	return Tiered{caches}
+}
+
+var _ Cache = (*Tiered)(nil)
+
+// Close all underlying caches.
+//
+// The caches are closed concurrently and every error is returned, joined together.
+func (t Tiered) Close() error {
+	wg := sync.WaitGroup{}
+	// One slot per cache so that the goroutines never write to the same element.
+	errs := make([]error, len(t.caches))
+	for i, cache := range t.caches {
+		wg.Go(func() { errs[i] = errors.WithStack(cache.Close()) })
+	}
+	wg.Wait()
+	return errors.Join(errs...)
+}
+
+// Create a new object in every underlying cache. Data written to the returned writer is passed to each cache in
+// sequence.
+//
+// If any cache fails to create the object, or ctx is cancelled before all of them have done so, the writers that were
+// opened are closed and the cause of the cancellation is returned.
+func (t Tiered) Create(ctx context.Context, key Key, headers textproto.MIMEHeader, ttl time.Duration) (io.WriteCloser, error) {
+	// The first error will cancel all outstanding writes.
+	ctx, cancel := context.WithCancelCause(ctx)
+
+	tw := tieredWriter{make([]io.WriteCloser, len(t.caches)), cancel}
+	// Note: we can't use errgroup here because we do not want to cancel the context on Wait().
+	wg := sync.WaitGroup{}
+	for i, cache := range t.caches {
+		wg.Go(func() {
+			w, err := cache.Create(ctx, key, headers, ttl)
+			if err != nil {
+				cancel(err)
+				// Leave the slot nil: a writer returned alongside an error is meaningless.
+				return
+			}
+			tw.writers[i] = w
+		})
+	}
+	// Wait for every Create to return before looking at the outcome. Selecting on ctx.Done() instead would race with
+	// goroutines still storing into tw.writers, leak writers opened after an early return, and pick at random between
+	// success and failure when both are ready.
+	wg.Wait()
+	if ctx.Err() == nil {
+		return tw, nil
+	}
+	// Release whatever was opened. The cause of the cancellation is the error worth reporting, so errors from Close
+	// are deliberately dropped.
+	// NOTE(review): assumes backends treat Close under a cancelled context as an abort rather than a commit - confirm.
+	for _, w := range tw.writers {
+		if w != nil {
+			_ = w.Close()
+		}
+	}
+	return nil, errors.WithStack(context.Cause(ctx))
+}
+
+// Delete from all underlying caches. All errors are returned.
+//
+// The deletes run concurrently, one goroutine per cache.
+// NOTE(review): an error from any one tier fails the whole call, including whatever a tier reports for a key it never
+// held - confirm that is intended.
+func (t Tiered) Delete(ctx context.Context, key Key) error {
+	wg := sync.WaitGroup{}
+	errs := make([]error, len(t.caches))
+	for i, cache := range t.caches {
+		wg.Go(func() { errs[i] = errors.WithStack(cache.Delete(ctx, key)) })
+	}
+	wg.Wait()
+	return errors.Join(errs...)
+}
+
+// Open returns a reader from the first cache that holds the key.
+//
+// Caches are tried in order. A cache reporting os.ErrNotExist is skipped; any other error is returned immediately
+// without consulting the remaining caches. If no cache holds the key, all of their errors are returned.
+func (t Tiered) Open(ctx context.Context, key Key) (io.ReadCloser, textproto.MIMEHeader, error) {
+	// One slot per tier. Only os.ErrNotExist misses are ever reported from here, because every other outcome returns
+	// from inside the loop.
+	errs := make([]error, len(t.caches))
+	for i, c := range t.caches {
+		r, headers, err := c.Open(ctx, key)
+		switch {
+		case err == nil:
+			return r, headers, nil
+		case !errors.Is(err, os.ErrNotExist):
+			return nil, nil, errors.WithStack(err)
+		}
+		errs[i] = err
+	}
+	return nil, nil, errors.Join(errs...)
+}
+
+// String identifies the cache as "tiered:" followed by the comma-separated names of the underlying caches.
+func (t Tiered) String() string {
+	names := make([]string, len(t.caches))
+	for i, c := range t.caches {
+		names[i] = c.String()
+	}
+	return "tiered:" + strings.Join(names, ",")
+}
+
+// tieredWriter passes each write to the writers opened by Tiered.Create.
+type tieredWriter struct {
+	// writers has one slot per underlying cache, in tier order.
+	writers []io.WriteCloser
+	// cancel cancels the context that the writers were created with.
+	cancel context.CancelCauseFunc
+}
+
+var _ io.WriteCloser = (*tieredWriter)(nil)
+
+// Close all writers and return all errors.
+//
+// The writers are closed concurrently. Once they have all returned, the context derived for them in Tiered.Create is
+// cancelled so that its resources are released.
+func (t tieredWriter) Close() error {
+	// Deferred so that no writer observes a cancelled context while it is still closing. This is a no-op if an
+	// earlier failure already cancelled the context with a cause.
+	defer t.cancel(nil)
+	wg := sync.WaitGroup{}
+	errs := make([]error, len(t.writers))
+	for i, w := range t.writers {
+		if w == nil {
+			// A slot may hold nil if that tier's Create failed; there is nothing to close.
+			continue
+		}
+		wg.Go(func() { errs[i] = errors.WithStack(w.Close()) })
+	}
+	wg.Wait()
+	return errors.Join(errs...)
+}
+
+// Write p to every writer in order, stopping at the first failure.
+//
+// A failure that is not itself a cancellation cancels the writers' shared context with that error as the cause. As
+// with io.MultiWriter, a writer that accepts fewer than len(p) bytes without reporting an error is treated as having
+// failed with io.ErrShortWrite.
+func (t tieredWriter) Write(p []byte) (n int, err error) {
+	for _, w := range t.writers {
+		n, err = w.Write(p)
+		if err == nil && n != len(p) {
+			err = io.ErrShortWrite
+		}
+		if err != nil {
+			if !errors.Is(err, context.Canceled) {
+				t.cancel(err)
+			}
+			return n, errors.WithStack(err)
+		}
+	}
+	return len(p), nil
+}
diff --git a/internal/cache/tiered_test.go b/internal/cache/tiered_test.go new file mode 100644 index 0000000..8abab5b --- /dev/null +++ b/internal/cache/tiered_test.go @@ -0,0 +1,23 @@ +package cache_test + +import ( + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/sfptc/internal/cache" + "github.com/block/sfptc/internal/cache/cachetest" + "github.com/block/sfptc/internal/logging" +) + +func TestTiered(t *testing.T) { + cachetest.Suite(t, func(t *testing.T) cache.Cache { + _, ctx := logging.Configure(t.Context(), logging.Config{}) + memory, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + disk, err := cache.NewDisk(ctx, cache.DiskConfig{Root: t.TempDir(), LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + return cache.MaybeNewTiered(ctx, []cache.Cache{memory, disk}) + }) +} diff --git a/internal/config/config.go b/internal/config/config.go index e686edf..c410a3d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -66,11 +66,11 @@ func Load(ctx context.Context, r io.Reader, mux *http.ServeMux, vars map[string] return errors.Errorf("%s: attributes are not allowed", node.Pos) } } - if len(caches) != 1 { - return errors.Errorf("%s: expected exactly one cache backend, got %d", ast.Pos, len(caches)) + if len(caches) == 0 { + return errors.Errorf("%s: expected at least one cache backend", ast.Pos) } - cache := caches[0] + cache := cache.MaybeNewTiered(ctx, caches) logger.DebugContext(ctx, "Cache backend", "cache", cache) diff --git a/internal/strategy/handler/handler_test.go b/internal/strategy/handler/handler_test.go index cb9f4ef..b82bd6c 100644 --- 
a/internal/strategy/handler/handler_test.go +++ b/internal/strategy/handler/handler_test.go @@ -296,7 +296,8 @@ func TestHandlerMethodChaining(t *testing.T) { } func mustNewMemoryCache() cache.Cache { - c, err := cache.NewMemory(context.Background(), cache.MemoryConfig{ + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + c, err := cache.NewMemory(ctx, cache.MemoryConfig{ MaxTTL: time.Hour, }) if err != nil { diff --git a/sfptc.hcl b/sfptc.hcl index 8047789..b376634 100644 --- a/sfptc.hcl +++ b/sfptc.hcl @@ -12,6 +12,8 @@ github-releases { private-orgs = ["alecthomas"] } +memory {} + disk { root = "./cache" }