Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,15 @@ func (rg *RouteGroup) handlePongPacket(packet routing.Packet) error {

rg.logger.WithField("func", "RouteGroup.handlePongPacket").Tracef("Latency is around %.1f ms", latencyMs)

// A pong correlates to no specific ping (no in-flight tracking), so
// a long-delayed pong arriving after the host woke / queue drained
// produces a 30+ second sample. Reject up front so neither
// networkStats, the synchronous MeasureLatency consumer, nor the
// underlying transport see the bogus value.
if latencyMs <= 0 || latencyMs > transport.MaxReasonableRTTMs {
return nil
}

rg.networkStats.SetLatency(uint32(latencyMs)) //nolint: gosec

// If there's a pending synchronous measurement, send the result
Expand Down Expand Up @@ -1258,14 +1267,21 @@ func (rg *RouteGroup) MeasureLatency(ctx context.Context, count int) (min, max,
// Wait for pong with timeout
select {
case latencyMs := <-pongCh:
// Drop non-positive samples — a 0 here would seed min/max
// at 0 and produce a snapshot {min:0, max:X, avg:Y} that
// downstream consumers (TPD, /stats) treat as "no min
// measurement" and either reject or display misleadingly.
// Drop samples outside (0, transport.MaxReasonableRTTMs]:
// - 0 seeds min/max at 0, producing {min:0, max:X, avg:Y}
// that downstream consumers reject or display oddly.
// - Stale pongs from earlier rounds (or from the periodic
// pingLoop) can land in this buffered channel after a
// long delay and produce 30+ second readings that
// would pin Max indefinitely on the underlying tp.
if latencyMs <= 0 {
rg.logger.Debugf("Ping %d/%d: dropped non-positive sample (%.6f ms)", i+1, count, latencyMs)
continue
}
if latencyMs > transport.MaxReasonableRTTMs {
rg.logger.Debugf("Ping %d/%d: dropped outlier sample (%.0f ms) — likely stale pong", i+1, count, latencyMs)
continue
}
measurements = append(measurements, latencyMs)
rg.logger.Debugf("Ping %d/%d: %.2f ms", i+1, count, latencyMs)
case <-time.After(timeout):
Expand Down
43 changes: 34 additions & 9 deletions pkg/transport/managed_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,29 @@ func (mt *ManagedTransport) GetLatencyStats() LatencyStats {
return mt.latencyStats
}

// MaxReasonableRTTMs is the upper bound for an accepted latency sample.
// handleTransportPong has no in-flight tracking, so a pong that finally
// arrives ~30s after its ping (delayed packet, suspended host, NIC
// queue stall) yields rttMs in the tens of seconds. Min/Avg shrug it
// off but Max only ever grows, so a single bad sample pins the
// transport's Max forever (or until the lat:<id> key TTLs out 35 days
// later). Real intercontinental RTT plus heavy queueing is comfortably
// under 5s; 30s is a generous floor for "obviously bogus".
//
// Exported so router-side latency producers (RouteGroup.MeasureLatency,
// RouteGroup.handlePongPacket) can apply the same threshold.
const MaxReasonableRTTMs = 30_000.0

// SetLatency sets the average inter-visor ping latency in milliseconds.
// For backwards compatibility; prefer SetLatencyStats for full statistics.
//
// A non-positive sample is dropped: ns-resolution timestamps can produce
// rttMs == 0 on a sub-microsecond loopback, and a 0 here would clobber
// Avg and (via the `latencyMs < Min` branch) Min, leaving Max as the only
// surviving field. Round-trip time is strictly positive in any real
// measurement, so treating 0 as "no sample" is correct.
// Samples outside (0, MaxReasonableRTTMs] are dropped:
// - rttMs <= 0 happens with ns-resolution timestamps on sub-µs loopback;
// a 0 there clobbers Avg and (via the `latencyMs < Min` branch) Min.
// - rttMs > MaxReasonableRTTMs happens when handleTransportPong receives
// a long-delayed pong; that 30+ second value would pin Max indefinitely.
func (mt *ManagedTransport) SetLatency(latencyMs float64) {
if latencyMs <= 0 {
if latencyMs <= 0 || latencyMs > MaxReasonableRTTMs {
return
}
mt.latencyMx.Lock()
Expand All @@ -176,13 +189,17 @@ func (mt *ManagedTransport) SetLatency(latencyMs float64) {
}

// SetLatencyStats sets the full latency statistics. A snapshot with any
// non-positive field is rejected: a partial measurement (e.g. all five
// pings timed out except one that returned in <1µs) would otherwise
// overwrite a previously good record with a zero in min or max.
// field outside (0, MaxReasonableRTTMs] is rejected wholesale: a partial
// measurement (e.g. all five pings timed out except one that returned in
// <1µs) would otherwise overwrite a previously good record with a zero
// in min, and a stragglerpong on the upper end would pin Max forever.
func (mt *ManagedTransport) SetLatencyStats(stats LatencyStats) {
if stats.Min <= 0 || stats.Max <= 0 || stats.Avg <= 0 {
return
}
if stats.Min > MaxReasonableRTTMs || stats.Max > MaxReasonableRTTMs || stats.Avg > MaxReasonableRTTMs {
return
}
mt.latencyMx.Lock()
defer mt.latencyMx.Unlock()
mt.latencyStats = stats
Expand Down Expand Up @@ -412,6 +429,14 @@ func (mt *ManagedTransport) handleTransportPong(p routing.Packet) {
if rttMs <= 0 {
return // clock skew, corrupted timestamp, or sub-µs RTT we can't represent
}
if rttMs > MaxReasonableRTTMs {
// Pong arrived long after the ping — packet was delayed in a
// queue, the host was suspended, or the kernel held the
// timestamp. Accepting this would pin Max forever. SetLatency
// re-checks the same bound; mirrored here to avoid the log.
mt.log.WithField("rtt_ms", fmt.Sprintf("%.0f", rttMs)).Trace("Dropping outlier transport pong RTT")
return
}
mt.SetLatency(rttMs)
mt.log.WithField("rtt_ms", fmt.Sprintf("%.2f", rttMs)).Trace("Transport ping RTT")
}
Expand Down
51 changes: 51 additions & 0 deletions pkg/transport/managed_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,54 @@ func TestSetLatencyStatsDropsPartialZero(t *testing.T) {
mt.SetLatencyStats(better)
require.Equal(t, better, mt.GetLatencyStats())
}

// A pong arriving long after its ping (delayed packet, suspended host)
// produces an outlier RTT that would pin Max indefinitely. The
// upper-bound guard must drop such samples without disturbing the
// running stats. Live evidence: production transports observed with
// min/avg ~190ms and max ~35,000ms after a single straggler.
func TestSetLatencyDropsOutliers(t *testing.T) {
mt := &ManagedTransport{}
mt.SetLatency(15.5)
require.Equal(t, LatencyStats{Min: 15.5, Max: 15.5, Avg: 15.5}, mt.GetLatencyStats())

// Just above the bound is rejected.
mt.SetLatency(MaxReasonableRTTMs + 1)
require.Equal(t, LatencyStats{Min: 15.5, Max: 15.5, Avg: 15.5}, mt.GetLatencyStats(),
"outlier sample updated the stats")

// 35-second straggler — the production case.
mt.SetLatency(35_000)
require.Equal(t, LatencyStats{Min: 15.5, Max: 15.5, Avg: 15.5}, mt.GetLatencyStats(),
"35s straggler updated the stats")

// At the bound — accepted.
mt.SetLatency(MaxReasonableRTTMs)
require.Equal(t, LatencyStats{Min: 15.5, Max: MaxReasonableRTTMs, Avg: MaxReasonableRTTMs}, mt.GetLatencyStats(),
"sample exactly at the bound was rejected")
}

// SetLatencyStats must reject the entire snapshot when any field
// exceeds the bound — better to keep the previous good record than
// allow a 35s max to land alongside otherwise-fine min/avg values.
func TestSetLatencyStatsDropsOutliers(t *testing.T) {
mt := &ManagedTransport{}
good := LatencyStats{Min: 100, Max: 250, Avg: 180}
mt.SetLatencyStats(good)

for _, bad := range []LatencyStats{
{Min: 100, Max: MaxReasonableRTTMs + 1, Avg: 180}, // straggler max
{Min: 100, Max: 35_000, Avg: 180}, // 35s production case
{Min: MaxReasonableRTTMs + 1, Max: 35_000, Avg: 180}, // both at the upper end
{Min: 100, Max: 250, Avg: MaxReasonableRTTMs + 0.001}, // avg over the line
} {
mt.SetLatencyStats(bad)
require.Equal(t, good, mt.GetLatencyStats(),
"outlier snapshot %+v overwrote good stats", bad)
}

// At the bound — accepted.
atBound := LatencyStats{Min: 100, Max: MaxReasonableRTTMs, Avg: 5_000}
mt.SetLatencyStats(atBound)
require.Equal(t, atBound, mt.GetLatencyStats())
}
Loading