Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,145 changes: 765 additions & 380 deletions README.md

Large diffs are not rendered by default.

18 changes: 12 additions & 6 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestBatchIsRefreshedWhenTheTimeoutExpires(t *testing.T) {
evictionPercentage := 10
minRefreshDelay := time.Minute * 5
maxRefreshDelay := time.Minute * 10
synchronousRefreshDelay := time.Minute * 30
refreshRetryInterval := time.Millisecond * 10
batchSize := 10
batchBufferTimeout := time.Minute
Expand All @@ -34,7 +35,7 @@ func TestBatchIsRefreshedWhenTheTimeoutExpires(t *testing.T) {
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
Expand Down Expand Up @@ -86,6 +87,7 @@ func TestBatchIsRefreshedWhenTheBufferSizeIsReached(t *testing.T) {
ttl := time.Hour
minRefreshDelay := time.Minute * 5
maxRefreshDelay := time.Minute * 10
synchronousRefreshDelay := time.Minute * 30
refreshRetryInterval := time.Millisecond * 10
batchSize := 10
batchBufferTimeout := time.Minute
Expand All @@ -100,7 +102,7 @@ func TestBatchIsRefreshedWhenTheBufferSizeIsReached(t *testing.T) {
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
Expand Down Expand Up @@ -180,6 +182,7 @@ func TestBatchIsNotRefreshedByDuplicates(t *testing.T) {
evictionPercentage := 10
minRefreshDelay := time.Minute * 5
maxRefreshDelay := time.Minute * 10
synchronousRefreshDelay := time.Minute * 30
refreshRetryInterval := time.Millisecond * 10
batchSize := 10
batchBufferTimeout := time.Minute
Expand All @@ -194,7 +197,7 @@ func TestBatchIsNotRefreshedByDuplicates(t *testing.T) {
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
Expand Down Expand Up @@ -250,6 +253,7 @@ func TestBatchesAreGroupedByPermutations(t *testing.T) {
evictionPercentage := 15
minRefreshDelay := time.Minute * 5
maxRefreshDelay := time.Minute * 10
synchronousRefreshDelay := time.Minute * 30
refreshRetryInterval := time.Millisecond * 10
batchSize := 5
batchBufferTimeout := time.Minute
Expand All @@ -264,7 +268,7 @@ func TestBatchesAreGroupedByPermutations(t *testing.T) {
// 2. The 'batchBufferTimeout' threshold is exceeded.
c := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
Expand Down Expand Up @@ -339,6 +343,7 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) {
evictionPercentage := 23
minRefreshDelay := time.Minute * 5
maxRefreshDelay := time.Minute * 10
synchronousRefreshDelay := time.Minute * 30
refreshRetryInterval := time.Millisecond * 10
batchSize := 5
batchBufferTimeout := time.Minute
Expand All @@ -353,7 +358,7 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) {
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
Expand Down Expand Up @@ -401,6 +406,7 @@ func TestValuesAreUpdatedCorrectly(t *testing.T) {
evictionPercentage := 10
minRefreshDelay := time.Minute * 5
maxRefreshDelay := time.Minute * 10
synchronousRefreshDelay := time.Minute * 50
refreshRetryInterval := time.Millisecond * 10
batchSize := 10
batchBufferTimeout := time.Minute
Expand All @@ -415,7 +421,7 @@ func TestValuesAreUpdatedCorrectly(t *testing.T) {
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
Expand Down
19 changes: 10 additions & 9 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ type Config struct {
metricsRecorder DistributedMetricsRecorder
log Logger

refreshInBackground bool
minRefreshTime time.Duration
maxRefreshTime time.Duration
earlyRefreshes bool
minAsyncRefreshTime time.Duration
maxAsyncRefreshTime time.Duration
syncRefreshTime time.Duration
retryBaseDelay time.Duration
storeMissingRecords bool

Expand Down Expand Up @@ -127,11 +128,11 @@ func (c *Client[T]) getShard(key string) *shard[T] {
// getWithState retrieves a single value from the cache and returns additional
// information about the state of the record. The state includes whether the record
// exists, if it has been marked as missing, and if it is due for a refresh.
func (c *Client[T]) getWithState(key string) (value T, exists, markedAsMissing, refresh bool) {
func (c *Client[T]) getWithState(key string) (value T, exists, markedAsMissing, backgroundRefresh, synchronousRefresh bool) {
shard := c.getShard(key)
val, exists, markedAsMissing, refresh := shard.get(key)
c.reportCacheHits(exists, markedAsMissing, refresh)
return val, exists, markedAsMissing, refresh
val, exists, markedAsMissing, backgroundRefresh, synchronousRefresh := shard.get(key)
c.reportCacheHits(exists, markedAsMissing, backgroundRefresh, synchronousRefresh)
return val, exists, markedAsMissing, backgroundRefresh, synchronousRefresh
}

// Get retrieves a single value from the cache.
Expand All @@ -145,8 +146,8 @@ func (c *Client[T]) getWithState(key string) (value T, exists, markedAsMissing,
// The value corresponding to the key and a boolean indicating if the value was found.
func (c *Client[T]) Get(key string) (T, bool) {
shard := c.getShard(key)
val, ok, markedAsMissing, refresh := shard.get(key)
c.reportCacheHits(ok, markedAsMissing, refresh)
val, ok, markedAsMissing, backgroundRefresh, synchronousRefresh := shard.get(key)
c.reportCacheHits(ok, markedAsMissing, backgroundRefresh, synchronousRefresh)
return val, ok && !markedAsMissing
}

Expand Down
28 changes: 15 additions & 13 deletions distribution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestDistributedStorage(t *testing.T) {
fetchObserver.AssertFetchCount(t, 1)
fetchObserver.Clear()

// The keys are written asynchonously to the distributed storage.
// The keys are written asynchronously to the distributed storage.
time.Sleep(100 * time.Millisecond)
distributedStorage.assertRecord(t, key)
distributedStorage.assertGetCount(t, 1)
Expand All @@ -177,7 +177,7 @@ func TestDistributedStorage(t *testing.T) {
t.Errorf("expected valuekey1, got %s", res)
}

// The keys are written asynchonously to the distributed storage.
// The keys are written asynchronously to the distributed storage.
time.Sleep(100 * time.Millisecond)
fetchObserver.AssertFetchCount(t, 1)
distributedStorage.assertGetCount(t, 2)
Expand Down Expand Up @@ -411,7 +411,7 @@ func TestDistributedStorageBatch(t *testing.T) {
fetchObserver.AssertFetchCount(t, 1)
fetchObserver.Clear()

// The keys are written asynchonously to the distributed storage.
// The keys are written asynchronously to the distributed storage.
time.Sleep(100 * time.Millisecond)
distributedStorage.assertRecords(t, firstBatchOfIDs, keyFn)
distributedStorage.assertGetCount(t, 1)
Expand Down Expand Up @@ -444,7 +444,7 @@ func TestDistributedStorageBatch(t *testing.T) {
fetchObserver.AssertRequestedRecords(t, []string{"4", "5", "6"})
fetchObserver.AssertFetchCount(t, 2)

// The keys are written asynchonously to the distributed storage.
// The keys are written asynchronously to the distributed storage.
time.Sleep(100 * time.Millisecond)
distributedStorage.assertRecords(t, secondBatchOfIDs, keyFn)
distributedStorage.assertGetCount(t, 2)
Expand Down Expand Up @@ -480,7 +480,7 @@ func TestDistributedStaleStorageBatch(t *testing.T) {
fetchObserver.AssertFetchCount(t, 1)
fetchObserver.Clear()

// The keys are written asynchonously to the distributed storage.
// The keys are written asynchronously to the distributed storage.
time.Sleep(100 * time.Millisecond)
distributedStorage.assertRecords(t, firstBatchOfIDs, keyFn)
distributedStorage.assertGetCount(t, 1)
Expand Down Expand Up @@ -546,7 +546,7 @@ func TestDistributedStorageBatchDeletes(t *testing.T) {
fetchObserver.AssertFetchCount(t, 1)
fetchObserver.Clear()

// The keys are written asynchonously to the distributed storage.
// The keys are written asynchronously to the distributed storage.
time.Sleep(100 * time.Millisecond)
distributedStorage.assertRecords(t, batchOfIDs, keyFn)
distributedStorage.assertGetCount(t, 1)
Expand Down Expand Up @@ -578,7 +578,7 @@ func TestDistributedStorageBatchDeletes(t *testing.T) {
fetchObserver.AssertRequestedRecords(t, batchOfIDs)
fetchObserver.AssertFetchCount(t, 2)

// The keys are written asynchonously to the distributed storage.
// The keys are written asynchronously to the distributed storage.
time.Sleep(100 * time.Millisecond)
distributedStorage.assertRecords(t, []string{"1", "2"}, keyFn)
distributedStorage.assertGetCount(t, 2)
Expand Down Expand Up @@ -615,7 +615,7 @@ func TestDistributedStorageBatchConvertsToMissingRecord(t *testing.T) {
fetchObserver.AssertFetchCount(t, 1)
fetchObserver.Clear()

// The keys are written asynchonously to the distributed storage.
// The keys are written asynchronously to the distributed storage.
time.Sleep(100 * time.Millisecond)
distributedStorage.assertRecords(t, batchOfIDs, keyFn)
distributedStorage.assertGetCount(t, 1)
Expand Down Expand Up @@ -648,7 +648,7 @@ func TestDistributedStorageBatchConvertsToMissingRecord(t *testing.T) {
fetchObserver.AssertFetchCount(t, 2)
fetchObserver.Clear()

// The keys are written asynchonously to the distributed storage.
// The keys are written asynchronously to the distributed storage.
time.Sleep(100 * time.Millisecond)
distributedStorage.assertRecords(t, []string{"1", "2"}, keyFn)
distributedStorage.assertGetCount(t, 2)
Expand All @@ -675,7 +675,8 @@ func TestDistributedStorageBatchConvertsToMissingRecord(t *testing.T) {
fetchObserver.AssertRequestedRecords(t, batchOfIDs)
fetchObserver.AssertFetchCount(t, 3)

// The keys are written asynchonously to the distributed storage.
// The keys are written asynchronously to the distributed storage.
time.Sleep(100 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
distributedStorage.assertRecords(t, batchOfIDs, keyFn)
distributedStorage.assertGetCount(t, 3)
Expand Down Expand Up @@ -727,7 +728,7 @@ func TestDistributedStorageDoesNotCachePartialResponseAsMissingRecords(t *testin
fetchObserver.AssertFetchCount(t, 1)
fetchObserver.Clear()

// The keys are written asynchonously to the distributed storage.
// The keys are written asynchronously to the distributed storage.
time.Sleep(100 * time.Millisecond)
distributedStorage.assertRecords(t, batchOfIDs, keyFn)
distributedStorage.assertGetCount(t, 1)
Expand Down Expand Up @@ -779,6 +780,7 @@ func TestPartialResponseForRefreshesDoesNotResultInMissingRecords(t *testing.T)
ttl := time.Hour
minRefreshDelay := time.Minute * 5
maxRefreshDelay := time.Minute * 10
synchronousRefreshDelay := time.Minute * 30
refreshRetryInterval := time.Millisecond * 10
batchSize := 10
batchBufferTimeout := time.Minute
Expand All @@ -788,7 +790,7 @@ func TestPartialResponseForRefreshesDoesNotResultInMissingRecords(t *testing.T)

c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithDistributedStorageEarlyRefreshes(distributedStorage, refreshAfter),
Expand Down Expand Up @@ -816,7 +818,7 @@ func TestPartialResponseForRefreshesDoesNotResultInMissingRecords(t *testing.T)
fetchObserver.AssertRequestedRecords(t, ids)
fetchObserver.Clear()

// We need to add a sleep because the keys are written asynchonously to the
// We need to add a sleep because the keys are written asynchronously to the
// distributed storage. We expect that the distributed storage was queried
// for the ids before we went to the underlying data source, and then written
// to when it resulted in a cache miss and the data was in fact fetched.
Expand Down
12 changes: 8 additions & 4 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ var (
// ErrMissingRecord is returned by client.GetOrFetch and client.Passthrough when a record has been marked
// as missing. The cache will still try to refresh the record in the background if it's being requested.
ErrMissingRecord = errors.New("sturdyc: the record has been marked as missing in the cache")
// ErrOnlyCachedRecords is returned by client.GetOrFetchBatch and client.PassthroughBatch
// when some of the requested records are available in the cache, but the attempt to
// fetch the remaining records failed. As the consumer, you can then decide whether to
// proceed with the cached records or if the entire batch is necessary.
// ErrOnlyCachedRecords is returned by client.GetOrFetchBatch and
// client.PassthroughBatch when some of the requested records are available
// in the cache, but the attempt to fetch the remaining records failed. It
// may also be returned when you're using the WithEarlyRefreshes
// functionality, and the call to synchronously refresh a record failed. The
// cache will then give you the latest data it has cached, and you as the
// consumer can then decide whether to proceed with the cached records or if
// the newest data is necessary.
ErrOnlyCachedRecords = errors.New("sturdyc: failed to fetch the records that were not in the cache")
// ErrInvalidType is returned when you try to use one of the generic
// package level functions but the type assertion fails.
Expand Down
6 changes: 4 additions & 2 deletions examples/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ func demonstrateGetOrFetchBatch(cacheClient *sturdyc.Client[int]) {
{"11", "12", "13", "14", "15"},
}

// We'll use a cache key function to add a prefix to the IDs. If we only used
// the IDs, we wouldn't be able to fetch the same IDs from multiple data sources.
// We are going to pass a cache a key function that prefixes each id with
// the string "my-data-source", and adds an -ID- separator before the actual
// id. This makes it possible to save the same id for different data
// sources as the keys would look something like this: my-data-source-ID-1
keyPrefixFn := cacheClient.BatchKeyFn("my-data-source")

// Request the keys for each batch.
Expand Down
10 changes: 7 additions & 3 deletions examples/batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ func NewAPI(c *sturdyc.Client[string]) *API {
}

func (a *API) GetBatch(ctx context.Context, ids []string) (map[string]string, error) {
// We are going to pass the cache a key function that prefixes each id.
// This makes it possible to save the same id for different data sources.
// We are going to pass a cache a key function that prefixes each id with
// the string "some-prefix", and adds an -ID- separator before the actual
// id. This makes it possible to save the same id for different data
// sources as the keys would look something like this: some-prefix-ID-1
cacheKeyFn := a.BatchKeyFn("some-prefix")

// The fetchFn is only going to retrieve the IDs that are not in the cache.
Expand Down Expand Up @@ -54,12 +56,14 @@ func main() {
// used to spread out the refreshes for entries evenly over time.
minRefreshDelay := time.Second
maxRefreshDelay := time.Second * 2
// Set a synchronous refresh delay for when we want a refresh to happen synchronously.
synchronousRefreshDelay := time.Second * 30
// The base for exponential backoff when retrying a refresh.
retryBaseDelay := time.Millisecond * 10

// Create a cache client with the specified configuration.
cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryBaseDelay),
)

// Create a new API instance with the cache client.
Expand Down
6 changes: 4 additions & 2 deletions examples/buffering/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewOrderAPI(client *sturdyc.Client[string]) *OrderAPI {
}

func (a *OrderAPI) OrderStatus(ctx context.Context, ids []string, opts OrderOptions) (map[string]string, error) {
// We use the PermutedBatchKeyFn when an ID isn't enough to uniquely identify a
// We use the PermutedBatchKeyFn when an ID isn't enough to uniquely identify a
// record. The cache is going to store each id once per set of options. In a more
// realistic scenario, the opts would be query params or arguments to a DB query.
cacheKeyFn := a.PermutatedBatchKeyFn("key", opts)
Expand Down Expand Up @@ -56,6 +56,8 @@ func main() {
// used to spread out the refreshes for entries evenly over time.
minRefreshDelay := time.Second
maxRefreshDelay := time.Second * 2
// Set a synchronous refresh delay for when we want a refresh to happen synchronously.
synchronousRefreshDelay := time.Second * 30
// The base for exponential backoff when retrying a refresh.
retryBaseDelay := time.Millisecond * 10
// Whether to store misses in the sturdyc. This can be useful to
Expand All @@ -68,7 +70,7 @@ func main() {

// Create a new cache client with the specified configuration.
cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryBaseDelay),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
)

Expand Down
9 changes: 5 additions & 4 deletions examples/distributed-early-refreshes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ const (

// Configuration for the early in-memory refreshes.
const (
minRefreshTime = 2 * time.Second
maxRefreshTime = 4 * time.Second
retryBaseDelay = 5 * time.Second
minRefreshTime = 2 * time.Second
maxRefreshTime = 4 * time.Second
synchronousRefreshTime = 30 * time.Second
retryBaseDelay = 5 * time.Second
)

// Configuration for the refresh coalescing.
Expand All @@ -36,7 +37,7 @@ const refreshAfter = time.Second
func newAPIClient(distributedStorage sturdyc.DistributedStorageWithDeletions) *apiClient {
return &apiClient{
cache: sturdyc.New[any](capacity, numberOfShards, ttl, percentageOfRecordsToEvictWhenFull,
sturdyc.WithEarlyRefreshes(minRefreshTime, maxRefreshTime, retryBaseDelay),
sturdyc.WithEarlyRefreshes(minRefreshTime, maxRefreshTime, synchronousRefreshTime, retryBaseDelay),
sturdyc.WithRefreshCoalescing(idealBufferSize, bufferTimeout),
sturdyc.WithDistributedStorageEarlyRefreshes(distributedStorage, refreshAfter),
// NOTE: Uncommenting this line will make the cache mark the records as
Expand Down
Loading
Loading