diff --git a/cmd/cachew/main.go b/cmd/cachew/main.go index 547a167..b454b2f 100644 --- a/cmd/cachew/main.go +++ b/cmd/cachew/main.go @@ -26,10 +26,11 @@ type CLI struct { Daily bool `help:"Prefix keys with date ($${YYYY}-$${MM}-$${DD}-). Mutually exclusive with --hourly." xor:"timeprefix"` Hourly bool `help:"Prefix keys with date and hour ($${YYYY}-$${MM}-$${DD}-$${HH}-). Mutually exclusive with --daily." xor:"timeprefix"` - Get GetCmd `cmd:"" help:"Download object from cache." group:"Operations:"` - Stat StatCmd `cmd:"" help:"Show metadata for cached object." group:"Operations:"` - Put PutCmd `cmd:"" help:"Upload object to cache." group:"Operations:"` - Delete DeleteCmd `cmd:"" help:"Remove object from cache." group:"Operations:"` + Get GetCmd `cmd:"" help:"Download object from cache." group:"Operations:"` + Stat StatCmd `cmd:"" help:"Show metadata for cached object." group:"Operations:"` + Put PutCmd `cmd:"" help:"Upload object to cache." group:"Operations:"` + Delete DeleteCmd `cmd:"" help:"Remove object from cache." group:"Operations:"` + Namespaces NamespacesCmd `cmd:"" help:"List available namespaces in cache." group:"Operations:"` Snapshot SnapshotCmd `cmd:"" help:"Create compressed archive of directory and upload." group:"Snapshots:"` Restore RestoreCmd `cmd:"" help:"Download and extract archive to directory." group:"Snapshots:"` @@ -50,14 +51,16 @@ func main() { } type GetCmd struct { - Key PlatformKey `arg:"" help:"Object key (hex or string)."` - Output *os.File `short:"o" help:"Output file (default: stdout)." default:"-"` + Namespace string `arg:"" help:"Namespace for organizing cache objects."` + Key PlatformKey `arg:"" help:"Object key (hex or string)."` + Output *os.File `short:"o" help:"Output file (default: stdout)." default:"-"` } func (c *GetCmd) Run(ctx context.Context, cache cache.Cache) error { defer c.Output.Close() - rc, headers, err := cache.Open(ctx, c.Key.Key()) + namespacedCache := cache.Namespace(c.Namespace) + rc, headers, err := namespacedCache.Open(ctx, c.Key.Key()) if err != nil { return errors.Wrap(err, "failed to open object") } @@ -74,11 +77,13 @@ func (c *GetCmd) Run(ctx context.Context, cache cache.Cache) error { } type StatCmd struct { - Key PlatformKey `arg:"" help:"Object key (hex or string)."` + Namespace string `arg:"" help:"Namespace for organizing cache objects."` + Key PlatformKey `arg:"" help:"Object key (hex or string)."` } func (c *StatCmd) Run(ctx context.Context, cache cache.Cache) error { - headers, err := cache.Stat(ctx, c.Key.Key()) + namespacedCache := cache.Namespace(c.Namespace) + headers, err := namespacedCache.Stat(ctx, c.Key.Key()) if err != nil { return errors.Wrap(err, "failed to stat object") } @@ -93,10 +98,11 @@ func (c *StatCmd) Run(ctx context.Context, cache cache.Cache) error { } type PutCmd struct { - Key PlatformKey `arg:"" help:"Object key (hex or string)."` - Input *os.File `arg:"" help:"Input file (default: stdin)." default:"-"` - TTL time.Duration `help:"Time to live for the object."` - Headers map[string]string `short:"H" help:"Additional headers (key=value)."` + Namespace string `arg:"" help:"Namespace for organizing cache objects."` + Key PlatformKey `arg:"" help:"Object key (hex or string)."` + Input *os.File `arg:"" help:"Input file (default: stdin)." default:"-"` + TTL time.Duration `help:"Time to live for the object."` + Headers map[string]string `short:"H" help:"Additional headers (key=value)."` } func (c *PutCmd) Run(ctx context.Context, cache cache.Cache) error { @@ -111,7 +117,8 @@ func (c *PutCmd) Run(ctx context.Context, cache cache.Cache) error { headers.Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(filename))) //nolint:perfsprint } - wc, err := cache.Create(ctx, c.Key.Key(), headers, c.TTL) + namespacedCache := cache.Namespace(c.Namespace) + wc, err := namespacedCache.Create(ctx, c.Key.Key(), headers, c.TTL) if err != nil { return errors.Wrap(err, "failed to create object") } @@ -124,14 +131,36 @@ func (c *PutCmd) Run(ctx context.Context, cache cache.Cache) error { } type DeleteCmd struct { - Key PlatformKey `arg:"" help:"Object key (hex or string)."` + Namespace string `arg:"" help:"Namespace for organizing cache objects."` + Key PlatformKey `arg:"" help:"Object key (hex or string)."` } func (c *DeleteCmd) Run(ctx context.Context, cache cache.Cache) error { - return errors.Wrap(cache.Delete(ctx, c.Key.Key()), "failed to delete object") + namespacedCache := cache.Namespace(c.Namespace) + return errors.Wrap(namespacedCache.Delete(ctx, c.Key.Key()), "failed to delete object") +} + +type NamespacesCmd struct{} + +func (c *NamespacesCmd) Run(ctx context.Context, cache cache.Cache) error { + namespaces, err := cache.ListNamespaces(ctx) + if err != nil { + return errors.Wrap(err, "failed to list namespaces") + } + + if len(namespaces) == 0 { + fmt.Println("No namespaces found") //nolint:forbidigo + return nil + } + + for _, ns := range namespaces { + fmt.Println(ns) //nolint:forbidigo + } + return nil } type SnapshotCmd struct { + Namespace string `arg:"" help:"Namespace for organizing cache objects."` Key PlatformKey `arg:"" help:"Object key (hex or string)."` Directory string `arg:"" help:"Directory to archive." type:"path"` TTL time.Duration `help:"Time to live for the object."` @@ -140,7 +169,8 @@ type SnapshotCmd struct { func (c *SnapshotCmd) Run(ctx context.Context, cache cache.Cache) error { fmt.Fprintf(os.Stderr, "Archiving %s...\n", c.Directory) //nolint:forbidigo - if err := snapshot.Create(ctx, cache, c.Key.Key(), c.Directory, c.TTL, c.Exclude); err != nil { + namespacedCache := cache.Namespace(c.Namespace) + if err := snapshot.Create(ctx, namespacedCache, c.Key.Key(), c.Directory, c.TTL, c.Exclude); err != nil { return errors.Wrap(err, "failed to create snapshot") } @@ -149,13 +179,15 @@ func (c *SnapshotCmd) Run(ctx context.Context, cache cache.Cache) error { } type RestoreCmd struct { + Namespace string `arg:"" help:"Namespace for organizing cache objects."` Key PlatformKey `arg:"" help:"Object key (hex or string)."` Directory string `arg:"" help:"Target directory for extraction." type:"path"` } func (c *RestoreCmd) Run(ctx context.Context, cache cache.Cache) error { fmt.Fprintf(os.Stderr, "Restoring to %s...\n", c.Directory) //nolint:forbidigo - if err := snapshot.Restore(ctx, cache, c.Key.Key(), c.Directory); err != nil { + namespacedCache := cache.Namespace(c.Namespace) + if err := snapshot.Restore(ctx, namespacedCache, c.Key.Key(), c.Directory); err != nil { return errors.Wrap(err, "failed to restore snapshot") } diff --git a/internal/cache/api.go b/internal/cache/api.go index 6f13121..c5c2d72 100644 --- a/internal/cache/api.go +++ b/internal/cache/api.go @@ -148,6 +148,9 @@ type Stats struct { type Cache interface { // String describes the Cache implementation. String() string + // Namespace creates a namespaced view of this cache. + // All operations on the returned cache will use the given namespace prefix. + Namespace(namespace string) Cache // Stat returns the headers of an existing object in the cache. // // Expired files MUST not be returned. @@ -173,6 +176,8 @@ type Cache interface { Delete(ctx context.Context, key Key) error // Stats returns health and usage statistics for the cache. Stats(ctx context.Context) (Stats, error) + // ListNamespaces returns all unique namespaces in the cache in order. + ListNamespaces(ctx context.Context) ([]string, error) // Close the Cache. Close() error } diff --git a/internal/cache/cachetest/suite.go b/internal/cache/cachetest/suite.go index db43d3c..217e57e 100644 --- a/internal/cache/cachetest/suite.go +++ b/internal/cache/cachetest/suite.go @@ -2,6 +2,7 @@ package cachetest import ( "context" + "errors" "io" "net/http" "os" @@ -55,6 +56,18 @@ func Suite(t *testing.T, newCache func(t *testing.T) cache.Cache) { t.Run("LastModified", func(t *testing.T) { testLastModified(t, newCache(t)) }) + + t.Run("NamespaceIsolation", func(t *testing.T) { + testNamespaceIsolation(t, newCache(t)) + }) + + t.Run("ListNamespaces", func(t *testing.T) { + testListNamespaces(t, newCache(t)) + }) + + t.Run("NamespaceDelete", func(t *testing.T) { + testNamespaceDelete(t, newCache(t)) + }) } func testCreateAndOpen(t *testing.T, c cache.Cache) { @@ -329,3 +342,120 @@ func testLastModified(t *testing.T, c cache.Cache) { assert.Equal(t, explicitTime.Format(http.TimeFormat), headers2.Get("Last-Modified")) } + +func testNamespaceIsolation(t *testing.T, c cache.Cache) { + defer c.Close() + ctx := t.Context() + + // Create namespace views + gitCache := c.Namespace("git") + gomodCache := c.Namespace("gomod") + + // Create entries in different namespaces with same key + key := cache.NewKey("same-key") + + // Write to git namespace + w, err := gitCache.Create(ctx, key, nil, time.Hour) + assert.NoError(t, err) + _, err = w.Write([]byte("git data")) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + + // Write to gomod namespace + w, err = gomodCache.Create(ctx, key, nil, time.Hour) + assert.NoError(t, err) + _, err = w.Write([]byte("gomod data")) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + + // Verify isolation - each namespace returns its own data + r, _, err := gitCache.Open(ctx, key) + assert.NoError(t, err) + gitData, err := io.ReadAll(r) + assert.NoError(t, err) + assert.Equal(t, "git data", string(gitData)) + assert.NoError(t, r.Close()) + + r, _, err = gomodCache.Open(ctx, key) + assert.NoError(t, err) + gomodData, err := io.ReadAll(r) + assert.NoError(t, err) + assert.Equal(t, "gomod data", string(gomodData)) + assert.NoError(t, r.Close()) +} + +func testListNamespaces(t *testing.T, c cache.Cache) { + defer c.Close() + ctx := t.Context() + + // Initially no namespaces + namespaces, err := c.ListNamespaces(ctx) + if errors.Is(err, cache.ErrStatsUnavailable) { + t.Skip("Cache does not support ListNamespaces") + } + assert.NoError(t, err) + assert.Equal(t, 0, len(namespaces)) + + // Create entries in different namespaces + gitCache := c.Namespace("git") + gomodCache := c.Namespace("gomod") + hermitCache := c.Namespace("hermit") + + for i, cacheNS := range []cache.Cache{gitCache, gomodCache, hermitCache} { + w, err := cacheNS.Create(ctx, cache.NewKey(string(rune('a'+i))), nil, time.Hour) + assert.NoError(t, err) + _, err = w.Write([]byte("data")) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + } + + // Verify all namespaces are listed + namespaces, err = c.ListNamespaces(ctx) + assert.NoError(t, err) + assert.Equal(t, 3, len(namespaces)) + + nsMap := make(map[string]bool) + for _, ns := range namespaces { + nsMap[ns] = true + } + assert.True(t, nsMap["git"]) + assert.True(t, nsMap["gomod"]) + assert.True(t, nsMap["hermit"]) +} + +func testNamespaceDelete(t *testing.T, c cache.Cache) { + defer c.Close() + ctx := t.Context() + + gitCache := c.Namespace("git") + gomodCache := c.Namespace("gomod") + + key := cache.NewKey("test-key") + + // Create entry in git namespace + w, err := gitCache.Create(ctx, key, nil, time.Hour) + assert.NoError(t, err) + _, err = w.Write([]byte("git data")) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + + // Create entry in gomod namespace + w, err = gomodCache.Create(ctx, key, nil, time.Hour) + assert.NoError(t, err) + _, err = w.Write([]byte("gomod data")) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + + // Delete from git namespace + err = gitCache.Delete(ctx, key) + assert.NoError(t, err) + + // Verify git entry is gone + _, _, err = gitCache.Open(ctx, key) + assert.IsError(t, err, os.ErrNotExist) + + // Verify gomod entry still exists + r, _, err := gomodCache.Open(ctx, key) + assert.NoError(t, err) + assert.NoError(t, r.Close()) +} diff --git a/internal/cache/disk.go b/internal/cache/disk.go index 7d28ba0..79e7fca 100644 --- a/internal/cache/disk.go +++ b/internal/cache/disk.go @@ -38,8 +38,9 @@ type DiskConfig struct { type Disk struct { logger *slog.Logger config DiskConfig + namespace string db *diskMetaDB - size atomic.Int64 + size *atomic.Int64 runEviction chan struct{} stop context.CancelFunc evictionDone chan struct{} @@ -113,6 +114,7 @@ func NewDisk(ctx context.Context, config DiskConfig) (*Disk, error) { logger: logger, config: config, db: db, + size: &atomic.Int64{}, runEviction: make(chan struct{}), stop: stop, evictionDone: make(chan struct{}), @@ -164,7 +166,7 @@ func (d *Disk) Create(ctx context.Context, key Key, headers http.Header, ttl tim clonedHeaders.Set("Last-Modified", now.UTC().Format(http.TimeFormat)) } - path := d.keyToPath(key) + path := d.keyToPath(d.namespace, key) fullPath := filepath.Join(d.config.Root, path) dir := filepath.Dir(fullPath) @@ -183,6 +185,7 @@ func (d *Disk) Create(ctx context.Context, key Key, headers http.Header, ttl tim disk: d, file: f, key: key, + namespace: d.namespace, path: fullPath, tempPath: f.Name(), expiresAt: expiresAt, @@ -192,12 +195,12 @@ func (d *Disk) Create(ctx context.Context, key Key, headers http.Header, ttl tim } func (d *Disk) Delete(_ context.Context, key Key) error { - path := d.keyToPath(key) + path := d.keyToPath(d.namespace, key) fullPath := filepath.Join(d.config.Root, path) // Check if file is expired expired := false - expiresAt, err := d.db.getTTL(key) + expiresAt, err := d.db.getTTL(d.namespace, key) if err == nil && time.Now().After(expiresAt) { expired = true } @@ -212,7 +215,7 @@ func (d *Disk) Delete(_ context.Context, key Key) error { } // Remove metadata - if err := d.db.delete(key); err != nil { + if err := d.db.delete(d.namespace, key); err != nil { return errors.Errorf("failed to delete TTL metadata: %w", err) } @@ -225,14 +228,14 @@ func (d *Disk) Delete(_ context.Context, key Key) error { } func (d *Disk) Stat(ctx context.Context, key Key) (http.Header, error) { - path := d.keyToPath(key) + path := d.keyToPath(d.namespace, key) fullPath := filepath.Join(d.config.Root, path) if _, err := os.Stat(fullPath); err != nil { return nil, errors.Errorf("failed to stat file: %w", err) } - expiresAt, err := d.db.getTTL(key) + expiresAt, err := d.db.getTTL(d.namespace, key) if err != nil { return nil, errors.Errorf("failed to get TTL: %w", err) } @@ -241,7 +244,7 @@ func (d *Disk) Stat(ctx context.Context, key Key) (http.Header, error) { return nil, errors.Join(fs.ErrNotExist, d.Delete(ctx, key)) } - headers, err := d.db.getHeaders(key) + headers, err := d.db.getHeaders(d.namespace, key) if err != nil { return nil, errors.Errorf("failed to get headers: %w", err) } @@ -250,7 +253,7 @@ func (d *Disk) Stat(ctx context.Context, key Key) (http.Header, error) { } func (d *Disk) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, error) { - path := d.keyToPath(key) + path := d.keyToPath(d.namespace, key) fullPath := filepath.Join(d.config.Root, path) f, err := os.Open(fullPath) @@ -258,7 +261,7 @@ func (d *Disk) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, e return nil, nil, errors.Errorf("failed to open file: %w", err) } - expiresAt, err := d.db.getTTL(key) + expiresAt, err := d.db.getTTL(d.namespace, key) if err != nil { return nil, nil, errors.Join(err, f.Close()) } @@ -268,7 +271,7 @@ func (d *Disk) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, e return nil, nil, errors.Join(fs.ErrNotExist, f.Close(), d.Delete(ctx, key)) } - headers, err := d.db.getHeaders(key) + headers, err := d.db.getHeaders(d.namespace, key) if err != nil { return nil, nil, errors.Join(errors.Errorf("failed to get headers: %w", err), f.Close()) } @@ -277,16 +280,20 @@ func (d *Disk) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, e ttl := min(expiresAt.Sub(now), d.config.MaxTTL) newExpiresAt := now.Add(ttl) - if err := d.db.setTTL(key, newExpiresAt); err != nil { + if err := d.db.setTTL(d.namespace, key, newExpiresAt); err != nil { return nil, nil, errors.Join(errors.Errorf("failed to update expiration time: %w", err), f.Close()) } return f, headers, nil } -func (d *Disk) keyToPath(key Key) string { +func (d *Disk) keyToPath(namespace string, key Key) string { hexKey := key.String() + // Use first two hex digits as directory, full hex as filename + if namespace != "" { + return filepath.Join(namespace, hexKey[:2], hexKey) + } return filepath.Join(hexKey[:2], hexKey) } @@ -312,27 +319,33 @@ func (d *Disk) evictionLoop(ctx context.Context) { } } -func (d *Disk) evict() error { - type fileInfo struct { - key Key - path string - size int64 - expiresAt time.Time - accessedAt time.Time - } +type evictFileInfo struct { + namespace string + key Key + path string + size int64 + expiresAt time.Time + accessedAt time.Time +} - var remainingFiles []fileInfo - var expiredKeys []Key +type evictEntryKey struct { + namespace string + key Key +} + +func (d *Disk) evict() error { + var remainingFiles []evictFileInfo + var expiredEntries []evictEntryKey now := time.Now() - err := d.db.walk(func(key Key, expiresAt time.Time) error { - path := d.keyToPath(key) + err := d.db.walk(func(key Key, namespace string, expiresAt time.Time) error { + path := d.keyToPath(namespace, key) fullPath := filepath.Join(d.config.Root, path) info, err := os.Stat(fullPath) if err != nil { if errors.Is(err, fs.ErrNotExist) { - expiredKeys = append(expiredKeys, key) + expiredEntries = append(expiredEntries, evictEntryKey{namespace, key}) } return nil } @@ -341,10 +354,11 @@ func (d *Disk) evict() error { if err := os.Remove(fullPath); err != nil && !errors.Is(err, fs.ErrNotExist) { return errors.Errorf("failed to delete expired file %s: %w", path, err) } - expiredKeys = append(expiredKeys, key) + expiredEntries = append(expiredEntries, evictEntryKey{namespace, key}) d.size.Add(-info.Size()) } else { - remainingFiles = append(remainingFiles, fileInfo{ + remainingFiles = append(remainingFiles, evictFileInfo{ + namespace: namespace, key: key, path: path, size: info.Size(), @@ -358,10 +372,24 @@ func (d *Disk) evict() error { return errors.Errorf("failed to walk TTL entries: %w", err) } - if err := d.db.deleteAll(expiredKeys); err != nil { - return errors.Errorf("failed to delete TTL metadata: %w", err) + if err := d.deleteExpiredEntries(expiredEntries); err != nil { + return err + } + + return d.evictBySize(remainingFiles) +} + +func (d *Disk) deleteExpiredEntries(expiredEntries []evictEntryKey) error { + if len(expiredEntries) == 0 { + return nil } + if err := d.db.deleteAll(expiredEntries); err != nil { + return errors.Errorf("failed to delete expired metadata: %w", err) + } + return nil +} +func (d *Disk) evictBySize(remainingFiles []evictFileInfo) error { limitBytes := int64(d.config.LimitMB) * 1024 * 1024 if d.size.Load() <= limitBytes { return nil @@ -372,7 +400,7 @@ func (d *Disk) evict() error { return remainingFiles[i].accessedAt.Before(remainingFiles[j].accessedAt) }) - var sizeEvictedKeys []Key + var sizeEvictedEntries []evictEntryKey for _, f := range remainingFiles { if d.size.Load() <= limitBytes { break @@ -382,21 +410,18 @@ func (d *Disk) evict() error { if err := os.Remove(fullPath); err != nil && !errors.Is(err, fs.ErrNotExist) { return errors.Errorf("failed to delete file during size eviction %s: %w", f.path, err) } - sizeEvictedKeys = append(sizeEvictedKeys, f.key) + sizeEvictedEntries = append(sizeEvictedEntries, evictEntryKey{f.namespace, f.key}) d.size.Add(-f.size) } - if err := d.db.deleteAll(sizeEvictedKeys); err != nil { - return errors.Errorf("failed to delete TTL metadata: %w", err) - } - - return nil + return d.deleteExpiredEntries(sizeEvictedEntries) } type diskWriter struct { disk *Disk file *os.File key Key + namespace string path string tempPath string expiresAt time.Time @@ -437,7 +462,7 @@ func (w *diskWriter) Close() error { return errors.Errorf("failed to rename temp file: %w", err) } - if err := w.disk.db.set(w.key, w.expiresAt, w.headers); err != nil { + if err := w.disk.db.set(w.key, w.namespace, w.expiresAt, w.headers); err != nil { return errors.Join(errors.Errorf("failed to set metadata: %w", err), os.Remove(w.path)) } @@ -450,3 +475,16 @@ func (w *diskWriter) Close() error { return nil } + +// Namespace creates a namespaced view of the disk cache. +func (d *Disk) Namespace(namespace string) Cache { + // Create a shallow copy with the namespace set + c := *d + c.namespace = namespace + return &c +} + +// ListNamespaces returns all unique namespaces in the disk cache. +func (d *Disk) ListNamespaces(_ context.Context) ([]string, error) { + return d.db.listNamespaces() +} diff --git a/internal/cache/disk_eviction_test.go b/internal/cache/disk_eviction_test.go new file mode 100644 index 0000000..5a6409d --- /dev/null +++ b/internal/cache/disk_eviction_test.go @@ -0,0 +1,125 @@ +package cache_test + +import ( + "log/slog" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/cache" + "github.com/block/cachew/internal/logging" +) + +func TestDiskEvictionBySize(t *testing.T) { + dir := t.TempDir() + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) + + // Create cache with 1MB limit and fast eviction + c, err := cache.NewDisk(ctx, cache.DiskConfig{ + Root: dir, + LimitMB: 1, + MaxTTL: time.Hour, + EvictInterval: 50 * time.Millisecond, + }) + assert.NoError(t, err) + defer c.Close() + + // Create 3 entries of ~500KB each (total 1.5MB, exceeding 1MB limit) + data := make([]byte, 500*1024) + keys := []cache.Key{ + cache.NewKey("key1"), + cache.NewKey("key2"), + cache.NewKey("key3"), + } + + for _, key := range keys { + w, err := c.Create(ctx, key, nil, time.Hour) + assert.NoError(t, err) + _, err = w.Write(data) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + time.Sleep(10 * time.Millisecond) // Ensure different access times + } + + // Wait for eviction to run + time.Sleep(200 * time.Millisecond) + + // key1 (oldest) should be evicted + _, _, err = c.Open(ctx, keys[0]) + assert.Error(t, err) + + // key2 and key3 should still exist + r2, _, err := c.Open(ctx, keys[1]) + assert.NoError(t, err) + assert.NoError(t, r2.Close()) + + r3, _, err := c.Open(ctx, keys[2]) + assert.NoError(t, err) + assert.NoError(t, r3.Close()) +} + +func TestDiskEvictionAcrossNamespaces(t *testing.T) { + dir := t.TempDir() + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) + + // Create cache with 1MB limit + baseCache, err := cache.NewDisk(ctx, cache.DiskConfig{ + Root: dir, + LimitMB: 1, + MaxTTL: time.Hour, + EvictInterval: 50 * time.Millisecond, + }) + assert.NoError(t, err) + defer baseCache.Close() + + // Create namespace views + gitCache := baseCache.Namespace("git") + gomodCache := baseCache.Namespace("gomod") + + // Create entries in different namespaces + data := make([]byte, 500*1024) + + // git namespace + gitKey := cache.NewKey("git-key") + w, err := gitCache.Create(ctx, gitKey, nil, time.Hour) + assert.NoError(t, err) + _, err = w.Write(data) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + time.Sleep(10 * time.Millisecond) + + // gomod namespace + gomodKey := cache.NewKey("gomod-key") + w, err = gomodCache.Create(ctx, gomodKey, nil, time.Hour) + assert.NoError(t, err) + _, err = w.Write(data) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + time.Sleep(10 * time.Millisecond) + + // Another git entry to exceed limit + gitKey2 := cache.NewKey("git-key2") + w, err = gitCache.Create(ctx, gitKey2, nil, time.Hour) + assert.NoError(t, err) + _, err = w.Write(data) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + + // Wait for eviction + time.Sleep(200 * time.Millisecond) + + // First git entry (oldest) should be evicted + _, _, err = gitCache.Open(ctx, gitKey) + assert.Error(t, err) + + // gomod entry should still exist + r, _, err := gomodCache.Open(ctx, gomodKey) + assert.NoError(t, err) + assert.NoError(t, r.Close()) + + // Newer git entry should still exist + r, _, err = gitCache.Open(ctx, gitKey2) + assert.NoError(t, err) + assert.NoError(t, r.Close()) +} diff --git a/internal/cache/disk_metadb.go b/internal/cache/disk_metadb.go index 9349aa7..47b28aa 100644 --- a/internal/cache/disk_metadb.go +++ b/internal/cache/disk_metadb.go @@ -1,9 +1,12 @@ package cache import ( + "bytes" "encoding/json" "io/fs" "net/http" + "sort" + "sync" "time" "github.com/alecthomas/errors" @@ -18,7 +21,17 @@ var ( // diskMetaDB manages expiration times and headers for cache entries using bbolt. type diskMetaDB struct { - db *bbolt.DB + db *bbolt.DB + namespacesCache sync.Map // map[string]bool - concurrent-safe +} + +// compositeKey creates a unique database key from namespace and cache key. +// Format: "namespace/hexkey" when namespace is set, or just "hexkey" when empty. +func compositeKey(namespace string, key Key) []byte { + if namespace == "" { + return []byte(key.String()) + } + return []byte(namespace + "/" + key.String()) } // newDiskMetaDB creates a new bbolt-backed metadata storage for the disk cache. @@ -42,22 +55,52 @@ func newDiskMetaDB(dbPath string) (*diskMetaDB, error) { return nil, errors.Join(errors.Errorf("failed to create buckets: %w", err), db.Close()) } - return &diskMetaDB{db: db}, nil + // Initialize in-memory namespace cache by scanning existing entries + metaDB := &diskMetaDB{db: db} + err = db.View(func(tx *bbolt.Tx) error { + ttlBucket := tx.Bucket(ttlBucketName) + if ttlBucket == nil { + return nil + } + return ttlBucket.ForEach(func(k, _ []byte) error { + namespace, _, found := bytes.Cut(k, []byte("/")) + if found && len(namespace) > 0 { + metaDB.namespacesCache.Store(string(namespace), true) + } + return nil + }) + }) + if err != nil { + return nil, errors.Join(errors.Errorf("failed to initialize namespace cache: %w", err), db.Close()) + } + + return metaDB, nil } -func (s *diskMetaDB) setTTL(key Key, expiresAt time.Time) error { +func (s *diskMetaDB) setTTL(namespace string, key Key, expiresAt time.Time) error { ttlBytes, err := expiresAt.MarshalBinary() if err != nil { return errors.Errorf("failed to marshal TTL: %w", err) } - return errors.WithStack(s.db.Update(func(tx *bbolt.Tx) error { + dbKey := compositeKey(namespace, key) + err = s.db.Update(func(tx *bbolt.Tx) error { ttlBucket := tx.Bucket(ttlBucketName) - return errors.WithStack(ttlBucket.Put(key[:], ttlBytes)) - })) + return errors.WithStack(ttlBucket.Put(dbKey, ttlBytes)) + }) + if err != nil { + return errors.WithStack(err) + } + + // Add namespace to in-memory cache + if namespace != "" { + s.namespacesCache.Store(namespace, true) + } + + return nil } -func (s *diskMetaDB) set(key Key, expiresAt time.Time, headers http.Header) error { +func (s *diskMetaDB) set(key Key, namespace string, expiresAt time.Time, headers http.Header) error { ttlBytes, err := expiresAt.MarshalBinary() if err != nil { return errors.Errorf("failed to marshal TTL: %w", err) @@ -68,22 +111,34 @@ func (s *diskMetaDB) set(key Key, expiresAt time.Time, headers http.Header) erro return errors.Errorf("failed to encode headers: %w", err) } - return errors.WithStack(s.db.Update(func(tx *bbolt.Tx) error { + dbKey := compositeKey(namespace, key) + err = s.db.Update(func(tx *bbolt.Tx) error { ttlBucket := tx.Bucket(ttlBucketName) - if err := ttlBucket.Put(key[:], ttlBytes); err != nil { + if err := ttlBucket.Put(dbKey, ttlBytes); err != nil { return errors.WithStack(err) } headersBucket := tx.Bucket(headersBucketName) - return errors.WithStack(headersBucket.Put(key[:], headersBytes)) - })) + return errors.WithStack(headersBucket.Put(dbKey, headersBytes)) + }) + if err != nil { + return errors.WithStack(err) + } + + // Add namespace to in-memory cache + if namespace != "" { + s.namespacesCache.Store(namespace, true) + } + + return nil } -func (s *diskMetaDB) getTTL(key Key) (time.Time, error) { +func (s *diskMetaDB) getTTL(namespace string, key Key) (time.Time, error) { var expiresAt time.Time + dbKey := compositeKey(namespace, key) err := s.db.View(func(tx *bbolt.Tx) error { bucket := tx.Bucket(ttlBucketName) - ttlBytes := bucket.Get(key[:]) + ttlBytes := bucket.Get(dbKey) if ttlBytes == nil { return fs.ErrNotExist } @@ -92,11 +147,12 @@ func (s *diskMetaDB) getTTL(key Key) (time.Time, error) { return expiresAt, errors.WithStack(err) } -func (s *diskMetaDB) getHeaders(key Key) (http.Header, error) { +func (s *diskMetaDB) getHeaders(namespace string, key Key) (http.Header, error) { var headers http.Header + dbKey := compositeKey(namespace, key) err := s.db.View(func(tx *bbolt.Tx) error { bucket := tx.Bucket(headersBucketName) - headersBytes := bucket.Get(key[:]) + headersBytes := bucket.Get(dbKey) if headersBytes == nil { return fs.ErrNotExist } @@ -105,31 +161,33 @@ func (s *diskMetaDB) getHeaders(key Key) (http.Header, error) { return headers, errors.WithStack(err) } -func (s *diskMetaDB) delete(key Key) error { +func (s *diskMetaDB) delete(namespace string, key Key) error { + dbKey := compositeKey(namespace, key) return errors.WithStack(s.db.Update(func(tx *bbolt.Tx) error { ttlBucket := tx.Bucket(ttlBucketName) - if err := ttlBucket.Delete(key[:]); err != nil { + if err := ttlBucket.Delete(dbKey); err != nil { return errors.WithStack(err) } headersBucket := tx.Bucket(headersBucketName) - return errors.WithStack(headersBucket.Delete(key[:])) + return errors.WithStack(headersBucket.Delete(dbKey)) })) } -func (s *diskMetaDB) deleteAll(keys []Key) error { - if len(keys) == 0 { +func (s *diskMetaDB) deleteAll(entries []evictEntryKey) error { + if len(entries) == 0 { return nil } return errors.WithStack(s.db.Update(func(tx *bbolt.Tx) error { ttlBucket := tx.Bucket(ttlBucketName) headersBucket := tx.Bucket(headersBucketName) - for _, key := range keys { - if err := ttlBucket.Delete(key[:]); err != nil { + for _, entry := range entries { + dbKey := compositeKey(entry.namespace, entry.key) + if err := ttlBucket.Delete(dbKey); err != nil { return errors.Errorf("failed to delete TTL: %w", err) } - if err := headersBucket.Delete(key[:]); err != nil { + if err := headersBucket.Delete(dbKey); err != nil { return errors.Errorf("failed to delete headers: %w", err) } } @@ -137,23 +195,35 @@ func (s *diskMetaDB) deleteAll(keys []Key) error { })) } -func (s *diskMetaDB) walk(fn func(key Key, expiresAt time.Time) error) error { +func (s *diskMetaDB) walk(fn func(key Key, namespace string, expiresAt time.Time) error) error { return errors.WithStack(s.db.View(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(ttlBucketName) - if bucket == nil { + ttlBucket := tx.Bucket(ttlBucketName) + if ttlBucket == nil { return nil } - return bucket.ForEach(func(k, v []byte) error { - if len(k) != 32 { + return ttlBucket.ForEach(func(k, v []byte) error { + var namespace string + var key Key + + before, hexKey, found := bytes.Cut(k, []byte("/")) + if found { + namespace = string(before) + } else { + hexKey = k + } + if len(hexKey) != 64 { return nil } - var key Key - copy(key[:], k) + if err := key.UnmarshalText(hexKey); err != nil { + return nil //nolint:nilerr + } + var expiresAt time.Time if err := expiresAt.UnmarshalBinary(v); err != nil { return nil //nolint:nilerr } - return fn(key, expiresAt) + + return fn(key, namespace, expiresAt) }) })) } @@ -177,3 +247,15 @@ func (s *diskMetaDB) close() error { } return nil } + +func (s *diskMetaDB) listNamespaces() ([]string, error) { + var namespaces []string + s.namespacesCache.Range(func(key, _ any) bool { + if ns, ok := key.(string); ok { + namespaces = append(namespaces, ns) + } + return true + }) + sort.Strings(namespaces) + return namespaces, nil +} diff --git a/internal/cache/memory.go b/internal/cache/memory.go index ced3d7a..10a553f 100644 --- a/internal/cache/memory.go +++ b/internal/cache/memory.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "sync" + "sync/atomic" "time" "github.com/alecthomas/errors" @@ -38,16 +39,19 @@ type memoryEntry struct { type Memory struct { config MemoryConfig - mu sync.RWMutex - entries map[Key]*memoryEntry - currentSize int64 + namespace string + mu *sync.RWMutex + entries map[string]map[Key]*memoryEntry // namespace -> key -> entry + currentSize *atomic.Int64 } func NewMemory(ctx context.Context, config MemoryConfig) (*Memory, error) { logging.FromContext(ctx).InfoContext(ctx, "Constructing in-memory Cache", "limit-mb", config.LimitMB, "max-ttl", config.MaxTTL) return &Memory{ - config: config, - entries: make(map[Key]*memoryEntry), + config: config, + mu: &sync.RWMutex{}, + entries: make(map[string]map[Key]*memoryEntry), + currentSize: &atomic.Int64{}, }, nil } @@ -57,7 +61,12 @@ func (m *Memory) Stat(_ context.Context, key Key) (http.Header, error) { m.mu.RLock() defer m.mu.RUnlock() - entry, exists := m.entries[key] + nsEntries, nsExists := m.entries[m.namespace] + if !nsExists { + return nil, os.ErrNotExist + } + + entry, exists := nsEntries[key] if !exists { return nil, os.ErrNotExist } @@ -73,7 +82,12 @@ func (m *Memory) Open(_ context.Context, key Key) (io.ReadCloser, http.Header, e m.mu.RLock() defer m.mu.RUnlock() - entry, exists := m.entries[key] + nsEntries, nsExists := m.entries[m.namespace] + if !nsExists { + return nil, nil, os.ErrNotExist + } + + entry, exists := nsEntries[key] if !exists { return nil, nil, os.ErrNotExist } @@ -100,6 +114,7 @@ func (m *Memory) Create(ctx context.Context, key Key, headers http.Header, ttl t writer := &memoryWriter{ cache: m, + namespace: m.namespace, key: key, buf: &bytes.Buffer{}, expiresAt: now.Add(ttl), @@ -114,12 +129,17 @@ func (m *Memory) Delete(_ context.Context, key Key) error { m.mu.Lock() defer m.mu.Unlock() - entry, exists := m.entries[key] + nsEntries, nsExists := m.entries[m.namespace] + if !nsExists { + return os.ErrNotExist + } + + entry, exists := nsEntries[key] if !exists { return os.ErrNotExist } - m.currentSize -= int64(len(entry.data)) - delete(m.entries, key) + m.currentSize.Add(-int64(len(entry.data))) + delete(nsEntries, key) return nil } @@ -135,27 +155,36 @@ func (m *Memory) Stats(_ context.Context) (Stats, error) { m.mu.RLock() defer m.mu.RUnlock() + totalObjects := int64(0) + for _, nsEntries := range m.entries { + totalObjects += int64(len(nsEntries)) + } + return Stats{ - Objects: int64(len(m.entries)), - Size: m.currentSize, + Objects: totalObjects, + Size: m.currentSize.Load(), Capacity: int64(m.config.LimitMB) * 1024 * 1024, }, nil } func (m *Memory) evictOldest(neededSpace int64) { type entryInfo struct { + namespace string key Key size int64 expiresAt time.Time } var entries []entryInfo - for k, e := range m.entries { - entries = append(entries, entryInfo{ - key: k, - size: int64(len(e.data)), - expiresAt: e.expiresAt, - }) + for ns, nsEntries := range m.entries { + for k, e := range nsEntries { + entries = append(entries, entryInfo{ + namespace: ns, + key: k, + size: int64(len(e.data)), + expiresAt: e.expiresAt, + }) + } } // Sort by expiry time (earliest first) @@ -172,14 +201,15 @@ func (m *Memory) evictOldest(neededSpace int64) { if freedSpace >= neededSpace { break } - m.currentSize -= e.size - delete(m.entries, e.key) + m.currentSize.Add(-e.size) + delete(m.entries[e.namespace], e.key) freedSpace += e.size } } type memoryWriter struct { cache *Memory + namespace string key Key buf *bytes.Buffer expiresAt time.Time @@ -212,31 +242,58 @@ func (w *memoryWriter) Close() error { newSize := int64(w.buf.Len()) limitBytes := int64(w.cache.config.LimitMB) * 1024 * 1024 + // Ensure namespace map exists + if w.cache.entries[w.namespace] == nil { + w.cache.entries[w.namespace] = make(map[Key]*memoryEntry) + } + nsEntries := w.cache.entries[w.namespace] + // Remove old entry size if it exists oldSize := int64(0) - if oldEntry, exists := w.cache.entries[w.key]; exists { + if oldEntry, exists := nsEntries[w.key]; exists { oldSize = int64(len(oldEntry.data)) } // Evict entries if needed to make room if limitBytes > 0 { - neededSpace := w.cache.currentSize - oldSize + newSize - limitBytes + neededSpace := w.cache.currentSize.Load() - oldSize + newSize - limitBytes if neededSpace > 0 { w.cache.evictOldest(neededSpace) } } - w.cache.currentSize -= oldSize + w.cache.currentSize.Add(-oldSize) // Copy the buffer data to avoid holding a reference to the buffer's internal slice data := make([]byte, w.buf.Len()) copy(data, w.buf.Bytes()) w.buf.Reset() - w.cache.entries[w.key] = &memoryEntry{ + nsEntries[w.key] = &memoryEntry{ data: data, expiresAt: w.expiresAt, headers: w.headers, } - w.cache.currentSize += newSize + w.cache.currentSize.Add(newSize) return nil } + +// Namespace creates a namespaced view of the memory cache. +func (m *Memory) Namespace(namespace string) Cache { + c := *m + c.namespace = namespace + return &c +} + +// ListNamespaces returns all unique namespaces in the memory cache. +func (m *Memory) ListNamespaces(_ context.Context) ([]string, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + namespaces := make([]string, 0, len(m.entries)) + for ns := range m.entries { + if ns != "" { + namespaces = append(namespaces, ns) + } + } + return namespaces, nil +} diff --git a/internal/cache/noop.go b/internal/cache/noop.go index ad645ab..c77fee1 100644 --- a/internal/cache/noop.go +++ b/internal/cache/noop.go @@ -60,3 +60,13 @@ func (n *noOpWriter) Close() error { var _ Cache = (*noOpCache)(nil) var _ io.WriteCloser = (*noOpWriter)(nil) + +// Namespace creates a namespaced view (no-op for noop cache). +func (n *noOpCache) Namespace(_ string) Cache { + return n +} + +// ListNamespaces returns empty list for noop cache. +func (n *noOpCache) ListNamespaces(_ context.Context) ([]string, error) { + return []string{}, nil +} diff --git a/internal/cache/remote.go b/internal/cache/remote.go index 05dac25..bcd5650 100644 --- a/internal/cache/remote.go +++ b/internal/cache/remote.go @@ -13,10 +13,13 @@ import ( "github.com/alecthomas/errors" ) +const defaultNamespace = "-" + // Remote implements Cache as a client for the remote cache server. type Remote struct { - baseURL string - client *http.Client + baseURL string + client *http.Client + namespace string } var _ Cache = (*Remote)(nil) @@ -37,7 +40,11 @@ func (c *Remote) String() string { return "remote:" + c.baseURL } // Open retrieves an object from the remote. func (c *Remote) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, error) { - url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) + namespace := c.namespace + if namespace == "" { + namespace = defaultNamespace + } + url := fmt.Sprintf("%s/object/%s/%s", c.baseURL, namespace, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, nil, errors.Wrap(err, "failed to create request") @@ -66,7 +73,11 @@ func (c *Remote) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, // Stat retrieves headers for an object from the remote. func (c *Remote) Stat(ctx context.Context, key Key) (http.Header, error) { - url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) + namespace := c.namespace + if namespace == "" { + namespace = defaultNamespace + } + url := fmt.Sprintf("%s/object/%s/%s", c.baseURL, namespace, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil) if err != nil { return nil, errors.Wrap(err, "failed to create request") @@ -96,7 +107,11 @@ func (c *Remote) Stat(ctx context.Context, key Key) (http.Header, error) { func (c *Remote) Create(ctx context.Context, key Key, headers http.Header, ttl time.Duration) (io.WriteCloser, error) { pr, pw := io.Pipe() - url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) + namespace := c.namespace + if namespace == "" { + namespace = defaultNamespace + } + url := fmt.Sprintf("%s/object/%s/%s", c.baseURL, namespace, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, pr) if err != nil { return nil, errors.Join(errors.Wrap(err, "failed to create request"), pr.Close(), pw.Close()) @@ -136,7 +151,11 @@ func (c *Remote) Create(ctx context.Context, key Key, headers http.Header, ttl t // Delete removes an object from the remote. func (c *Remote) Delete(ctx context.Context, key Key) error { - url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) + namespace := c.namespace + if namespace == "" { + namespace = defaultNamespace + } + url := fmt.Sprintf("%s/object/%s/%s", c.baseURL, namespace, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) if err != nil { return errors.Wrap(err, "failed to create request") @@ -223,3 +242,39 @@ func (wc *writeCloser) Close() error { } return nil } + +// Namespace creates a namespaced view of the remote cache. +func (c *Remote) Namespace(namespace string) Cache { + return &Remote{ + baseURL: c.baseURL, + client: c.client, + namespace: namespace, + } +} + +// ListNamespaces requests namespace list from the remote server. +func (c *Remote) ListNamespaces(ctx context.Context) ([]string, error) { + url := c.baseURL + "/namespaces" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, errors.WithStack(err) + } + + resp, err := c.client.Do(req) + if err != nil { + return nil, errors.WithStack(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) //nolint:errcheck + return nil, errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) + } + + var namespaces []string + if err := json.NewDecoder(resp.Body).Decode(&namespaces); err != nil { + return nil, errors.WithStack(err) + } + + return namespaces, nil +} diff --git a/internal/cache/s3.go b/internal/cache/s3.go index cbb6734..9b96118 100644 --- a/internal/cache/s3.go +++ b/internal/cache/s3.go @@ -41,9 +41,10 @@ type S3Config struct { } type S3 struct { - logger *slog.Logger - config S3Config - client *minio.Client + logger *slog.Logger + config S3Config + namespace string + client *minio.Client } var _ Cache = (*S3)(nil) @@ -156,14 +157,20 @@ func (s *S3) Close() error { return nil } -func (s *S3) keyToPath(key Key) string { +func (s *S3) keyToPath(namespace string, key Key) string { hexKey := key.String() + prefix := "" + + if namespace != "" { + prefix = namespace + "/" + } + // Use first two hex digits as directory, full hex as filename - return hexKey[:2] + "/" + hexKey + return prefix + hexKey[:2] + "/" + hexKey } func (s *S3) Stat(ctx context.Context, key Key) (http.Header, error) { - objectName := s.keyToPath(key) + objectName := s.keyToPath(s.namespace, key) // Get object info to check metadata objInfo, err := s.client.StatObject(ctx, s.config.Bucket, objectName, minio.StatObjectOptions{}) @@ -206,7 +213,7 @@ func (s *S3) Stat(ctx context.Context, key Key) (http.Header, error) { } func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, error) { - objectName := s.keyToPath(key) + objectName := s.keyToPath(s.namespace, key) // Get object info to retrieve metadata and check expiration objInfo, err := s.client.StatObject(ctx, s.config.Bucket, objectName, minio.StatObjectOptions{}) @@ -291,6 +298,7 @@ func (s *S3) Create(ctx context.Context, key Key, headers http.Header, ttl time. writer := &s3Writer{ s3: s, key: key, + namespace: s.namespace, pipe: pw, expiresAt: expiresAt, headers: clonedHeaders, @@ -305,7 +313,7 @@ func (s *S3) Create(ctx context.Context, key Key, headers http.Header, ttl time. } func (s *S3) Delete(ctx context.Context, key Key) error { - objectName := s.keyToPath(key) + objectName := s.keyToPath(s.namespace, key) err := s.client.RemoveObject(ctx, s.config.Bucket, objectName, minio.RemoveObjectOptions{}) if err != nil { @@ -324,6 +332,7 @@ func (s *S3) Stats(_ context.Context) (Stats, error) { type s3Writer struct { s3 *S3 key Key + namespace string pipe *io.PipeWriter expiresAt time.Time headers http.Header @@ -376,7 +385,7 @@ func (w *s3Writer) upload(pr *io.PipeReader) { _ = pr.CloseWithError(uploadErr) }() - objectName := w.s3.keyToPath(w.key) + objectName := w.s3.keyToPath(w.namespace, w.key) // Prepare user metadata userMetadata := make(map[string]string) @@ -430,3 +439,16 @@ func (w *s3Writer) upload(pr *io.PipeReader) { w.errCh <- nil } + +// Namespace creates a namespaced view of the S3 cache. +func (s *S3) Namespace(namespace string) Cache { + c := *s + c.namespace = namespace + return &c +} + +// ListNamespaces returns all unique namespaces in the S3 cache. +// Not implemented for S3 - would require listing all objects. +func (s *S3) ListNamespaces(_ context.Context) ([]string, error) { + return nil, ErrStatsUnavailable +} diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go index 6466a6a..8f48a43 100644 --- a/internal/cache/tiered.go +++ b/internal/cache/tiered.go @@ -5,6 +5,7 @@ import ( "io" "net/http" "os" + "sort" "strings" "sync" "time" @@ -179,3 +180,34 @@ func (t tieredWriter) Write(p []byte) (n int, err error) { } return } + +// Namespace creates a namespaced view of the tiered cache. +// All underlying caches are also namespaced. +func (t Tiered) Namespace(namespace string) Cache { + namespaced := make([]Cache, len(t.caches)) + for i, c := range t.caches { + namespaced[i] = c.Namespace(namespace) + } + return Tiered{caches: namespaced} +} + +// ListNamespaces returns unique namespaces from all underlying caches. +func (t Tiered) ListNamespaces(ctx context.Context) ([]string, error) { + namespaceSet := make(map[string]bool) + for _, c := range t.caches { + namespaces, err := c.ListNamespaces(ctx) + if err != nil && !errors.Is(err, ErrStatsUnavailable) { + return nil, errors.WithStack(err) + } + for _, ns := range namespaces { + namespaceSet[ns] = true + } + } + + namespaces := make([]string, 0, len(namespaceSet)) + for ns := range namespaceSet { + namespaces = append(namespaces, ns) + } + sort.Strings(namespaces) + return namespaces, nil +} diff --git a/internal/config/config.go b/internal/config/config.go index 98241fa..0c512ea 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -132,9 +132,10 @@ func Load( // Second pass, instantiate strategies and bind them to the mux. for _, block := range strategyCandidates { - logger := logger.With("strategy", block.Name) + strategy := block.Name + logger := logger.With("strategy", strategy) mlog := &loggingMux{logger: logger, mux: mux} - _, err := sr.Create(ctx, block.Name, block, cache, mlog, vars) + _, err := sr.Create(ctx, strategy, block, cache, mlog, vars) if err != nil { return errors.Errorf("%s: %w", block.Pos, err) } diff --git a/internal/strategy/api.go b/internal/strategy/api.go index c2ef959..c2730c1 100644 --- a/internal/strategy/api.go +++ b/internal/strategy/api.go @@ -87,7 +87,9 @@ func (r *Registry) Create( vars map[string]string, ) (Strategy, error) { if entry, ok := r.registry[name]; ok { - return errors.WithStack2(entry.factory(ctx, config, cache, mux, vars)) + // Create a namespaced view of the cache for this strategy + namespacedCache := cache.Namespace(name) + return errors.WithStack2(entry.factory(ctx, config, namespacedCache, mux, vars)) } return nil, errors.Errorf("%s: %w", name, ErrNotFound) } diff --git a/internal/strategy/apiv1.go b/internal/strategy/apiv1.go index d92b0b9..1a55dda 100644 --- a/internal/strategy/apiv1.go +++ b/internal/strategy/apiv1.go @@ -32,24 +32,27 @@ func NewAPIV1(ctx context.Context, _ struct{}, cache cache.Cache, mux Mux) (*API logger: logging.FromContext(ctx), cache: cache, } - mux.Handle("GET /api/v1/object/{key}", http.HandlerFunc(s.getObject)) - mux.Handle("HEAD /api/v1/object/{key}", http.HandlerFunc(s.statObject)) - mux.Handle("POST /api/v1/object/{key}", http.HandlerFunc(s.putObject)) - mux.Handle("DELETE /api/v1/object/{key}", http.HandlerFunc(s.deleteObject)) + mux.Handle("GET /api/v1/object/{namespace}/{key}", http.HandlerFunc(s.getObject)) + mux.Handle("HEAD /api/v1/object/{namespace}/{key}", http.HandlerFunc(s.statObject)) + mux.Handle("POST /api/v1/object/{namespace}/{key}", http.HandlerFunc(s.putObject)) + mux.Handle("DELETE /api/v1/object/{namespace}/{key}", http.HandlerFunc(s.deleteObject)) mux.Handle("GET /api/v1/stats", http.HandlerFunc(s.getStats)) + mux.Handle("GET /api/v1/namespaces", http.HandlerFunc(s.getNamespaces)) return s, nil } func (d *APIV1) String() string { return "default" } func (d *APIV1) statObject(w http.ResponseWriter, r *http.Request) { + namespace := r.PathValue("namespace") key, err := cache.ParseKey(r.PathValue("key")) if err != nil { d.httpError(w, http.StatusBadRequest, err, "Invalid key") return } - headers, err := d.cache.Stat(r.Context(), key) + namespacedCache := d.cache.Namespace(namespace) + headers, err := namespacedCache.Stat(r.Context(), key) if err != nil { if errors.Is(err, os.ErrNotExist) { http.Error(w, "Cache object not found", http.StatusNotFound) @@ -64,13 +67,15 @@ func (d *APIV1) statObject(w http.ResponseWriter, r *http.Request) { } func (d *APIV1) getObject(w http.ResponseWriter, r *http.Request) { + namespace := r.PathValue("namespace") key, err := cache.ParseKey(r.PathValue("key")) if err != nil { d.httpError(w, http.StatusBadRequest, err, "Invalid key") return } - cr, headers, err := d.cache.Open(r.Context(), key) + namespacedCache := d.cache.Namespace(namespace) + cr, headers, err := namespacedCache.Open(r.Context(), key) if err != nil { if errors.Is(err, os.ErrNotExist) { http.Error(w, "Cache object not found", http.StatusNotFound) @@ -92,6 +97,7 @@ func (d *APIV1) getObject(w http.ResponseWriter, r *http.Request) { } func (d *APIV1) putObject(w http.ResponseWriter, r *http.Request) { + namespace := r.PathValue("namespace") key, err := cache.ParseKey(r.PathValue("key")) if err != nil { d.httpError(w, http.StatusBadRequest, err, "Invalid key") @@ -111,7 +117,8 @@ func (d *APIV1) putObject(w http.ResponseWriter, r *http.Request) { // Extract and filter headers from request headers := cache.FilterTransportHeaders(r.Header) - cw, err := d.cache.Create(r.Context(), key, headers, ttl) + namespacedCache := d.cache.Namespace(namespace) + cw, err := namespacedCache.Create(r.Context(), key, headers, ttl) if err != nil { d.httpError(w, http.StatusInternalServerError, err, "Failed to create cache writer", slog.String("key", key.String())) return @@ -129,13 +136,15 @@ func (d *APIV1) putObject(w http.ResponseWriter, r *http.Request) { } func (d *APIV1) deleteObject(w http.ResponseWriter, r *http.Request) { + namespace := r.PathValue("namespace") key, err := cache.ParseKey(r.PathValue("key")) if err != nil { d.httpError(w, http.StatusBadRequest, err, "Invalid key") return } - err = d.cache.Delete(r.Context(), key) + namespacedCache := d.cache.Namespace(namespace) + err = namespacedCache.Delete(r.Context(), key) if err != nil { if errors.Is(err, os.ErrNotExist) { http.Error(w, "Cache object not found", http.StatusNotFound) @@ -163,6 +172,19 @@ func (d *APIV1) getStats(w http.ResponseWriter, r *http.Request) { } } +func (d *APIV1) getNamespaces(w http.ResponseWriter, r *http.Request) { + namespaces, err := d.cache.ListNamespaces(r.Context()) + if err != nil { + d.httpError(w, http.StatusInternalServerError, err, "Failed to list namespaces") + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(namespaces); err != nil { + d.logger.Error("Failed to encode namespaces response", slog.String("error", err.Error())) + } +} + func (d *APIV1) httpError(w http.ResponseWriter, code int, err error, message string, args ...any) { args = append(args, slog.String("error", err.Error())) d.logger.Error(message, args...) diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index a027d2c..4263d67 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -63,7 +63,6 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone if rmErr := os.RemoveAll(snapshotDir); rmErr != nil { logger.WarnContext(ctx, "Failed to clean up snapshot dir", slog.String("error", rmErr.Error())) } - if err != nil { logger.ErrorContext(ctx, "Snapshot generation failed", slog.String("upstream", upstream), slog.String("error", err.Error())) return errors.Wrap(err, "create snapshot") diff --git a/internal/strategy/handler/handler.go b/internal/strategy/handler/handler.go index 4056329..73e6201 100644 --- a/internal/strategy/handler/handler.go +++ b/internal/strategy/handler/handler.go @@ -97,12 +97,14 @@ func (h *Handler) TTL(f func(*http.Request) time.Duration) *Handler { // 4. If not cached, transform the request and fetch from upstream // 5. Cache the response while streaming to the client. func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - logger := logging.FromContext(r.Context()) + ctx := r.Context() + + logger := logging.FromContext(ctx) cacheKeyStr := h.cacheKeyFunc(r) key := cache.NewKey(cacheKeyStr) - logger.DebugContext(r.Context(), "Processing request", slog.String("cache_key", cacheKeyStr)) + logger.DebugContext(ctx, "Processing request", slog.String("cache_key", cacheKeyStr)) if h.serveCached(w, r, key, logger) { return