Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ linters:
- nilnesserr # reports that it checks for err != nil, but it returns a different nil value error (powered by nilness and nilerr)
- nilnil # checks that there is no simultaneous return of nil error and an invalid value
- noctx # finds sending http request without context.Context
- nonamedreturns # reports all named returns
#- nonamedreturns # reports all named returns
- nosprintfhostport # checks for misuse of Sprintf to construct a host with port in a URL
- perfsprint # checks that fmt.Sprintf can be replaced with a faster alternative
- predeclared # finds code that shadows one of Go's predeclared identifiers
Expand Down Expand Up @@ -363,7 +363,7 @@ linters:
nakedret:
# Make an issue if func has more lines of code than this setting, and it has naked returns.
# Default: 30
max-func-lines: 0
max-func-lines: 30

nolintlint:
# Exclude following linters from requiring an explanation.
Expand Down
1 change: 1 addition & 0 deletions internal/cache/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var _ Cache = (*Disk)(nil)
// evicted based on their last access time. TTLs are stored in a bbolt database. If an entry exceeds its
// TTL or the default, it is evicted. The implementation is safe for concurrent use within a single Go process.
func NewDisk(ctx context.Context, config DiskConfig) (*Disk, error) {
logging.FromContext(ctx).InfoContext(ctx, "Constructing disk cache", "limit-mb", config.LimitMB, "evict-interval", config.EvictInterval, "root", config.Root, "max-ttl", config.MaxTTL)
// Validate config
if config.Root == "" {
return nil, errors.New("root directory is required")
Expand Down
6 changes: 4 additions & 2 deletions internal/cache/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache_test
import (
"context"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -11,10 +12,11 @@ import (
"github.com/alecthomas/assert/v2"

"github.com/block/sfptc/internal/cache"
"github.com/block/sfptc/internal/logging"
)

func TestCachedFetch(t *testing.T) {
ctx := context.Background()
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
defer memCache.Close()
Expand Down Expand Up @@ -58,7 +60,7 @@ func TestCachedFetch(t *testing.T) {
}

func TestCachedFetchNonOKStatus(t *testing.T) {
ctx := context.Background()
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
defer memCache.Close()
Expand Down
5 changes: 4 additions & 1 deletion internal/cache/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"time"

"github.com/alecthomas/errors"

"github.com/block/sfptc/internal/logging"
)

func init() {
Expand All @@ -35,7 +37,8 @@ type Memory struct {
currentSize int64
}

func NewMemory(_ context.Context, config MemoryConfig) (*Memory, error) {
func NewMemory(ctx context.Context, config MemoryConfig) (*Memory, error) {
logging.FromContext(ctx).InfoContext(ctx, "Constructing in-memory Cache", "limit-mb", config.LimitMB, "max-ttl", config.MaxTTL)
return &Memory{
config: config,
entries: make(map[Key]*memoryEntry),
Expand Down
4 changes: 3 additions & 1 deletion internal/cache/memory_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package cache_test

import (
"log/slog"
"testing"
"time"

"github.com/alecthomas/assert/v2"

"github.com/block/sfptc/internal/cache"
"github.com/block/sfptc/internal/cache/cachetest"
"github.com/block/sfptc/internal/logging"
)

func TestMemoryCache(t *testing.T) {
cachetest.Suite(t, func(t *testing.T) cache.Cache {
ctx := t.Context()
_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError})
c, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: 100 * time.Millisecond})
assert.NoError(t, err)
return c
Expand Down
146 changes: 146 additions & 0 deletions internal/cache/tiered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package cache

import (
"context"
"io"
"net/textproto"
"os"
"strings"
"sync"
"time"

"github.com/alecthomas/errors"

"github.com/block/sfptc/internal/logging"
)

// The Tiered cache combines multiple caches.
//
// It is not directly selectable from configuration, but instead is automatically used if multiple caches are
// configured.
type Tiered struct {
caches []Cache
}

// MaybeNewTiered creates a [Tiered] cache if multiple are provided, or if there is only one it will return that cache.
//
// If no caches are passed it will panic.
func MaybeNewTiered(ctx context.Context, caches []Cache) Cache {
logging.FromContext(ctx).InfoContext(ctx, "Constructing tiered cache", "tiers", len(caches))
if len(caches) == 0 {
panic("Tiered cache requires at least one backing cache")
}
if len(caches) == 1 {
return caches[0]
}
return Tiered{caches}
}

var _ Cache = (*Tiered)(nil)

// Close all underlying caches.
func (t Tiered) Close() error {
wg := sync.WaitGroup{}
errs := make([]error, len(t.caches))
for i, cache := range t.caches {
wg.Go(func() { errs[i] = errors.WithStack(cache.Close()) })
}
wg.Wait()
return errors.Join(errs...)
}

// Create a new object. All underlying caches will be written to in sequence.
func (t Tiered) Create(ctx context.Context, key Key, headers textproto.MIMEHeader, ttl time.Duration) (io.WriteCloser, error) {
// The first error will cancel all outstanding writes.
ctx, cancel := context.WithCancelCause(ctx)

tw := tieredWriter{make([]io.WriteCloser, len(t.caches)), cancel}
// Note: we can't use errgroup here because we do not want to cancel the context on Wait().
wg := sync.WaitGroup{}
for i, cache := range t.caches {
wg.Go(func() {
w, err := cache.Create(ctx, key, headers, ttl)
if err != nil {
cancel(err)
}
tw.writers[i] = w
})
}
done := make(chan struct{})
go func() { wg.Wait(); close(done) }()
select {
case <-done:
return tw, nil

case <-ctx.Done():
return nil, errors.WithStack(context.Cause(ctx))
}
}

// Delete from all underlying caches. All errors are returned.
func (t Tiered) Delete(ctx context.Context, key Key) error {
wg := sync.WaitGroup{}
errs := make([]error, len(t.caches))
for i, cache := range t.caches {
wg.Go(func() { errs[i] = errors.WithStack(cache.Delete(ctx, key)) })
}
wg.Wait()
return errors.Join(errs...)
}

// Open returns a reader from the first cache that succeeds.
//
// If all caches fail, all errors are returned.
func (t Tiered) Open(ctx context.Context, key Key) (io.ReadCloser, textproto.MIMEHeader, error) {
errs := make([]error, len(t.caches))
for i, c := range t.caches {
r, headers, err := c.Open(ctx, key)
errs[i] = err
if errors.Is(err, os.ErrNotExist) {
continue
} else if err != nil {
return nil, nil, errors.WithStack(err)
}
return r, headers, nil
}
return nil, nil, errors.Join(errs...)
}

func (t Tiered) String() string {
names := make([]string, len(t.caches))
for i, c := range t.caches {
names[i] = c.String()
}
return "tiered:" + strings.Join(names, ",")
}

type tieredWriter struct {
writers []io.WriteCloser
cancel context.CancelCauseFunc
}

var _ io.WriteCloser = (*tieredWriter)(nil)

// Close all writers and return all errors.
func (t tieredWriter) Close() error {
wg := sync.WaitGroup{}
errs := make([]error, len(t.writers))
for i, cache := range t.writers {
wg.Go(func() { errs[i] = errors.WithStack(cache.Close()) })
}
wg.Wait()
return errors.Join(errs...)
}

func (t tieredWriter) Write(p []byte) (n int, err error) {
for _, cache := range t.writers {
n, err = cache.Write(p)
if err != nil {
if !errors.Is(err, context.Canceled) {
t.cancel(err)
}
return n, errors.WithStack(err)
}
}
return
}
23 changes: 23 additions & 0 deletions internal/cache/tiered_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package cache_test

import (
"testing"
"time"

"github.com/alecthomas/assert/v2"

"github.com/block/sfptc/internal/cache"
"github.com/block/sfptc/internal/cache/cachetest"
"github.com/block/sfptc/internal/logging"
)

func TestTiered(t *testing.T) {
cachetest.Suite(t, func(t *testing.T) cache.Cache {
_, ctx := logging.Configure(t.Context(), logging.Config{})
memory, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
assert.NoError(t, err)
disk, err := cache.NewDisk(ctx, cache.DiskConfig{Root: t.TempDir(), LimitMB: 1024, MaxTTL: time.Hour})
assert.NoError(t, err)
return cache.MaybeNewTiered(ctx, []cache.Cache{memory, disk})
})
}
6 changes: 3 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ func Load(ctx context.Context, r io.Reader, mux *http.ServeMux, vars map[string]
return errors.Errorf("%s: attributes are not allowed", node.Pos)
}
}
if len(caches) != 1 {
return errors.Errorf("%s: expected exactly one cache backend, got %d", ast.Pos, len(caches))
if len(caches) == 0 {
return errors.Errorf("%s: expected at least one cache backend", ast.Pos)
}

cache := caches[0]
cache := cache.MaybeNewTiered(ctx, caches)

logger.DebugContext(ctx, "Cache backend", "cache", cache)

Expand Down
3 changes: 2 additions & 1 deletion internal/strategy/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ func TestHandlerMethodChaining(t *testing.T) {
}

func mustNewMemoryCache() cache.Cache {
c, err := cache.NewMemory(context.Background(), cache.MemoryConfig{
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
c, err := cache.NewMemory(ctx, cache.MemoryConfig{
MaxTTL: time.Hour,
})
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions sfptc.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github-releases {
private-orgs = ["alecthomas"]
}

memory {}
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example of tiering.


disk {
root = "./cache"
}