diff --git a/pkg/transport-discovery/store/redis_bandwidth.go b/pkg/transport-discovery/store/redis_bandwidth.go index 4a59b41646..3f3bfd839c 100644 --- a/pkg/transport-discovery/store/redis_bandwidth.go +++ b/pkg/transport-discovery/store/redis_bandwidth.go @@ -106,15 +106,18 @@ func (s *redisStore) UpdateBandwidth(ctx context.Context, transportID string, } // UpdateLatency stores the most recent latency snapshot for a transport -// inside its TransportData blob. Called by the CXO aggregator when a -// visor's TreeStore feed reports a fresh transports/<pk>/current -// snapshot. Latency is round-trip — both edges should converge on -// similar values, so last-writer-wins keeps this lock-free. +// in a dedicated key (transport-discovery:lat:<id>) with a 35-day TTL, +// independent of the registration blob. Called by the CXO aggregator +// when a visor's TreeStore feed reports a fresh transports/<pk>/current +// snapshot. Latency is round-trip — both edges converge on similar +// values — so last-writer-wins keeps this lock-free. // -// If the transport hasn't been registered (or its TTL'd out), the -// snapshot is dropped. We intentionally do not fabricate a TransportData -// blob from a latency report alone — registration is the only path that -// knows the canonical edges/type. +// Previously latency was a field inside the tp:<id> blob and inherited +// the 5-minute registration TTL: any visor that paused re-registering +// (TPD restart, network blip, normal churn) silently dropped its +// latency from /metrics until the next CXO push, while bandwidth +// (stored at bw:daily:*) survived. Co-locating with bandwidth's +// retention window restores symmetry. 
func (s *redisStore) UpdateLatency(ctx context.Context, transportID string, minMS, maxMS, avgMS float64) error { if avgMS <= 0 { return nil @@ -125,40 +128,34 @@ func (s *redisStore) UpdateLatency(ctx context.Context, transportID string, minM return fmt.Errorf("invalid transport id %q: %w", transportID, err) } - tpKey := s.transportKey(id) - raw, err := s.client.Get(ctx, tpKey).Result() - if errors.Is(err, redis.Nil) { - return nil + rec := LatencyRecord{ + Min: int64(minMS * 1000), + Max: int64(maxMS * 1000), + Avg: int64(avgMS * 1000), + UpdatedAt: time.Now().UTC().Unix(), } + raw, err := json.Marshal(rec) if err != nil { return err } + return s.client.Set(ctx, s.latencyKey(id), string(raw), latencyTTL).Err() +} - var data TransportData - if err := json.Unmarshal([]byte(raw), &data); err != nil { - return fmt.Errorf("decode TransportData: %w", err) +// getLatencyRecord reads the durable latency snapshot for a transport, +// returning (nil, nil) when no record exists. +func (s *redisStore) getLatencyRecord(ctx context.Context, id uuid.UUID) (*LatencyRecord, error) { + raw, err := s.client.Get(ctx, s.latencyKey(id)).Result() + if errors.Is(err, redis.Nil) { + return nil, nil } - - // ms → us, matching the TransportData latency field unit. - data.LatencyMin = int64(minMS * 1000) - data.LatencyMax = int64(maxMS * 1000) - data.LatencyAvg = int64(avgMS * 1000) - data.LastUpdate = time.Now().UTC().Unix() - - updated, err := json.Marshal(data) if err != nil { - return err + return nil, err } - - // Preserve the registration TTL — KEEPTTL is the obvious idiom but - // requires Redis 6+. Read TTL, fall back to the configured default - // if Redis returns -1 (no expiry) or -2 (key vanished between GET - // and TTL). 
- ttl, err := s.client.TTL(ctx, tpKey).Result() - if err != nil || ttl <= 0 { - ttl = s.ttl + var rec LatencyRecord + if err := json.Unmarshal([]byte(raw), &rec); err != nil { + return nil, fmt.Errorf("decode LatencyRecord: %w", err) } - return s.client.Set(ctx, tpKey, string(updated), ttl).Err() + return &rec, nil } // GetTransportBandwidth retrieves bandwidth aggregations for a transport. diff --git a/pkg/transport-discovery/store/redis_metrics.go b/pkg/transport-discovery/store/redis_metrics.go index d033f1953c..5c866f3830 100644 --- a/pkg/transport-discovery/store/redis_metrics.go +++ b/pkg/transport-discovery/store/redis_metrics.go @@ -436,13 +436,15 @@ func (s *redisStore) buildTransportMetrics(ctx context.Context, entries []*trans return []TransportMetric{}, nil } - // Fetch latency data via pipeline + // Fetch latency data via pipeline. Reads the durable lat:<id> key + // (35-day TTL) rather than the tp:<id> registration blob — survives + // the 5-minute registration churn that bandwidth has always survived. var latencyResults []*redis.StringCmd if query.Latency { pipe := s.client.Pipeline() latencyResults = make([]*redis.StringCmd, len(filtered)) for i, f := range filtered { - latencyResults[i] = pipe.Get(ctx, s.transportKey(f.entry.ID)) + latencyResults[i] = pipe.Get(ctx, s.latencyKey(f.entry.ID)) } _, _ = pipe.Exec(ctx) //nolint:errcheck // Errors handled per-command via Result() } @@ -544,16 +546,16 @@ func (s *redisStore) buildTransportMetrics(ctx context.Context, entries []*trans metric.Edges = []string{f.entry.Edges[0].Hex(), f.entry.Edges[1].Hex()} } - // Process latency result + // Process latency result from the durable lat:<id> key. 
if query.Latency && latencyResults != nil { dataJSON, err := latencyResults[i].Result() if err == nil { - var data TransportData - if json.Unmarshal([]byte(dataJSON), &data) == nil && data.LatencyAvg > 0 { + var rec LatencyRecord + if json.Unmarshal([]byte(dataJSON), &rec) == nil && rec.Avg > 0 { metric.Latency = &TransportLatency{ - Min: data.LatencyMin, - Max: data.LatencyMax, - Avg: data.LatencyAvg, + Min: rec.Min, + Max: rec.Max, + Avg: rec.Avg, } } } diff --git a/pkg/transport-discovery/store/redis_store.go b/pkg/transport-discovery/store/redis_store.go index aefccdf79e..81d4ecd19d 100644 --- a/pkg/transport-discovery/store/redis_store.go +++ b/pkg/transport-discovery/store/redis_store.go @@ -28,6 +28,33 @@ type TransportData struct { Bandwidth uint64 `json:"bandwidth"` // Total bytes (sent + recv) LastUpdate int64 `json:"last_update"` // Unix timestamp of last update } + +// LatencyRecord is the durable per-transport latency snapshot persisted at +// transport-discovery:lat:<id>. It lives independently of the registration +// blob (TransportData) so a 5-min entry-timeout cycle doesn't erase +// latency the way it did when the values were stored only inside the tp:<id> +// key. Bandwidth gets the same independence via bw:daily:* keys; this +// gives latency the equivalent durability so /metrics doesn't show a +// transport with bandwidth-today but no latency just because the visor's +// re-registration window briefly lapsed. +// +// Last-writer-wins on the per-transport key: latency is round-trip and +// both edges observe the same RTT modulo measurement noise, so no merge +// or per-edge tracking. Min/Max/Avg are stored in microseconds to match +// the existing TransportData layout. UpdatedAt is informational (lets +// readers gauge staleness) and not currently surfaced to the API. 
+type LatencyRecord struct { + Min int64 `json:"min"` // microseconds + Max int64 `json:"max"` // microseconds + Avg int64 `json:"avg"` // microseconds + UpdatedAt int64 `json:"updated_at"` +} + +// latencyTTL is how long a latency record sits in redis without being +// refreshed before it ages out. Mirrors bw:daily:* retention so the two +// telemetry types share the same observability window. +const latencyTTL = 35 * 24 * time.Hour + type redisStore struct { client *redis.Client ttl time.Duration diff --git a/pkg/transport-discovery/store/redis_transport.go b/pkg/transport-discovery/store/redis_transport.go index d34ade32f1..5f44a18288 100644 --- a/pkg/transport-discovery/store/redis_transport.go +++ b/pkg/transport-discovery/store/redis_transport.go @@ -203,7 +203,15 @@ func (s *redisStore) GetTransportByID(ctx context.Context, id uuid.UUID) (*trans return nil, err } - return s.dataToEntry(data) + entry, err := s.dataToEntry(data) + if err != nil { + return nil, err + } + rec, _ := s.getLatencyRecord(ctx, id) //nolint:errcheck // best-effort overlay; entry stays usable without latency + if rec != nil && rec.Avg > 0 { + entry.Latency = float64(rec.Avg) / 1000.0 + } + return entry, nil } func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { @@ -281,6 +289,7 @@ func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) return nil, ErrTransportNotFound } + s.hydrateDurableLatency(ctx, entries) s.edgeCache.Put(pk, entries) return entries, nil } @@ -344,6 +353,12 @@ func (s *redisStore) GetAllTransports(ctx context.Context, selfTransports bool) // Cached with the same TTL+slot scheme as GetAllTransports — metrics // scrapers (Prometheus / Victoria Metrics) hit these endpoints on a // regular cadence and were paying a full SCAN+MGET each time. 
+// +// Latency lives in dedicated lat: keys (independent of the tp: +// registration TTL); after scanAllTransports populates entries from the +// blobs we overlay the durable latency so aggregate-metric paths +// (GetNetworkMetrics, GetVisorAggregateMetrics) see the same values +// /metrics surfaces, including across registration churn. func (s *redisStore) getAllTransportsWithQoS(ctx context.Context, selfTransports bool) ([]*transport.Entry, error) { if entries, ok := s.allTpsCache.Get(selfTransports, true); ok { return entries, nil @@ -352,10 +367,45 @@ func (s *redisStore) getAllTransportsWithQoS(ctx context.Context, selfTransports if err != nil { return nil, err } + s.hydrateDurableLatency(ctx, entries) s.allTpsCache.Put(selfTransports, true, entries) return entries, nil } +// hydrateDurableLatency overlays the persisted lat: values onto +// entry.Latency. Best-effort: a redis or decode error leaves the entry +// at whatever scanAllTransports produced (which after the latency +// move-out is 0 — the blob's lat_avg field is no longer written, but +// remains in the schema for backwards-compatible decoding of older +// payloads still present in redis). +func (s *redisStore) hydrateDurableLatency(ctx context.Context, entries []*transport.Entry) { + if len(entries) == 0 { + return + } + keys := make([]string, len(entries)) + for i, e := range entries { + keys[i] = s.latencyKey(e.ID) + } + vals, err := s.client.MGet(ctx, keys...).Result() + if err != nil { + return + } + for i, v := range vals { + raw, ok := v.(string) + if !ok || raw == "" { + continue + } + var rec LatencyRecord + if err := json.Unmarshal([]byte(raw), &rec); err != nil { + continue + } + if rec.Avg > 0 { + // Same us → ms conversion dataToEntry applies. + entries[i].Latency = float64(rec.Avg) / 1000.0 + } + } +} + // scanAllTransports is the shared implementation for GetAllTransports and getAllTransportsWithQoS. 
// Reads the transport-id index set built by RegisterTransport / DeregisterTransport // and MGET-fetches the values; lazy-removes stale members whose primary @@ -458,6 +508,10 @@ func (s *redisStore) transportKey(id uuid.UUID) string { return fmt.Sprintf("%s:tp:%s", serviceName, id.String()) } +func (s *redisStore) latencyKey(id uuid.UUID) string { + return fmt.Sprintf("%s:lat:%s", serviceName, id.String()) +} + func (s *redisStore) edgeKey(pk cipher.PubKey) string { return fmt.Sprintf("%s:edge:%s", serviceName, pk.Hex()) }