From 3f5008ecca2948ca4a1a01055b3c4ccb8084a857 Mon Sep 17 00:00:00 2001
From: Francesco Cosentino
Date: Sat, 23 Aug 2025 11:16:34 +0300
Subject: [PATCH 1/2] feat(telemetry): add distributed heartbeat metrics and centralize attribute keys

- Add DistHeartbeatMetrics() method to expose heartbeat success/failure, nodes removed, and read primary promote metrics
- Create internal/telemetry/attrs package to centralize OpenTelemetry attribute key constants
- Add /cluster/heartbeat endpoint to management HTTP server for heartbeat metrics access
- Update cache Item struct with LastUpdated field for distributed usage tracking
- Set LastUpdated timestamp for replicated writes and version assignments
- Refactor OTel middleware to use centralized attribute constants from attrs package

This change improves observability for distributed cache operations and reduces code duplication by centralizing telemetry attribute definitions.
---
 hypercache.go                      | 16 ++++++++++++++++
 internal/telemetry/attrs/keys.go   | 29 +++++++++++++++++++++++++++++
 management_http.go                 |  8 ++++++++
 pkg/backend/dist_http_server.go    | 13 +++++++------
 pkg/backend/dist_http_transport.go | 11 ++++++-----
 pkg/backend/dist_memory.go         |  3 ++-
 pkg/cache/v2/item.go               |  1 +
 pkg/middleware/otel_metrics.go     | 23 +++++++++++------------
 pkg/middleware/otel_tracing.go     | 27 ++++++++++++++++-----------
 9 files changed, 96 insertions(+), 35 deletions(-)
 create mode 100644 internal/telemetry/attrs/keys.go

diff --git a/hypercache.go b/hypercache.go
index ab4756b..70b9b2c 100644
--- a/hypercache.go
+++ b/hypercache.go
@@ -1056,6 +1056,22 @@ func (hyperCache *HyperCache[T]) DistRingHashSpots() []string { //nolint:ireturn
 	return nil
 }
 
+// DistHeartbeatMetrics returns distributed heartbeat metrics if supported.
+func (hyperCache *HyperCache[T]) DistHeartbeatMetrics() any { //nolint:ireturn
+	if dm, ok := any(hyperCache.backend).(*backend.DistMemory); ok {
+		m := dm.Metrics()
+
+		return map[string]any{
+			"heartbeatSuccess":   m.HeartbeatSuccess,
+			"heartbeatFailure":   m.HeartbeatFailure,
+			"nodesRemoved":       m.NodesRemoved,
+			"readPrimaryPromote": m.ReadPrimaryPromote,
+		}
+	}
+
+	return nil
+}
+
 // ManagementHTTPAddress returns the bound address of the optional management HTTP server.
 // Empty string when the server is disabled or failed to start.
 func (hyperCache *HyperCache[T]) ManagementHTTPAddress() string {
diff --git a/internal/telemetry/attrs/keys.go b/internal/telemetry/attrs/keys.go
new file mode 100644
index 0000000..8fa8edf
--- /dev/null
+++ b/internal/telemetry/attrs/keys.go
@@ -0,0 +1,29 @@
+// Package attrs provides reusable OpenTelemetry attribute key constants
+// to avoid duplication across middlewares.
+// Package attrs defines telemetry attribute keys used for observability and monitoring
+// across the hypercache system. These constants provide standardized key names for
+// metrics, traces, and logs to ensure consistent telemetry data collection.
+package attrs
+
+const (
+	// AttrKeyLength represents the telemetry attribute key for measuring the length
+	// of a cache key in bytes. This metric helps monitor key size distribution
+	// and identify potential performance impacts from oversized keys.
+	AttrKeyLength = "key.len"
+	// AttrKeysCount represents the telemetry attribute key for measuring the number
+	// of cache keys being processed. This metric helps monitor the workload and
+	// identify potential bottlenecks in key management.
+	AttrKeysCount = "keys.count"
+	// AttrResultCount represents the telemetry attribute key for measuring the number
+	// of cache results returned. This metric helps monitor the effectiveness of cache
+	// lookups and identify potential issues with cache population.
+	AttrResultCount = "result.count"
+	// AttrFailedCount represents the telemetry attribute key for measuring the number
+	// of cache operations that failed. This metric helps monitor error rates and
+	// identify potential issues with cache reliability.
+	AttrFailedCount = "failed.count"
+	// AttrExpirationMS represents the telemetry attribute key for measuring the expiration
+	// time of cache items in milliseconds. This metric helps monitor cache item lifetimes
+	// and identify potential issues with cache eviction policies.
+	AttrExpirationMS = "expiration.ms"
+)
diff --git a/management_http.go b/management_http.go
index 2e6d4a4..4191397 100644
--- a/management_http.go
+++ b/management_http.go
@@ -102,6 +102,7 @@ type membershipIntrospect interface {
 		vnodes int,
 	)
 	DistRingHashSpots() []string
+	DistHeartbeatMetrics() any
 }
 
 // Start launches listener (idempotent). Caller provides cache for handler wiring.
@@ -257,6 +258,13 @@ func (s *ManagementHTTPServer) registerCluster(useAuth func(fiber.Handler) fiber
 			return fiberCtx.JSON(fiber.Map{"count": len(spots), "vnodes": spots})
 		}
 
+		return fiberCtx.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "distributed backend unsupported"})
+	}))
+	s.app.Get("/cluster/heartbeat", useAuth(func(fiberCtx fiber.Ctx) error { // heartbeat metrics
+		if mi, ok := hc.(membershipIntrospect); ok {
+			return fiberCtx.JSON(mi.DistHeartbeatMetrics())
+		}
+
 		return fiberCtx.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "distributed backend unsupported"})
 	}))
 }
diff --git a/pkg/backend/dist_http_server.go b/pkg/backend/dist_http_server.go
index cf3e2df..ecfdf48 100644
--- a/pkg/backend/dist_http_server.go
+++ b/pkg/backend/dist_http_server.go
@@ -55,12 +55,13 @@ func (s *distHTTPServer) registerSet(ctx context.Context, dm *DistMemory) { //no
 			return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": unmarshalErr.Error()})
 		}
 
-		it := &cache.Item{
-			Key:        req.Key,
-			Value:      req.Value,
-			Expiration: time.Duration(req.Expiration) * time.Millisecond,
-			Version:    req.Version,
-			Origin:     req.Origin,
+		it := &cache.Item{ // LastUpdated set to now for replicated writes
+			Key:         req.Key,
+			Value:       req.Value,
+			Expiration:  time.Duration(req.Expiration) * time.Millisecond,
+			Version:     req.Version,
+			Origin:      req.Origin,
+			LastUpdated: time.Now(),
 		}
 
 		dm.applySet(ctx, it, req.Replicate)
diff --git a/pkg/backend/dist_http_transport.go b/pkg/backend/dist_http_transport.go
index e65f4db..2f9d0da 100644
--- a/pkg/backend/dist_http_transport.go
+++ b/pkg/backend/dist_http_transport.go
@@ -183,11 +183,12 @@ func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn
 	}
 	// reconstruct cache.Item (we ignore expiration formatting difference vs ms)
 	return &cache.Item{ // multi-line for readability
-		Key:        mirror.Key,
-		Value:      mirror.Value,
-		Expiration: time.Duration(mirror.Expiration) * time.Millisecond,
-		Version:    mirror.Version,
-		Origin:     mirror.Origin,
+		Key:         mirror.Key,
+		Value:       mirror.Value,
+		Expiration:  time.Duration(mirror.Expiration) * time.Millisecond,
+		Version:     mirror.Version,
+		Origin:      mirror.Origin,
+		LastUpdated: time.Now(),
 	}, true, nil
 }
 
diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go
index 60e5bc7..f56a30a 100644
--- a/pkg/backend/dist_memory.go
+++ b/pkg/backend/dist_memory.go
@@ -253,9 +253,10 @@ func (dm *DistMemory) Set(ctx context.Context, item *cache.Item) error { //nolin
 		}
 	}
 
-	// primary path: assign version
+	// primary path: assign version & timestamp
 	item.Version = atomic.AddUint64(&dm.versionCounter, 1)
 	item.Origin = string(dm.localNode.ID)
+	item.LastUpdated = time.Now()
 	dm.applySet(ctx, item, false)
 
 	acks := 1 + dm.replicateTo(ctx, item, owners[1:])
diff --git a/pkg/cache/v2/item.go b/pkg/cache/v2/item.go
index 0f4ab84..4b98ee6 100644
--- a/pkg/cache/v2/item.go
+++ b/pkg/cache/v2/item.go
@@ -62,6 +62,7 @@ type Item struct {
 	Key         string        // key of the item
 	Value       any           // value of the item
 	LastAccess  time.Time     // last access time
+	LastUpdated time.Time     // last write/version assignment time (distributed usage)
 	Size        int64         // size in bytes
 	Expiration  time.Duration // expiration duration
 	AccessCount uint32        // number of times the item has been accessed
diff --git a/pkg/middleware/otel_metrics.go b/pkg/middleware/otel_metrics.go
index 6e31a8b..5a04606 100644
--- a/pkg/middleware/otel_metrics.go
+++ b/pkg/middleware/otel_metrics.go
@@ -9,13 +9,12 @@ import (
 	"go.opentelemetry.io/otel/metric"
 
 	"github.com/hyp3rd/hypercache"
+	"github.com/hyp3rd/hypercache/internal/telemetry/attrs"
 	"github.com/hyp3rd/hypercache/pkg/backend"
 	"github.com/hyp3rd/hypercache/pkg/cache"
 	"github.com/hyp3rd/hypercache/pkg/stats"
 )
 
-const attrKeyLen = "key.len" // reused attribute key name
-
 // OTelMetricsMiddleware emits OpenTelemetry metrics for service methods.
 type OTelMetricsMiddleware struct {
 	next hypercache.Service
@@ -45,7 +44,7 @@ func NewOTelMetricsMiddleware(next hypercache.Service, meter metric.Meter) (hype
 func (mw *OTelMetricsMiddleware) Get(ctx context.Context, key string) (any, bool) {
 	start := time.Now()
 	v, ok := mw.next.Get(ctx, key)
-	mw.rec(ctx, "Get", start, attribute.Int(attrKeyLen, len(key)), attribute.Bool("hit", ok))
+	mw.rec(ctx, "Get", start, attribute.Int(attrs.AttrKeyLength, len(key)), attribute.Bool("hit", ok))
 
 	return v, ok
 }
@@ -54,7 +53,7 @@ func (mw *OTelMetricsMiddleware) Get(ctx context.Context, key string) (any, bool
 func (mw *OTelMetricsMiddleware) Set(ctx context.Context, key string, value any, expiration time.Duration) error {
 	start := time.Now()
 	err := mw.next.Set(ctx, key, value, expiration)
-	mw.rec(ctx, "Set", start, attribute.Int(attrKeyLen, len(key)))
+	mw.rec(ctx, "Set", start, attribute.Int(attrs.AttrKeyLength, len(key)))
 
 	return err
 }
@@ -63,7 +62,7 @@ func (mw *OTelMetricsMiddleware) Set(ctx context.Context, key string, value any,
 func (mw *OTelMetricsMiddleware) GetOrSet(ctx context.Context, key string, value any, expiration time.Duration) (any, error) {
 	start := time.Now()
 	v, err := mw.next.GetOrSet(ctx, key, value, expiration)
-	mw.rec(ctx, "GetOrSet", start, attribute.Int(attrKeyLen, len(key)))
+	mw.rec(ctx, "GetOrSet", start, attribute.Int(attrs.AttrKeyLength, len(key)))
 
 	return v, err
 }
@@ -72,7 +71,7 @@ func (mw *OTelMetricsMiddleware) GetOrSet(ctx context.Context, key string, value
 func (mw *OTelMetricsMiddleware) GetWithInfo(ctx context.Context, key string) (*cache.Item, bool) {
 	start := time.Now()
 	it, ok := mw.next.GetWithInfo(ctx, key)
-	mw.rec(ctx, "GetWithInfo", start, attribute.Int(attrKeyLen, len(key)), attribute.Bool("hit", ok))
+	mw.rec(ctx, "GetWithInfo", start, attribute.Int(attrs.AttrKeyLength, len(key)), attribute.Bool("hit", ok))
 
 	return it, ok
 }
@@ -85,9 +84,9 @@ func (mw *OTelMetricsMiddleware) GetMultiple(ctx context.Context, keys ...string
 		ctx,
 		"GetMultiple",
 		start,
-		attribute.Int("keys.count", len(keys)),
-		attribute.Int("result.count", len(res)),
-		attribute.Int("failed.count", len(failed)),
+		attribute.Int(attrs.AttrKeysCount, len(keys)),
+		attribute.Int(attrs.AttrResultCount, len(res)),
+		attribute.Int(attrs.AttrFailedCount, len(failed)),
 	)
 
 	return res, failed
@@ -145,10 +144,10 @@ func (mw *OTelMetricsMiddleware) Stop(ctx context.Context) error { return mw.nex
 func (mw *OTelMetricsMiddleware) GetStats() stats.Stats { return mw.next.GetStats() }
 
 // rec records call count and duration with attributes.
-func (mw *OTelMetricsMiddleware) rec(ctx context.Context, method string, start time.Time, attrs ...attribute.KeyValue) {
+func (mw *OTelMetricsMiddleware) rec(ctx context.Context, method string, start time.Time, attributes ...attribute.KeyValue) {
 	base := []attribute.KeyValue{attribute.String("method", method)}
-	if len(attrs) > 0 {
-		base = append(base, attrs...)
+	if len(attributes) > 0 {
+		base = append(base, attributes...)
 	}
 
 	mw.calls.Add(ctx, 1, metric.WithAttributes(base...))
diff --git a/pkg/middleware/otel_tracing.go b/pkg/middleware/otel_tracing.go
index e08f231..c66f1ea 100644
--- a/pkg/middleware/otel_tracing.go
+++ b/pkg/middleware/otel_tracing.go
@@ -9,13 +9,12 @@ import (
 	"go.opentelemetry.io/otel/trace"
 
 	"github.com/hyp3rd/hypercache"
+	"github.com/hyp3rd/hypercache/internal/telemetry/attrs"
 	"github.com/hyp3rd/hypercache/pkg/backend"
 	"github.com/hyp3rd/hypercache/pkg/cache"
 	"github.com/hyp3rd/hypercache/pkg/stats"
 )
 
-const attrKeyLength = "key.len"
-
 // OTelTracingMiddleware wraps hypercache.Service methods with OpenTelemetry spans.
 type OTelTracingMiddleware struct {
 	next hypercache.Service
@@ -28,8 +27,8 @@ type OTelTracingMiddleware struct {
 type OTelTracingOption func(*OTelTracingMiddleware)
 
 // WithCommonAttributes sets attributes applied to all spans.
-func WithCommonAttributes(attrs ...attribute.KeyValue) OTelTracingOption {
-	return func(m *OTelTracingMiddleware) { m.commonAttrs = append(m.commonAttrs, attrs...) }
+func WithCommonAttributes(attributes ...attribute.KeyValue) OTelTracingOption {
+	return func(m *OTelTracingMiddleware) { m.commonAttrs = append(m.commonAttrs, attributes...) }
 }
 
 // NewOTelTracingMiddleware creates a tracing middleware.
@@ -44,7 +43,7 @@ func NewOTelTracingMiddleware(next hypercache.Service, tracer trace.Tracer, opts
 
 // Get implements Service.Get with tracing.
 func (mw OTelTracingMiddleware) Get(ctx context.Context, key string) (any, bool) {
-	ctx, span := mw.startSpan(ctx, "hypercache.Get", attribute.Int(attrKeyLength, len(key)))
+	ctx, span := mw.startSpan(ctx, "hypercache.Get", attribute.Int(attrs.AttrKeyLength, len(key)))
 	defer span.End()
 
 	v, ok := mw.next.Get(ctx, key)
@@ -55,7 +54,10 @@ func (mw OTelTracingMiddleware) Get(ctx context.Context, key string) (any, bool)
 
 // Set implements Service.Set with tracing.
 func (mw OTelTracingMiddleware) Set(ctx context.Context, key string, value any, expiration time.Duration) error {
-	ctx, span := mw.startSpan(ctx, "hypercache.Set", attribute.Int(attrKeyLength, len(key)), attribute.Int64("expiration.ms", expiration.Milliseconds()))
+	ctx, span := mw.startSpan(
+		ctx, "hypercache.Set",
+		attribute.Int(attrs.AttrKeyLength, len(key)),
+		attribute.Int64(attrs.AttrExpirationMS, expiration.Milliseconds()))
 	defer span.End()
 
 	err := mw.next.Set(ctx, key, value, expiration)
@@ -68,7 +70,10 @@ func (mw OTelTracingMiddleware) Set(ctx context.Context, key string, value any,
 
 // GetOrSet implements Service.GetOrSet with tracing.
 func (mw OTelTracingMiddleware) GetOrSet(ctx context.Context, key string, value any, expiration time.Duration) (any, error) {
-	ctx, span := mw.startSpan(ctx, "hypercache.GetOrSet", attribute.Int(attrKeyLength, len(key)), attribute.Int64("expiration.ms", expiration.Milliseconds()))
+	ctx, span := mw.startSpan(
+		ctx, "hypercache.GetOrSet",
+		attribute.Int(attrs.AttrKeyLength, len(key)),
+		attribute.Int64(attrs.AttrExpirationMS, expiration.Milliseconds()))
 	defer span.End()
 
 	v, err := mw.next.GetOrSet(ctx, key, value, expiration)
@@ -81,7 +86,7 @@ func (mw OTelTracingMiddleware) GetOrSet(ctx context.Context, key string, value
 
 // GetWithInfo implements Service.GetWithInfo with tracing.
 func (mw OTelTracingMiddleware) GetWithInfo(ctx context.Context, key string) (*cache.Item, bool) {
-	ctx, span := mw.startSpan(ctx, "hypercache.GetWithInfo", attribute.Int(attrKeyLength, len(key)))
+	ctx, span := mw.startSpan(ctx, "hypercache.GetWithInfo", attribute.Int(attrs.AttrKeyLength, len(key)))
 	defer span.End()
 
 	it, ok := mw.next.GetWithInfo(ctx, key)
@@ -173,14 +178,14 @@ func (mw OTelTracingMiddleware) Stop(ctx context.Context) error {
 func (mw OTelTracingMiddleware) GetStats() stats.Stats { return mw.next.GetStats() }
 
 // startSpan starts a span with common and provided attributes.
-func (mw OTelTracingMiddleware) startSpan(ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) {
+func (mw OTelTracingMiddleware) startSpan(ctx context.Context, name string, attributes ...attribute.KeyValue) (context.Context, trace.Span) {
 	ctx, span := mw.tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
 	if len(mw.commonAttrs) > 0 {
 		span.SetAttributes(mw.commonAttrs...)
 	}
 
-	if len(attrs) > 0 {
-		span.SetAttributes(attrs...)
+	if len(attributes) > 0 {
+		span.SetAttributes(attributes...)
 	}
 
 	return ctx, span

From f7d44b46872ea1d9dc215371eb896ee43ce28c4e Mon Sep 17 00:00:00 2001
From: "F." <62474964+hyp3rd@users.noreply.github.com>
Date: Sat, 23 Aug 2025 10:18:31 +0200
Subject: [PATCH 2/2] Update internal/telemetry/attrs/keys.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 internal/telemetry/attrs/keys.go | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/internal/telemetry/attrs/keys.go b/internal/telemetry/attrs/keys.go
index 8fa8edf..1854c1d 100644
--- a/internal/telemetry/attrs/keys.go
+++ b/internal/telemetry/attrs/keys.go
@@ -1,8 +1,7 @@
-// Package attrs provides reusable OpenTelemetry attribute key constants
-// to avoid duplication across middlewares.
-// Package attrs defines telemetry attribute keys used for observability and monitoring
-// across the hypercache system. These constants provide standardized key names for
-// metrics, traces, and logs to ensure consistent telemetry data collection.
+// Package attrs provides reusable OpenTelemetry attribute key constants to avoid duplication
+// across middlewares. It defines telemetry attribute keys used for observability and monitoring
+// across the hypercache system. These constants provide standardized key names for metrics,
+// traces, and logs to ensure consistent telemetry data collection.
 package attrs
 
 const (