diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 385127dae4..87585ed6a7 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -1200,6 +1200,15 @@ func (rg *RouteGroup) handlePongPacket(packet routing.Packet) error { rg.logger.WithField("func", "RouteGroup.handlePongPacket").Tracef("Latency is around %.1f ms", latencyMs) + // A pong correlates to no specific ping (no in-flight tracking), so + // a long-delayed pong arriving after the host woke / queue drained + // produces a 30+ second sample. Reject up front so neither + // networkStats, the synchronous MeasureLatency consumer, nor the + // underlying transport see the bogus value. + if latencyMs <= 0 || latencyMs > transport.MaxReasonableRTTMs { + return nil + } + rg.networkStats.SetLatency(uint32(latencyMs)) //nolint: gosec // If there's a pending synchronous measurement, send the result @@ -1258,14 +1267,21 @@ func (rg *RouteGroup) MeasureLatency(ctx context.Context, count int) (min, max, // Wait for pong with timeout select { case latencyMs := <-pongCh: - // Drop non-positive samples — a 0 here would seed min/max - // at 0 and produce a snapshot {min:0, max:X, avg:Y} that - // downstream consumers (TPD, /stats) treat as "no min - // measurement" and either reject or display misleadingly. + // Drop samples outside (0, transport.MaxReasonableRTTMs]: + // - 0 seeds min/max at 0, producing {min:0, max:X, avg:Y} + // that downstream consumers reject or display oddly. + // - Stale pongs from earlier rounds (or from the periodic + // pingLoop) can land in this buffered channel after a + // long delay and produce 30+ second readings that + // would pin Max indefinitely on the underlying tp. if latencyMs <= 0 { rg.logger.Debugf("Ping %d/%d: dropped non-positive sample (%.6f ms)", i+1, count, latencyMs) continue } + if latencyMs > transport.MaxReasonableRTTMs { + rg.logger.Debugf("Ping %d/%d: dropped outlier sample (%.0f ms) — likely stale pong", i+1, count, latencyMs) + continue + } measurements = append(measurements, latencyMs) rg.logger.Debugf("Ping %d/%d: %.2f ms", i+1, count, latencyMs) case <-time.After(timeout): diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 701a4826b3..62399df8a5 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -152,16 +152,29 @@ func (mt *ManagedTransport) GetLatencyStats() LatencyStats { return mt.latencyStats } +// MaxReasonableRTTMs is the upper bound for an accepted latency sample. +// handleTransportPong has no in-flight tracking, so a pong that finally +// arrives ~30s after its ping (delayed packet, suspended host, NIC +// queue stall) yields rttMs in the tens of seconds. Min/Avg shrug it +// off but Max only ever grows, so a single bad sample pins the +// transport's Max forever (or until the lat: key TTLs out 35 days +// later). Real intercontinental RTT plus heavy queueing is comfortably +// under 5s; 30s is a generous floor for "obviously bogus". +// +// Exported so router-side latency producers (RouteGroup.MeasureLatency, +// RouteGroup.handlePongPacket) can apply the same threshold. +const MaxReasonableRTTMs = 30_000.0 + // SetLatency sets the average inter-visor ping latency in milliseconds. // For backwards compatibility; prefer SetLatencyStats for full statistics. // -// A non-positive sample is dropped: ns-resolution timestamps can produce -// rttMs == 0 on a sub-microsecond loopback, and a 0 here would clobber -// Avg and (via the `latencyMs < Min` branch) Min, leaving Max as the only -// surviving field. Round-trip time is strictly positive in any real -// measurement, so treating 0 as "no sample" is correct. +// Samples outside (0, MaxReasonableRTTMs] are dropped: +// - rttMs <= 0 happens with ns-resolution timestamps on sub-µs loopback; +// a 0 there clobbers Avg and (via the `latencyMs < Min` branch) Min. +// - rttMs > MaxReasonableRTTMs happens when handleTransportPong receives +// a long-delayed pong; that 30+ second value would pin Max indefinitely. func (mt *ManagedTransport) SetLatency(latencyMs float64) { - if latencyMs <= 0 { + if latencyMs <= 0 || latencyMs > MaxReasonableRTTMs { return } mt.latencyMx.Lock() @@ -176,13 +189,17 @@ func (mt *ManagedTransport) SetLatency(latencyMs float64) { } // SetLatencyStats sets the full latency statistics. A snapshot with any -// non-positive field is rejected: a partial measurement (e.g. all five -// pings timed out except one that returned in <1µs) would otherwise -// overwrite a previously good record with a zero in min or max. +// field outside (0, MaxReasonableRTTMs] is rejected wholesale: a partial +// measurement (e.g. all five pings timed out except one that returned in +// <1µs) would otherwise overwrite a previously good record with a zero +// in min, and a stragglerpong on the upper end would pin Max forever. func (mt *ManagedTransport) SetLatencyStats(stats LatencyStats) { if stats.Min <= 0 || stats.Max <= 0 || stats.Avg <= 0 { return } + if stats.Min > MaxReasonableRTTMs || stats.Max > MaxReasonableRTTMs || stats.Avg > MaxReasonableRTTMs { + return + } mt.latencyMx.Lock() defer mt.latencyMx.Unlock() mt.latencyStats = stats @@ -412,6 +429,14 @@ func (mt *ManagedTransport) handleTransportPong(p routing.Packet) { if rttMs <= 0 { return // clock skew, corrupted timestamp, or sub-µs RTT we can't represent } + if rttMs > MaxReasonableRTTMs { + // Pong arrived long after the ping — packet was delayed in a + // queue, the host was suspended, or the kernel held the + // timestamp. Accepting this would pin Max forever. SetLatency + // re-checks the same bound; mirrored here to avoid the log. + mt.log.WithField("rtt_ms", fmt.Sprintf("%.0f", rttMs)).Trace("Dropping outlier transport pong RTT") + return + } mt.SetLatency(rttMs) mt.log.WithField("rtt_ms", fmt.Sprintf("%.2f", rttMs)).Trace("Transport ping RTT") } diff --git a/pkg/transport/managed_transport_test.go b/pkg/transport/managed_transport_test.go index 49075c0053..cd9c8c2d53 100644 --- a/pkg/transport/managed_transport_test.go +++ b/pkg/transport/managed_transport_test.go @@ -208,3 +208,54 @@ func TestSetLatencyStatsDropsPartialZero(t *testing.T) { mt.SetLatencyStats(better) require.Equal(t, better, mt.GetLatencyStats()) } + +// A pong arriving long after its ping (delayed packet, suspended host) +// produces an outlier RTT that would pin Max indefinitely. The +// upper-bound guard must drop such samples without disturbing the +// running stats. Live evidence: production transports observed with +// min/avg ~190ms and max ~35,000ms after a single straggler. +func TestSetLatencyDropsOutliers(t *testing.T) { + mt := &ManagedTransport{} + mt.SetLatency(15.5) + require.Equal(t, LatencyStats{Min: 15.5, Max: 15.5, Avg: 15.5}, mt.GetLatencyStats()) + + // Just above the bound is rejected. + mt.SetLatency(MaxReasonableRTTMs + 1) + require.Equal(t, LatencyStats{Min: 15.5, Max: 15.5, Avg: 15.5}, mt.GetLatencyStats(), + "outlier sample updated the stats") + + // 35-second straggler — the production case. + mt.SetLatency(35_000) + require.Equal(t, LatencyStats{Min: 15.5, Max: 15.5, Avg: 15.5}, mt.GetLatencyStats(), + "35s straggler updated the stats") + + // At the bound — accepted. + mt.SetLatency(MaxReasonableRTTMs) + require.Equal(t, LatencyStats{Min: 15.5, Max: MaxReasonableRTTMs, Avg: MaxReasonableRTTMs}, mt.GetLatencyStats(), + "sample exactly at the bound was rejected") +} + +// SetLatencyStats must reject the entire snapshot when any field +// exceeds the bound — better to keep the previous good record than +// allow a 35s max to land alongside otherwise-fine min/avg values. +func TestSetLatencyStatsDropsOutliers(t *testing.T) { + mt := &ManagedTransport{} + good := LatencyStats{Min: 100, Max: 250, Avg: 180} + mt.SetLatencyStats(good) + + for _, bad := range []LatencyStats{ + {Min: 100, Max: MaxReasonableRTTMs + 1, Avg: 180}, // straggler max + {Min: 100, Max: 35_000, Avg: 180}, // 35s production case + {Min: MaxReasonableRTTMs + 1, Max: 35_000, Avg: 180}, // both at the upper end + {Min: 100, Max: 250, Avg: MaxReasonableRTTMs + 0.001}, // avg over the line + } { + mt.SetLatencyStats(bad) + require.Equal(t, good, mt.GetLatencyStats(), + "outlier snapshot %+v overwrote good stats", bad) + } + + // At the bound — accepted. + atBound := LatencyStats{Min: 100, Max: MaxReasonableRTTMs, Avg: 5_000} + mt.SetLatencyStats(atBound) + require.Equal(t, atBound, mt.GetLatencyStats()) +}