From 1b27adb278b0d86e14ae94b117598220050ffc76 Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Mon, 13 Apr 2026 13:29:51 -0700 Subject: [PATCH] feat: add snapshot serve metrics by source and repository Add two new OTel metrics to the git strategy: - cachew.git.snapshot_serves_total: counter of snapshot serves tagged by source (cache, cold_cache, spool, generated) and repository - cachew.git.snapshot_serve_bytes: histogram of snapshot sizes served These metrics enable tracking cache hit rates and per-repo snapshot serving patterns to inform cache priming decisions. Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-019d8cfb-c84e-72db-9756-c0cedcd5540e --- internal/strategy/git/metrics.go | 29 +++++++++++++++++++++++------ internal/strategy/git/snapshot.go | 27 +++++++++++++++++++-------- internal/strategy/git/spool.go | 7 +++++++ 3 files changed, 49 insertions(+), 14 deletions(-) diff --git a/internal/strategy/git/metrics.go b/internal/strategy/git/metrics.go index a9b0fe20..243f1225 100644 --- a/internal/strategy/git/metrics.go +++ b/internal/strategy/git/metrics.go @@ -12,17 +12,21 @@ import ( ) type gitMetrics struct { - operationDuration metric.Float64Histogram - operationTotal metric.Int64Counter - requestTotal metric.Int64Counter + operationDuration metric.Float64Histogram + operationTotal metric.Int64Counter + requestTotal metric.Int64Counter + snapshotServeTotal metric.Int64Counter + snapshotServeSize metric.Float64Histogram } func newGitMetrics() *gitMetrics { meter := otel.Meter("cachew.git") return &gitMetrics{ - operationDuration: metrics.NewMetric[metric.Float64Histogram](meter, "cachew.git.operation_duration_seconds", "s", "Duration of git operations (clone, fetch, repack, snapshot)"), - operationTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.operations_total", "{operations}", "Total number of git operations"), - requestTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.requests_total", "{requests}", "Total number of git HTTP requests by type"), + operationDuration: metrics.NewMetric[metric.Float64Histogram](meter, "cachew.git.operation_duration_seconds", "s", "Duration of git operations (clone, fetch, repack, snapshot)"), + operationTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.operations_total", "{operations}", "Total number of git operations"), + requestTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.requests_total", "{requests}", "Total number of git HTTP requests by type"), + snapshotServeTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.snapshot_serves_total", "{serves}", "Snapshot serve events by source (cache, spool, cold_cache) and repository"), + snapshotServeSize: metrics.NewMetric[metric.Float64Histogram](meter, "cachew.git.snapshot_serve_bytes", "By", "Size of served snapshots in bytes"), } } @@ -39,3 +43,16 @@ func (m *gitMetrics) recordOperation(ctx context.Context, operation, status stri func (m *gitMetrics) recordRequest(ctx context.Context, requestType string) { m.requestTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("type", requestType))) } + +// recordSnapshotServe records a snapshot serve event with its source and repository. +// Source is one of: "cache", "cold_cache", "spool", "generated". +func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, repo string, sizeBytes int64) { + attrs := metric.WithAttributes( + attribute.String("source", source), + attribute.String("repository", repo), + ) + m.snapshotServeTotal.Add(ctx, 1, attrs) + if sizeBytes > 0 { + m.snapshotServeSize.Record(ctx, float64(sizeBytes), attrs) + } +} diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 30d5df86..0bad5d3d 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -192,6 +192,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst")) upstreamURL := "https://" + host + "/" + repoPath + repoName := host + "/" + repoPath repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL) if repoErr != nil { @@ -220,7 +221,9 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, }() logger.InfoContext(ctx, "Serving locally cached snapshot after waiting for in-flight fill", "upstream", upstreamURL) w.Header().Set("Content-Type", "application/zstd") - if _, err := io.Copy(w, reader); err != nil { + n, err := io.Copy(w, reader) + s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n) + if err != nil { logger.WarnContext(ctx, "Failed to stream locally cached snapshot", "upstream", upstreamURL, "error", err) } return @@ -234,7 +237,9 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, if openErr == nil && reader != nil { logger.InfoContext(ctx, "Serving cached snapshot while mirror warms up", "upstream", upstreamURL) w.Header().Set("Content-Type", "application/zstd") - if _, err := io.Copy(w, reader); err != nil { + n, err := io.Copy(w, reader) + s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n) + if err != nil { logger.WarnContext(ctx, "Failed to stream cached snapshot", "upstream", upstreamURL, "error", err) } _ = reader.Close() @@ -264,14 +269,14 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, } if reader == nil { - if err := s.serveSnapshotWithSpool(w, r, repo, upstreamURL); err != nil { + if err := s.serveSnapshotWithSpool(w, r, repo, upstreamURL, repoName); err != nil { logger.ErrorContext(ctx, "Failed to serve snapshot via spool", "upstream", upstreamURL, "error", err) } return } defer reader.Close() - if err := s.serveSnapshotWithBundle(ctx, w, reader, headers, repo, upstreamURL); err != nil { + if err := s.serveSnapshotWithBundle(ctx, w, reader, headers, repo, upstreamURL, repoName); err != nil { logger.ErrorContext(ctx, "Failed to serve snapshot", "upstream", upstreamURL, "error", err) } } @@ -343,7 +348,7 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h } } -func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL string) error { +func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL, repoName string) error { snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit") mirrorHead := s.getMirrorHead(ctx, repo) @@ -376,7 +381,8 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW } w.Header().Set("Content-Type", "application/zstd") - _, err := io.Copy(w, reader) + n, err := io.Copy(w, reader) + s.metrics.recordSnapshotServe(ctx, "cache", repoName, n) return errors.Wrap(err, "stream snapshot") } @@ -454,7 +460,7 @@ func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository, // mirror, streams tar+zstd to both the HTTP client and a spool file, then // triggers a background cache backfill. Concurrent requests for the same URL // become readers that follow the spool, avoiding redundant clone+tar work. -func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL string) error { +func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL, repoName string) error { ctx := r.Context() logger := logging.FromContext(ctx) @@ -475,13 +481,18 @@ func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request } return errors.Wrap(err, "snapshot spool read") } + s.metrics.recordSnapshotServe(ctx, "spool", repoName, spool.Written()) return nil } // Writer failed; fall through to generate independently. return s.streamSnapshotDirect(w, r, repo) } - return s.writeSnapshotSpool(w, r, repo, upstreamURL, entry) + err := s.writeSnapshotSpool(w, r, repo, upstreamURL, entry) + if err == nil { + s.metrics.recordSnapshotServe(ctx, "generated", repoName, entry.spool.Written()) + } + return err } // streamSnapshotDirect streams a snapshot directly to the client without diff --git a/internal/strategy/git/spool.go b/internal/strategy/git/spool.go index 48821396..bcb5909d 100644 --- a/internal/strategy/git/spool.go +++ b/internal/strategy/git/spool.go @@ -94,6 +94,13 @@ func (rs *ResponseSpool) MarkError(err error) { rs.cond.Broadcast() } +// Written returns the total number of bytes written to the spool. +func (rs *ResponseSpool) Written() int64 { + rs.mu.Lock() + defer rs.mu.Unlock() + return rs.written +} + func (rs *ResponseSpool) Failed() bool { rs.mu.Lock() defer rs.mu.Unlock()