From 1c5419bed11465fd8f33c57ee9b554abf9aade4e Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Sun, 3 May 2026 00:02:03 +0000 Subject: [PATCH 1/2] transport-discovery: persist latency in a dedicated key, decoupled from registration TTL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Latency lived inside the tp: registration blob and inherited its 5-minute entry-timeout. Bandwidth has always been stored separately at bw:daily:* with a 35-day TTL, so any visor that paused re-registering (TPD restart, network blip, normal churn) silently lost its latency from /metrics until the next CXO push, while bandwidth-today survived intact. Move latency to a peer of bandwidth: - New key: transport-discovery:lat:<tp-id>, JSON {min, max, avg, updated_at} in microseconds, 35-day TTL. - UpdateLatency writes only to that key. The "must be registered" coupling and the TTL-inheriting Set on tp: are gone — those were exactly the reasons latency disappeared with registration churn. avg<=0 still drops the update. - GetTransportMetrics reads the new key in its existing pipeline. - getAllTransportsWithQoS, GetTransportsByEdge, GetTransportByID hydrate entry.Latency from the durable record so the aggregate paths (GetNetworkMetrics, GetVisorAggregateMetrics) and the /transports/id:<id>, /transports/edge:<pk>, /transports/edges API endpoints all see the persisted value. TransportData.Latency{Min,Max,Avg} stays in the schema so older payloads decode cleanly, but no write touches them anymore. Reads that go through the QoS hydration step end up with the durable value overlaid on top of the (now always 0) blob field. No semantic change to the value itself: still last-writer-wins, still per-transport (round-trip is symmetric, no per-edge tracking), still no daily aggregation. Only the storage location and lifetime change. 
Verified post-deploy by: - redis-cli TTL transport-discovery:lat: returns ~3024000s, never the registration TTL. - After a TPD bounce, /metrics carries latency for transports whose visors haven't yet re-pushed a CXO sample. --- .../store/redis_bandwidth.go | 63 +++++++++---------- .../store/redis_metrics.go | 18 +++--- pkg/transport-discovery/store/redis_store.go | 26 ++++++++ .../store/redis_transport.go | 55 +++++++++++++++- 4 files changed, 120 insertions(+), 42 deletions(-) diff --git a/pkg/transport-discovery/store/redis_bandwidth.go b/pkg/transport-discovery/store/redis_bandwidth.go index 4a59b41646..3f3bfd839c 100644 --- a/pkg/transport-discovery/store/redis_bandwidth.go +++ b/pkg/transport-discovery/store/redis_bandwidth.go @@ -106,15 +106,18 @@ func (s *redisStore) UpdateBandwidth(ctx context.Context, transportID string, } // UpdateLatency stores the most recent latency snapshot for a transport -// inside its TransportData blob. Called by the CXO aggregator when a -// visor's TreeStore feed reports a fresh transports//current -// snapshot. Latency is round-trip — both edges should converge on -// similar values, so last-writer-wins keeps this lock-free. +// in a dedicated key (transport-discovery:lat:) with a 35-day TTL, +// independent of the registration blob. Called by the CXO aggregator +// when a visor's TreeStore feed reports a fresh transports//current +// snapshot. Latency is round-trip — both edges converge on similar +// values — so last-writer-wins keeps this lock-free. // -// If the transport hasn't been registered (or its TTL'd out), the -// snapshot is dropped. We intentionally do not fabricate a TransportData -// blob from a latency report alone — registration is the only path that -// knows the canonical edges/type. 
+// Previously latency was a field inside the tp: blob and inherited +// the 5-minute registration TTL: any visor that paused re-registering +// (TPD restart, network blip, normal churn) silently dropped its +// latency from /metrics until the next CXO push, while bandwidth +// (stored at bw:daily:*) survived. Co-locating with bandwidth's +// retention window restores symmetry. func (s *redisStore) UpdateLatency(ctx context.Context, transportID string, minMS, maxMS, avgMS float64) error { if avgMS <= 0 { return nil @@ -125,40 +128,34 @@ func (s *redisStore) UpdateLatency(ctx context.Context, transportID string, minM return fmt.Errorf("invalid transport id %q: %w", transportID, err) } - tpKey := s.transportKey(id) - raw, err := s.client.Get(ctx, tpKey).Result() - if errors.Is(err, redis.Nil) { - return nil + rec := LatencyRecord{ + Min: int64(minMS * 1000), + Max: int64(maxMS * 1000), + Avg: int64(avgMS * 1000), + UpdatedAt: time.Now().UTC().Unix(), } + raw, err := json.Marshal(rec) if err != nil { return err } + return s.client.Set(ctx, s.latencyKey(id), string(raw), latencyTTL).Err() +} - var data TransportData - if err := json.Unmarshal([]byte(raw), &data); err != nil { - return fmt.Errorf("decode TransportData: %w", err) +// getLatencyRecord reads the durable latency snapshot for a transport, +// returning (nil, nil) when no record exists. +func (s *redisStore) getLatencyRecord(ctx context.Context, id uuid.UUID) (*LatencyRecord, error) { + raw, err := s.client.Get(ctx, s.latencyKey(id)).Result() + if errors.Is(err, redis.Nil) { + return nil, nil } - - // ms → us, matching the TransportData latency field unit. - data.LatencyMin = int64(minMS * 1000) - data.LatencyMax = int64(maxMS * 1000) - data.LatencyAvg = int64(avgMS * 1000) - data.LastUpdate = time.Now().UTC().Unix() - - updated, err := json.Marshal(data) if err != nil { - return err + return nil, err } - - // Preserve the registration TTL — KEEPTTL is the obvious idiom but - // requires Redis 6+. 
Read TTL, fall back to the configured default - // if Redis returns -1 (no expiry) or -2 (key vanished between GET - // and TTL). - ttl, err := s.client.TTL(ctx, tpKey).Result() - if err != nil || ttl <= 0 { - ttl = s.ttl + var rec LatencyRecord + if err := json.Unmarshal([]byte(raw), &rec); err != nil { + return nil, fmt.Errorf("decode LatencyRecord: %w", err) } - return s.client.Set(ctx, tpKey, string(updated), ttl).Err() + return &rec, nil } // GetTransportBandwidth retrieves bandwidth aggregations for a transport. diff --git a/pkg/transport-discovery/store/redis_metrics.go b/pkg/transport-discovery/store/redis_metrics.go index d033f1953c..5c866f3830 100644 --- a/pkg/transport-discovery/store/redis_metrics.go +++ b/pkg/transport-discovery/store/redis_metrics.go @@ -436,13 +436,15 @@ func (s *redisStore) buildTransportMetrics(ctx context.Context, entries []*trans return []TransportMetric{}, nil } - // Fetch latency data via pipeline + // Fetch latency data via pipeline. Reads the durable lat: key + // (35-day TTL) rather than the tp: registration blob — survives + // the 5-minute registration churn that bandwidth has always survived. var latencyResults []*redis.StringCmd if query.Latency { pipe := s.client.Pipeline() latencyResults = make([]*redis.StringCmd, len(filtered)) for i, f := range filtered { - latencyResults[i] = pipe.Get(ctx, s.transportKey(f.entry.ID)) + latencyResults[i] = pipe.Get(ctx, s.latencyKey(f.entry.ID)) } _, _ = pipe.Exec(ctx) //nolint:errcheck // Errors handled per-command via Result() } @@ -544,16 +546,16 @@ func (s *redisStore) buildTransportMetrics(ctx context.Context, entries []*trans metric.Edges = []string{f.entry.Edges[0].Hex(), f.entry.Edges[1].Hex()} } - // Process latency result + // Process latency result from the durable lat: key. 
if query.Latency && latencyResults != nil { dataJSON, err := latencyResults[i].Result() if err == nil { - var data TransportData - if json.Unmarshal([]byte(dataJSON), &data) == nil && data.LatencyAvg > 0 { + var rec LatencyRecord + if json.Unmarshal([]byte(dataJSON), &rec) == nil && rec.Avg > 0 { metric.Latency = &TransportLatency{ - Min: data.LatencyMin, - Max: data.LatencyMax, - Avg: data.LatencyAvg, + Min: rec.Min, + Max: rec.Max, + Avg: rec.Avg, } } } diff --git a/pkg/transport-discovery/store/redis_store.go b/pkg/transport-discovery/store/redis_store.go index aefccdf79e..2e26cf08db 100644 --- a/pkg/transport-discovery/store/redis_store.go +++ b/pkg/transport-discovery/store/redis_store.go @@ -28,6 +28,32 @@ type TransportData struct { Bandwidth uint64 `json:"bandwidth"` // Total bytes (sent + recv) LastUpdate int64 `json:"last_update"` // Unix timestamp of last update } + +// LatencyRecord is the durable per-transport latency snapshot persisted at +// transport-discovery:lat:. It lives independently of the registration +// blob (TransportData) so a 5-min entry-timeout cycle doesn't erase +// latency the way it did when the values were stored only inside the tp: +// key. Bandwidth gets the same independence via bw:daily:* keys; this +// gives latency the equivalent durability so /metrics doesn't show a +// transport with bandwidth-today but no latency just because the visor's +// re-registration window briefly lapsed. +// +// Last-writer-wins on the per-transport key: latency is round-trip and +// both edges observe the same RTT modulo measurement noise, so no merge +// or per-edge tracking. Min/Max/Avg are stored in microseconds to match +// the existing TransportData layout. UpdatedAt is informational (lets +// readers gauge staleness) and not currently surfaced to the API. 
+type LatencyRecord struct { + Min int64 `json:"min"` // microseconds + Max int64 `json:"max"` // microseconds + Avg int64 `json:"avg"` // microseconds + UpdatedAt int64 `json:"updated_at"` +} + +// latencyTTL is how long a latency record sits in redis without being +// refreshed before it ages out. Mirrors bw:daily:* retention so the two +// telemetry types share the same observability window. +const latencyTTL = 35 * 24 * time.Hour type redisStore struct { client *redis.Client ttl time.Duration diff --git a/pkg/transport-discovery/store/redis_transport.go b/pkg/transport-discovery/store/redis_transport.go index d34ade32f1..2d8c87ac9c 100644 --- a/pkg/transport-discovery/store/redis_transport.go +++ b/pkg/transport-discovery/store/redis_transport.go @@ -203,7 +203,14 @@ func (s *redisStore) GetTransportByID(ctx context.Context, id uuid.UUID) (*trans return nil, err } - return s.dataToEntry(data) + entry, err := s.dataToEntry(data) + if err != nil { + return nil, err + } + if rec, _ := s.getLatencyRecord(ctx, id); rec != nil && rec.Avg > 0 { + entry.Latency = float64(rec.Avg) / 1000.0 + } + return entry, nil } func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { @@ -281,6 +288,7 @@ func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) return nil, ErrTransportNotFound } + s.hydrateDurableLatency(ctx, entries) s.edgeCache.Put(pk, entries) return entries, nil } @@ -344,6 +352,12 @@ func (s *redisStore) GetAllTransports(ctx context.Context, selfTransports bool) // Cached with the same TTL+slot scheme as GetAllTransports — metrics // scrapers (Prometheus / Victoria Metrics) hit these endpoints on a // regular cadence and were paying a full SCAN+MGET each time. 
+// +// Latency lives in dedicated lat: keys (independent of the tp: +// registration TTL); after scanAllTransports populates entries from the +// blobs we overlay the durable latency so aggregate-metric paths +// (GetNetworkMetrics, GetVisorAggregateMetrics) see the same values +// /metrics surfaces, including across registration churn. func (s *redisStore) getAllTransportsWithQoS(ctx context.Context, selfTransports bool) ([]*transport.Entry, error) { if entries, ok := s.allTpsCache.Get(selfTransports, true); ok { return entries, nil @@ -352,10 +366,45 @@ func (s *redisStore) getAllTransportsWithQoS(ctx context.Context, selfTransports if err != nil { return nil, err } + s.hydrateDurableLatency(ctx, entries) s.allTpsCache.Put(selfTransports, true, entries) return entries, nil } +// hydrateDurableLatency overlays the persisted lat: values onto +// entry.Latency. Best-effort: a redis or decode error leaves the entry +// at whatever scanAllTransports produced (which after the latency +// move-out is 0 — the blob's lat_avg field is no longer written, but +// remains in the schema for backwards-compatible decoding of older +// payloads still present in redis). +func (s *redisStore) hydrateDurableLatency(ctx context.Context, entries []*transport.Entry) { + if len(entries) == 0 { + return + } + keys := make([]string, len(entries)) + for i, e := range entries { + keys[i] = s.latencyKey(e.ID) + } + vals, err := s.client.MGet(ctx, keys...).Result() + if err != nil { + return + } + for i, v := range vals { + raw, ok := v.(string) + if !ok || raw == "" { + continue + } + var rec LatencyRecord + if err := json.Unmarshal([]byte(raw), &rec); err != nil { + continue + } + if rec.Avg > 0 { + // Same us → ms conversion dataToEntry applies. + entries[i].Latency = float64(rec.Avg) / 1000.0 + } + } +} + // scanAllTransports is the shared implementation for GetAllTransports and getAllTransportsWithQoS. 
// Reads the transport-id index set built by RegisterTransport / DeregisterTransport // and MGET-fetches the values; lazy-removes stale members whose primary @@ -458,6 +507,10 @@ func (s *redisStore) transportKey(id uuid.UUID) string { return fmt.Sprintf("%s:tp:%s", serviceName, id.String()) } +func (s *redisStore) latencyKey(id uuid.UUID) string { + return fmt.Sprintf("%s:lat:%s", serviceName, id.String()) +} + func (s *redisStore) edgeKey(pk cipher.PubKey) string { return fmt.Sprintf("%s:edge:%s", serviceName, pk.Hex()) } From 19b1a7882f784145df0224c81447e7f6ad8db8c3 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Sun, 3 May 2026 00:11:10 +0000 Subject: [PATCH 2/2] fix lint: gofmt blank line + errcheck nolint on best-effort latency hydration --- pkg/transport-discovery/store/redis_store.go | 1 + pkg/transport-discovery/store/redis_transport.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/transport-discovery/store/redis_store.go b/pkg/transport-discovery/store/redis_store.go index 2e26cf08db..81d4ecd19d 100644 --- a/pkg/transport-discovery/store/redis_store.go +++ b/pkg/transport-discovery/store/redis_store.go @@ -54,6 +54,7 @@ type LatencyRecord struct { // refreshed before it ages out. Mirrors bw:daily:* retention so the two // telemetry types share the same observability window. 
const latencyTTL = 35 * 24 * time.Hour + type redisStore struct { client *redis.Client ttl time.Duration diff --git a/pkg/transport-discovery/store/redis_transport.go b/pkg/transport-discovery/store/redis_transport.go index 2d8c87ac9c..5f44a18288 100644 --- a/pkg/transport-discovery/store/redis_transport.go +++ b/pkg/transport-discovery/store/redis_transport.go @@ -207,7 +207,8 @@ func (s *redisStore) GetTransportByID(ctx context.Context, id uuid.UUID) (*trans if err != nil { return nil, err } - if rec, _ := s.getLatencyRecord(ctx, id); rec != nil && rec.Avg > 0 { + rec, _ := s.getLatencyRecord(ctx, id) //nolint:errcheck // best-effort overlay; entry stays usable without latency + if rec != nil && rec.Avg > 0 { entry.Latency = float64(rec.Avg) / 1000.0 } return entry, nil