Skip to content

Commit fc5b237

Browse files
GabrielePiccocoderabbitai[bot]bmuddha
authored
fix: remove race condition between pings and request response (#600)
<!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Refactor** * Improved WebSocket keep-alive: moved from fixed-interval polling to an activity-driven ping timer (pings every 30s; idle close after 60s). * Ping scheduling now resets on inbound activity, outbound writes, and after handling responses or errors, preventing blocked or redundant pings and reducing unnecessary network traffic. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Co-authored-by: Babur Makhmudov <bmuddha13@gmail.com>
1 parent 9e2d817 commit fc5b237

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

magicblock-aperture/src/server/websocket/connection.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
atomic::{AtomicU32, Ordering},
44
Arc,
55
},
6-
time::{Duration, Instant},
6+
time::Duration,
77
};
88

99
use fastwebsockets::{
@@ -15,7 +15,7 @@ use json::Value;
1515
use log::debug;
1616
use tokio::{
1717
sync::mpsc::{self, Receiver},
18-
time,
18+
time::{self, Instant},
1919
};
2020
use tokio_util::sync::CancellationToken;
2121

@@ -95,8 +95,10 @@ impl ConnectionHandler {
9595
/// The loop terminates upon any I/O error, an inactivity timeout, or a shutdown signal.
9696
pub(super) async fn run(mut self) {
9797
const MAX_INACTIVE_INTERVAL: Duration = Duration::from_secs(60);
98+
const PING_PERIOD: Duration = Duration::from_secs(30);
9899
let mut last_activity = Instant::now();
99-
let mut ping = time::interval(Duration::from_secs(30));
100+
let next_ping = time::sleep_until(Instant::now() + PING_PERIOD);
101+
tokio::pin!(next_ping);
100102

101103
loop {
102104
tokio::select! {
@@ -105,7 +107,11 @@ impl ConnectionHandler {
105107

106108
// 1. Handle an incoming frame from the client's WebSocket.
107109
Ok(frame) = self.ws.read_frame() => {
110+
// Record inbound client activity
108111
last_activity = Instant::now();
112+
// Reschedule the next ping
113+
next_ping.as_mut().reset(Instant::now() + PING_PERIOD);
114+
109115
if frame.opcode != OpCode::Text {
110116
continue;
111117
}
@@ -115,7 +121,7 @@ impl ConnectionHandler {
115121
let mut request = match parsed {
116122
Ok(r) => r,
117123
Err(error) => {
118-
self.report_failure(None, error).await;
124+
let _ = self.report_failure(None, error).await;
119125
continue;
120126
}
121127
};
@@ -130,8 +136,8 @@ impl ConnectionHandler {
130136
if !success { break };
131137
}
132138

133-
// 2. Handle the periodic keep-alive timer.
134-
_ = ping.tick() => {
139+
// 2. Handle the periodic keep-alive timer (scheduled relative to last activity).
140+
_ = &mut next_ping => {
135141
// If the connection has been idle for too long, close it.
136142
if last_activity.elapsed() > MAX_INACTIVE_INTERVAL {
137143
let frame = Frame::close(
@@ -146,6 +152,8 @@ impl ConnectionHandler {
146152
if self.ws.write_frame(frame).await.is_err() {
147153
break;
148154
};
155+
// Schedule the next ping
156+
next_ping.as_mut().reset(Instant::now() + PING_PERIOD);
149157
}
150158

151159
// 3. Handle a new subscription notification from the server backend.

0 commit comments

Comments
 (0)