Skip to content
12 changes: 12 additions & 0 deletions benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ fn bench_protocol_serialize_test_start(c: &mut Criterion) {
protocol: Protocol::Tcp,
direction: Direction::Upload,
bitrate: Some(1_000_000_000),
congestion: None,
mptcp: false,
dscp: None,
};

c.bench_function("protocol_serialize_test_start", |b| {
Expand All @@ -141,6 +143,8 @@ fn bench_protocol_serialize_interval(c: &mut Criterion) {
jitter_ms: None,
lost: None,
error: None,
rtt_us: None,
cwnd: None,
},
StreamInterval {
id: 1,
Expand All @@ -149,6 +153,8 @@ fn bench_protocol_serialize_interval(c: &mut Criterion) {
jitter_ms: None,
lost: None,
error: None,
rtt_us: None,
cwnd: None,
},
],
aggregate: AggregateInterval {
Expand All @@ -157,6 +163,8 @@ fn bench_protocol_serialize_interval(c: &mut Criterion) {
retransmits: Some(8),
jitter_ms: None,
lost: None,
rtt_us: None,
cwnd: None,
},
};

Expand All @@ -176,13 +184,17 @@ fn bench_protocol_deserialize_interval(c: &mut Criterion) {
jitter_ms: None,
lost: None,
error: None,
rtt_us: None,
cwnd: None,
}],
aggregate: AggregateInterval {
bytes: 125_000_000,
throughput_mbps: 1000.0,
retransmits: Some(5),
jitter_ms: None,
lost: None,
rtt_us: None,
cwnd: None,
},
};
let json = msg.serialize().unwrap();
Expand Down
14 changes: 10 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,11 +816,17 @@ impl Client {
error!("Bidir receive error: {}", e);
}

if let (Ok(write_half), Ok(read_half)) = (send_result, recv_result)
&& let Ok(stream) = read_half.reunite(write_half)
&& let Some(info) = tcp::get_stream_tcp_info(&stream)
// Use the clamp-time TCP_INFO snapshot returned by
// send_data_half: re-reading after reunite would see a later,
// larger bytes_acked that could exceed the clamped bytes_sent.
if let (Ok((write_half, send_tcp_info)), Ok(read_half)) =
(send_result, recv_result)
{
stream_stats.set_final_tcp_info(info);
if let Some(info) = send_tcp_info {
stream_stats.set_final_tcp_info(info);
}
// Reunite to keep the socket lifetime symmetric; dropped next.
let _ = read_half.reunite(write_half);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ mod tests {
rtt_us,
rtt_var_us: 100,
cwnd: 65535,
bytes_acked: None,
}),
udp_stats: None,
}
Expand Down
1 change: 1 addition & 0 deletions src/output/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ mod tests {
rtt_us: 1000,
rtt_var_us: 100,
cwnd: 64 * 1024,
bytes_acked: None,
}),
udp_stats: None,
}
Expand Down
4 changes: 4 additions & 0 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ pub struct TcpInfoSnapshot {
pub rtt_us: u32,
pub rtt_var_us: u32,
pub cwnd: u32,
/// Bytes acknowledged by the peer (from `tcpi_bytes_acked` on Linux).
/// Used to correct overcount on abortive close where unACK'd buffer is discarded.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bytes_acked: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
26 changes: 15 additions & 11 deletions src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2187,14 +2187,17 @@ async fn spawn_tcp_handlers(
.await
});

// Wait for both to complete and reunite halves to get TCP_INFO
// Wait for both to complete. Use the clamp-time TCP_INFO snapshot
// returned by send_data_half rather than re-reading post-reunite.
let (send_result, recv_result) = tokio::join!(send_handle, recv_handle);
if let (Ok(Ok(write_half)), Ok(Ok(read_half))) = (send_result, recv_result)
&& let Ok(stream) = read_half.reunite(write_half)
&& let Some(info) = tcp::get_stream_tcp_info(&stream)
if let (Ok(Ok((write_half, send_tcp_info))), Ok(Ok(read_half))) =
(send_result, recv_result)
{
final_stats.add_retransmits(info.retransmits);
test_stats.add_tcp_info(info);
if let Some(info) = send_tcp_info {
final_stats.add_retransmits(info.retransmits);
test_stats.add_tcp_info(info);
}
let _ = read_half.reunite(write_half);
}
}
}
Expand Down Expand Up @@ -2378,13 +2381,14 @@ async fn spawn_tcp_stream_handlers(
});

let (send_result, recv_result) = tokio::join!(send_handle, recv_handle);
if let (Ok(Ok(write_half)), Ok(Ok(read_half))) =
if let (Ok(Ok((write_half, send_tcp_info))), Ok(Ok(read_half))) =
(send_result, recv_result)
&& let Ok(stream) = read_half.reunite(write_half)
&& let Some(info) = tcp::get_stream_tcp_info(&stream)
{
final_stats.add_retransmits(info.retransmits);
test_stats.add_tcp_info(info);
if let Some(info) = send_tcp_info {
final_stats.add_retransmits(info.retransmits);
test_stats.add_tcp_info(info);
}
let _ = read_half.reunite(write_half);
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,18 @@ impl TestStats {
let mut retransmits = 0u64;
let mut cwnd = 0u32;
let mut count = 0u64;
let mut bytes_acked_sum = 0u64;
let mut any_bytes_acked = false;
for stream in &self.streams {
if let Some(info) = stream.final_tcp_info() {
rtt_sum += info.rtt_us as u64;
rtt_var_sum += info.rtt_var_us as u64;
retransmits += info.retransmits;
cwnd += info.cwnd;
if let Some(acked) = info.bytes_acked {
bytes_acked_sum += acked;
any_bytes_acked = true;
}
count += 1;
}
}
Expand All @@ -428,6 +434,11 @@ impl TestStats {
rtt_us: avg_rtt as u32,
rtt_var_us: avg_rtt_var as u32,
cwnd,
bytes_acked: if any_bytes_acked {
Some(bytes_acked_sum)
} else {
None
},
}
})
}
Expand Down Expand Up @@ -509,6 +520,7 @@ mod tests {
rtt_us: 1200,
rtt_var_us: 300,
cwnd: 64 * 1024,
bytes_acked: None,
};

stats.set_tcp_info_fd(123);
Expand All @@ -531,12 +543,14 @@ mod tests {
rtt_us: 1000,
rtt_var_us: 200,
cwnd: 32 * 1024,
bytes_acked: None,
});
stats.streams[1].set_final_tcp_info(TcpInfoSnapshot {
retransmits: 5,
rtt_us: 3000,
rtt_var_us: 600,
cwnd: 64 * 1024,
bytes_acked: None,
});

let info = stats
Expand Down
Loading