Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 30 additions & 33 deletions pkg/transport-discovery/store/redis_bandwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,18 @@ func (s *redisStore) UpdateBandwidth(ctx context.Context, transportID string,
}

// UpdateLatency stores the most recent latency snapshot for a transport
// inside its TransportData blob. Called by the CXO aggregator when a
// visor's TreeStore feed reports a fresh transports/<id>/current
// snapshot. Latency is round-trip — both edges should converge on
// similar values, so last-writer-wins keeps this lock-free.
// in a dedicated key (transport-discovery:lat:<id>) with a 35-day TTL,
// independent of the registration blob. Called by the CXO aggregator
// when a visor's TreeStore feed reports a fresh transports/<id>/current
// snapshot. Latency is round-trip — both edges converge on similar
// values — so last-writer-wins keeps this lock-free.
//
// If the transport hasn't been registered (or its TTL'd out), the
// snapshot is dropped. We intentionally do not fabricate a TransportData
// blob from a latency report alone — registration is the only path that
// knows the canonical edges/type.
// Previously latency was a field inside the tp:<id> blob and inherited
// the 5-minute registration TTL: any visor that paused re-registering
// (TPD restart, network blip, normal churn) silently dropped its
// latency from /metrics until the next CXO push, while bandwidth
// (stored at bw:daily:*) survived. Co-locating with bandwidth's
// retention window restores symmetry.
func (s *redisStore) UpdateLatency(ctx context.Context, transportID string, minMS, maxMS, avgMS float64) error {
if avgMS <= 0 {
return nil
Expand All @@ -125,40 +128,34 @@ func (s *redisStore) UpdateLatency(ctx context.Context, transportID string, minM
return fmt.Errorf("invalid transport id %q: %w", transportID, err)
}

tpKey := s.transportKey(id)
raw, err := s.client.Get(ctx, tpKey).Result()
if errors.Is(err, redis.Nil) {
return nil
rec := LatencyRecord{
Min: int64(minMS * 1000),
Max: int64(maxMS * 1000),
Avg: int64(avgMS * 1000),
UpdatedAt: time.Now().UTC().Unix(),
}
raw, err := json.Marshal(rec)
if err != nil {
return err
}
return s.client.Set(ctx, s.latencyKey(id), string(raw), latencyTTL).Err()
}

var data TransportData
if err := json.Unmarshal([]byte(raw), &data); err != nil {
return fmt.Errorf("decode TransportData: %w", err)
// getLatencyRecord reads the durable latency snapshot for a transport,
// returning (nil, nil) when no record exists.
func (s *redisStore) getLatencyRecord(ctx context.Context, id uuid.UUID) (*LatencyRecord, error) {
raw, err := s.client.Get(ctx, s.latencyKey(id)).Result()
if errors.Is(err, redis.Nil) {
return nil, nil
}

// ms → us, matching the TransportData latency field unit.
data.LatencyMin = int64(minMS * 1000)
data.LatencyMax = int64(maxMS * 1000)
data.LatencyAvg = int64(avgMS * 1000)
data.LastUpdate = time.Now().UTC().Unix()

updated, err := json.Marshal(data)
if err != nil {
return err
return nil, err
}

// Preserve the registration TTL — KEEPTTL is the obvious idiom but
// requires Redis 6+. Read TTL, fall back to the configured default
// if Redis returns -1 (no expiry) or -2 (key vanished between GET
// and TTL).
ttl, err := s.client.TTL(ctx, tpKey).Result()
if err != nil || ttl <= 0 {
ttl = s.ttl
var rec LatencyRecord
if err := json.Unmarshal([]byte(raw), &rec); err != nil {
return nil, fmt.Errorf("decode LatencyRecord: %w", err)
}
return s.client.Set(ctx, tpKey, string(updated), ttl).Err()
return &rec, nil
}

// GetTransportBandwidth retrieves bandwidth aggregations for a transport.
Expand Down
18 changes: 10 additions & 8 deletions pkg/transport-discovery/store/redis_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,15 @@ func (s *redisStore) buildTransportMetrics(ctx context.Context, entries []*trans
return []TransportMetric{}, nil
}

// Fetch latency data via pipeline
// Fetch latency data via pipeline. Reads the durable lat:<id> key
// (35-day TTL) rather than the tp:<id> registration blob — survives
// the 5-minute registration churn that bandwidth has always survived.
var latencyResults []*redis.StringCmd
if query.Latency {
pipe := s.client.Pipeline()
latencyResults = make([]*redis.StringCmd, len(filtered))
for i, f := range filtered {
latencyResults[i] = pipe.Get(ctx, s.transportKey(f.entry.ID))
latencyResults[i] = pipe.Get(ctx, s.latencyKey(f.entry.ID))
}
_, _ = pipe.Exec(ctx) //nolint:errcheck // Errors handled per-command via Result()
}
Expand Down Expand Up @@ -544,16 +546,16 @@ func (s *redisStore) buildTransportMetrics(ctx context.Context, entries []*trans
metric.Edges = []string{f.entry.Edges[0].Hex(), f.entry.Edges[1].Hex()}
}

// Process latency result
// Process latency result from the durable lat:<id> key.
if query.Latency && latencyResults != nil {
dataJSON, err := latencyResults[i].Result()
if err == nil {
var data TransportData
if json.Unmarshal([]byte(dataJSON), &data) == nil && data.LatencyAvg > 0 {
var rec LatencyRecord
if json.Unmarshal([]byte(dataJSON), &rec) == nil && rec.Avg > 0 {
metric.Latency = &TransportLatency{
Min: data.LatencyMin,
Max: data.LatencyMax,
Avg: data.LatencyAvg,
Min: rec.Min,
Max: rec.Max,
Avg: rec.Avg,
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/transport-discovery/store/redis_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,33 @@ type TransportData struct {
Bandwidth uint64 `json:"bandwidth"` // Total bytes (sent + recv)
LastUpdate int64 `json:"last_update"` // Unix timestamp of last update
}

// LatencyRecord is the durable per-transport latency snapshot persisted
// at transport-discovery:lat:<id>. It lives independently of the
// registration blob (TransportData) so a 5-min entry-timeout cycle
// doesn't erase latency the way it did when the values were stored only
// inside the tp: key. Bandwidth gets the same independence via
// bw:daily:* keys; this gives latency the equivalent durability, so
// /metrics doesn't show a transport with bandwidth-today but no latency
// just because the visor's re-registration window briefly lapsed.
//
// Last-writer-wins on the per-transport key: latency is round-trip and
// both edges observe the same RTT modulo measurement noise, so no merge
// or per-edge tracking is needed. Min/Max/Avg are stored in
// microseconds to match the existing TransportData layout. UpdatedAt is
// informational (lets readers gauge staleness) and is not currently
// surfaced to the API.
type LatencyRecord struct {
	Min       int64 `json:"min"`        // microseconds
	Max       int64 `json:"max"`        // microseconds
	Avg       int64 `json:"avg"`        // microseconds
	UpdatedAt int64 `json:"updated_at"` // unix seconds (UTC); informational only
}

// latencyTTL is how long a latency record sits in redis without being
// refreshed before it ages out. Mirrors the bw:daily:* retention window
// (35 days) so the two telemetry types share the same observability
// window.
const latencyTTL = 35 * 24 * time.Hour

type redisStore struct {
client *redis.Client
ttl time.Duration
Expand Down
56 changes: 55 additions & 1 deletion pkg/transport-discovery/store/redis_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,15 @@ func (s *redisStore) GetTransportByID(ctx context.Context, id uuid.UUID) (*trans
return nil, err
}

return s.dataToEntry(data)
entry, err := s.dataToEntry(data)
if err != nil {
return nil, err
}
rec, _ := s.getLatencyRecord(ctx, id) //nolint:errcheck // best-effort overlay; entry stays usable without latency
if rec != nil && rec.Avg > 0 {
entry.Latency = float64(rec.Avg) / 1000.0
}
return entry, nil
}

func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) {
Expand Down Expand Up @@ -281,6 +289,7 @@ func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey)
return nil, ErrTransportNotFound
}

s.hydrateDurableLatency(ctx, entries)
s.edgeCache.Put(pk, entries)
return entries, nil
}
Expand Down Expand Up @@ -344,6 +353,12 @@ func (s *redisStore) GetAllTransports(ctx context.Context, selfTransports bool)
// Cached with the same TTL+slot scheme as GetAllTransports — metrics
// scrapers (Prometheus / Victoria Metrics) hit these endpoints on a
// regular cadence and were paying a full SCAN+MGET each time.
//
// Latency lives in dedicated lat:<id> keys (independent of the tp:<id>
// registration TTL); after scanAllTransports populates entries from the
// blobs we overlay the durable latency so aggregate-metric paths
// (GetNetworkMetrics, GetVisorAggregateMetrics) see the same values
// /metrics surfaces, including across registration churn.
func (s *redisStore) getAllTransportsWithQoS(ctx context.Context, selfTransports bool) ([]*transport.Entry, error) {
if entries, ok := s.allTpsCache.Get(selfTransports, true); ok {
return entries, nil
Expand All @@ -352,10 +367,45 @@ func (s *redisStore) getAllTransportsWithQoS(ctx context.Context, selfTransports
if err != nil {
return nil, err
}
s.hydrateDurableLatency(ctx, entries)
s.allTpsCache.Put(selfTransports, true, entries)
return entries, nil
}

// hydrateDurableLatency overlays the persisted lat:<id> values onto
// entry.Latency for every entry in the slice. Best-effort by design: a
// redis error, a missing key, or an undecodable payload leaves the
// affected entries at whatever value the caller already populated
// (after the latency move-out that is 0 — the blob's latency fields are
// no longer written, though they remain in the schema so older payloads
// still in redis keep decoding).
func (s *redisStore) hydrateDurableLatency(ctx context.Context, entries []*transport.Entry) {
	if len(entries) == 0 {
		return
	}

	latKeys := make([]string, 0, len(entries))
	for _, entry := range entries {
		latKeys = append(latKeys, s.latencyKey(entry.ID))
	}

	results, err := s.client.MGet(ctx, latKeys...).Result()
	if err != nil {
		return
	}

	for idx, res := range results {
		payload, isString := res.(string)
		if !isString || payload == "" {
			continue // key absent (MGet yields nil) or empty value
		}
		var rec LatencyRecord
		if json.Unmarshal([]byte(payload), &rec) != nil {
			continue // undecodable snapshot — skip, don't fail the read path
		}
		if rec.Avg > 0 {
			// Stored in µs; entry.Latency is ms — same µs → ms
			// conversion dataToEntry applies.
			entries[idx].Latency = float64(rec.Avg) / 1000.0
		}
	}
}

// scanAllTransports is the shared implementation for GetAllTransports and getAllTransportsWithQoS.
// Reads the transport-id index set built by RegisterTransport / DeregisterTransport
// and MGET-fetches the values; lazy-removes stale members whose primary
Expand Down Expand Up @@ -458,6 +508,10 @@ func (s *redisStore) transportKey(id uuid.UUID) string {
return fmt.Sprintf("%s:tp:%s", serviceName, id.String())
}

// latencyKey builds the redis key holding a transport's durable latency
// snapshot: <serviceName>:lat:<transport-uuid>.
func (s *redisStore) latencyKey(id uuid.UUID) string {
	return serviceName + ":lat:" + id.String()
}

// edgeKey builds the redis key for the per-edge transport index:
// <serviceName>:edge:<pubkey-hex>.
func (s *redisStore) edgeKey(pk cipher.PubKey) string {
	return serviceName + ":edge:" + pk.Hex()
}
Expand Down
Loading