From f0c340ed464f8595ab06301c65724ed62c1bd296 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Sun, 3 May 2026 19:08:06 +0000 Subject: [PATCH] transport: drop outlier RTT samples above MaxReasonableRTTMs (30s) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Production observation: three sudph transports persisted with min/avg ~190ms and max ~33-35 SECONDS in /metrics. The min/avg are correct; the max is a single straggler pong arriving long after its ping (delayed packet, host suspended, NIC queue stall). Once it lands, Max only ever grows, so a single bad sample pins the transport's Max for the life of the lat: key (35 days). handleTransportPong correlates pongs to pings only by timestamp — there's no in-flight tracking that would discard a pong arriving N seconds after timeout. Real intercontinental RTT plus heavy queueing is comfortably under 5s, so 30s is a generous upper bound for "obviously bogus". Mirror the existing zero-clobber lower-bound guard with an upper-bound at every layer: - transport.MaxReasonableRTTMs (exported, 30_000.0). - SetLatency drops latencyMs > MaxReasonableRTTMs (alongside <=0). - SetLatencyStats rejects the whole snapshot if any of min/max/avg exceeds the bound. - handleTransportPong short-circuits before SetLatency, with a Trace log of the dropped value. - RouteGroup.MeasureLatency drops samples >MaxReasonableRTTMs from `measurements` so a stale pong from the periodic pingLoop leaking into the buffered pongCh can't poison the active measurement's min/max/avg. - RouteGroup.handlePongPacket rejects up front so neither networkStats, the synchronous MeasureLatency consumer, nor the underlying transport see the bogus value. TestSetLatencyDropsOutliers and TestSetLatencyStatsDropsOutliers pin both the rejection and the boundary-exact acceptance. 
--- pkg/router/route_group.go | 24 ++++++++++-- pkg/transport/managed_transport.go | 43 ++++++++++++++++----- pkg/transport/managed_transport_test.go | 51 +++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 13 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 385127dae4..87585ed6a7 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -1200,6 +1200,15 @@ func (rg *RouteGroup) handlePongPacket(packet routing.Packet) error { rg.logger.WithField("func", "RouteGroup.handlePongPacket").Tracef("Latency is around %.1f ms", latencyMs) + // A pong correlates to no specific ping (no in-flight tracking), so + // a long-delayed pong arriving after the host woke / queue drained + // produces a 30+ second sample. Reject such outliers (and non-positive + // samples) up front so neither networkStats, the synchronous MeasureLatency + // consumer, nor the underlying transport sees the bogus value. + if latencyMs <= 0 || latencyMs > transport.MaxReasonableRTTMs { + return nil + } + rg.networkStats.SetLatency(uint32(latencyMs)) //nolint: gosec // If there's a pending synchronous measurement, send the result @@ -1258,14 +1267,21 @@ func (rg *RouteGroup) MeasureLatency(ctx context.Context, count int) (min, max, // Wait for pong with timeout select { case latencyMs := <-pongCh: - // Drop non-positive samples — a 0 here would seed min/max - // at 0 and produce a snapshot {min:0, max:X, avg:Y} that - // downstream consumers (TPD, /stats) treat as "no min - // measurement" and either reject or display misleadingly. + // Drop samples outside (0, transport.MaxReasonableRTTMs]: + // - 0 seeds min/max at 0, producing {min:0, max:X, avg:Y} + // that downstream consumers reject or display oddly. + // - Stale pongs from earlier rounds (or from the periodic + // pingLoop) can land in this buffered channel after a + // long delay and produce 30+ second readings that + // would pin Max indefinitely on the underlying tp. 
if latencyMs <= 0 { rg.logger.Debugf("Ping %d/%d: dropped non-positive sample (%.6f ms)", i+1, count, latencyMs) continue } + if latencyMs > transport.MaxReasonableRTTMs { + rg.logger.Debugf("Ping %d/%d: dropped outlier sample (%.0f ms) — likely stale pong", i+1, count, latencyMs) + continue + } measurements = append(measurements, latencyMs) rg.logger.Debugf("Ping %d/%d: %.2f ms", i+1, count, latencyMs) case <-time.After(timeout): diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 701a4826b3..62399df8a5 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -152,16 +152,29 @@ func (mt *ManagedTransport) GetLatencyStats() LatencyStats { return mt.latencyStats } +// MaxReasonableRTTMs is the upper bound for an accepted latency sample. +// handleTransportPong has no in-flight tracking, so a pong that finally +// arrives ~30s after its ping (delayed packet, suspended host, NIC +// queue stall) yields rttMs in the tens of seconds. Min/Avg shrug it +// off but Max only ever grows, so a single bad sample pins the +// transport's Max forever (or until the lat: key TTLs out 35 days +// later). Real intercontinental RTT plus heavy queueing is comfortably +// under 5s; 30s is a generous threshold for "obviously bogus". +// +// Exported so router-side latency producers (RouteGroup.MeasureLatency, +// RouteGroup.handlePongPacket) can apply the same threshold. +const MaxReasonableRTTMs = 30_000.0 + // SetLatency sets the average inter-visor ping latency in milliseconds. // For backwards compatibility; prefer SetLatencyStats for full statistics. // -// A non-positive sample is dropped: ns-resolution timestamps can produce -// rttMs == 0 on a sub-microsecond loopback, and a 0 here would clobber -// Avg and (via the `latencyMs < Min` branch) Min, leaving Max as the only -// surviving field. Round-trip time is strictly positive in any real -// measurement, so treating 0 as "no sample" is correct. 
+// Samples outside (0, MaxReasonableRTTMs] are dropped: +// - rttMs <= 0 happens with ns-resolution timestamps on sub-µs loopback; +// a 0 there clobbers Avg and (via the `latencyMs < Min` branch) Min. +// - rttMs > MaxReasonableRTTMs happens when handleTransportPong receives +// a long-delayed pong; that 30+ second value would pin Max indefinitely. func (mt *ManagedTransport) SetLatency(latencyMs float64) { - if latencyMs <= 0 { + if latencyMs <= 0 || latencyMs > MaxReasonableRTTMs { return } mt.latencyMx.Lock() @@ -176,13 +189,17 @@ func (mt *ManagedTransport) SetLatency(latencyMs float64) { } // SetLatencyStats sets the full latency statistics. A snapshot with any -// non-positive field is rejected: a partial measurement (e.g. all five -// pings timed out except one that returned in <1µs) would otherwise -// overwrite a previously good record with a zero in min or max. +// field outside (0, MaxReasonableRTTMs] is rejected wholesale: a partial +// measurement (e.g. all five pings timed out except one that returned in +// <1µs) would otherwise overwrite a previously good record with a zero +// in min, and a straggler pong on the upper end would pin Max forever. func (mt *ManagedTransport) SetLatencyStats(stats LatencyStats) { if stats.Min <= 0 || stats.Max <= 0 || stats.Avg <= 0 { return } + if stats.Min > MaxReasonableRTTMs || stats.Max > MaxReasonableRTTMs || stats.Avg > MaxReasonableRTTMs { + return + } mt.latencyMx.Lock() defer mt.latencyMx.Unlock() mt.latencyStats = stats @@ -412,6 +429,14 @@ func (mt *ManagedTransport) handleTransportPong(p routing.Packet) { if rttMs <= 0 { return // clock skew, corrupted timestamp, or sub-µs RTT we can't represent } + if rttMs > MaxReasonableRTTMs { + // Pong arrived long after the ping — packet was delayed in a + // queue, the host was suspended, or the kernel held the + // timestamp. Accepting this would pin Max forever. SetLatency + // re-checks the same bound; mirrored here so a dropped sample is not also logged as a normal "Transport ping RTT". 
+ mt.log.WithField("rtt_ms", fmt.Sprintf("%.0f", rttMs)).Trace("Dropping outlier transport pong RTT") + return + } mt.SetLatency(rttMs) mt.log.WithField("rtt_ms", fmt.Sprintf("%.2f", rttMs)).Trace("Transport ping RTT") } diff --git a/pkg/transport/managed_transport_test.go b/pkg/transport/managed_transport_test.go index 49075c0053..cd9c8c2d53 100644 --- a/pkg/transport/managed_transport_test.go +++ b/pkg/transport/managed_transport_test.go @@ -208,3 +208,54 @@ func TestSetLatencyStatsDropsPartialZero(t *testing.T) { mt.SetLatencyStats(better) require.Equal(t, better, mt.GetLatencyStats()) } + +// A pong arriving long after its ping (delayed packet, suspended host) +// produces an outlier RTT that would pin Max indefinitely. The +// upper-bound guard must drop such samples without disturbing the +// running stats. Live evidence: production transports observed with +// min/avg ~190ms and max ~35,000ms after a single straggler. +func TestSetLatencyDropsOutliers(t *testing.T) { + mt := &ManagedTransport{} + mt.SetLatency(15.5) + require.Equal(t, LatencyStats{Min: 15.5, Max: 15.5, Avg: 15.5}, mt.GetLatencyStats()) + + // Just above the bound is rejected. + mt.SetLatency(MaxReasonableRTTMs + 1) + require.Equal(t, LatencyStats{Min: 15.5, Max: 15.5, Avg: 15.5}, mt.GetLatencyStats(), + "outlier sample updated the stats") + + // 35-second straggler — the production case. + mt.SetLatency(35_000) + require.Equal(t, LatencyStats{Min: 15.5, Max: 15.5, Avg: 15.5}, mt.GetLatencyStats(), + "35s straggler updated the stats") + + // At the bound — accepted. + mt.SetLatency(MaxReasonableRTTMs) + require.Equal(t, LatencyStats{Min: 15.5, Max: MaxReasonableRTTMs, Avg: MaxReasonableRTTMs}, mt.GetLatencyStats(), + "sample exactly at the bound was rejected") +} + +// SetLatencyStats must reject the entire snapshot when any field +// exceeds the bound — better to keep the previous good record than +// allow a 35s max to land alongside otherwise-fine min/avg values. 
+func TestSetLatencyStatsDropsOutliers(t *testing.T) { + mt := &ManagedTransport{} + good := LatencyStats{Min: 100, Max: 250, Avg: 180} + mt.SetLatencyStats(good) + + for _, bad := range []LatencyStats{ + {Min: 100, Max: MaxReasonableRTTMs + 1, Avg: 180}, // straggler max + {Min: 100, Max: 35_000, Avg: 180}, // 35s production case + {Min: MaxReasonableRTTMs + 1, Max: 35_000, Avg: 180}, // both at the upper end + {Min: 100, Max: 250, Avg: MaxReasonableRTTMs + 0.001}, // avg over the line + } { + mt.SetLatencyStats(bad) + require.Equal(t, good, mt.GetLatencyStats(), + "outlier snapshot %+v overwrote good stats", bad) + } + + // At the bound — accepted. + atBound := LatencyStats{Min: 100, Max: MaxReasonableRTTMs, Avg: 5_000} + mt.SetLatencyStats(atBound) + require.Equal(t, atBound, mt.GetLatencyStats()) +}