Skip to content

Commit b05d47d

Browse files
committed
Added a buffer to handle reconnection events
1 parent c77809d commit b05d47d

File tree

1 file changed

+78
-32
lines changed

1 file changed

+78
-32
lines changed

src/webrtc_link.rs

Lines changed: 78 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,23 @@ use thiserror::Error;
1313
use tokio::net::TcpStream;
1414
use tokio::sync::Mutex;
1515
use tokio_tungstenite::{
16-
Connector, MaybeTlsStream, WebSocketStream, connect_async_tls_with_config,
16+
connect_async_tls_with_config, Connector, MaybeTlsStream, WebSocketStream,
1717
};
1818
use tungstenite::Message;
19+
use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_H264};
1920
use webrtc::api::APIBuilder;
20-
use webrtc::api::media_engine::{MIME_TYPE_H264, MediaEngine};
21-
use webrtc::data_channel::RTCDataChannel;
2221
use webrtc::data_channel::data_channel_message::DataChannelMessage;
22+
use webrtc::data_channel::RTCDataChannel;
2323
use webrtc::ice_transport::ice_candidate::{RTCIceCandidate, RTCIceCandidateInit};
2424
use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
2525
use webrtc::ice_transport::ice_server::RTCIceServer;
2626
use webrtc::media::Sample;
27-
use webrtc::peer_connection::RTCPeerConnection;
2827
use webrtc::peer_connection::configuration::RTCConfiguration;
2928
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
29+
use webrtc::peer_connection::RTCPeerConnection;
3030
use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
31-
use webrtc::track::track_local::TrackLocal;
3231
use webrtc::track::track_local::track_local_static_sample::TrackLocalStaticSample;
32+
use webrtc::track::track_local::TrackLocal;
3333

3434
use crate::webrtc_message::WebRtcMessage;
3535

@@ -39,7 +39,9 @@ type MaybeWebSocketReader =
3939
Arc<Mutex<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>;
4040

4141
pub type OnWebRtcMessageHdlrFn = Box<
42-
dyn (FnMut(&WebRtcMessage) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync,
42+
dyn (FnMut(&WebRtcMessage) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
43+
+ Send
44+
+ Sync,
4345
>;
4446

4547
fn insecure_verifier() -> Arc<dyn ServerCertVerifier> {
@@ -101,7 +103,7 @@ pub enum WebRtcLinkError {
101103
FailedToSendMessage,
102104
#[error("Unrecognised message type received")]
103105
UnrecognisedMessage,
104-
#[error("Failed to write framte to GStreamer")]
106+
#[error("Failed to write frame to GStreamer")]
105107
WriteFrameError,
106108
}
107109

@@ -119,6 +121,10 @@ pub struct WebRtcLink {
119121
ws_read: MaybeWebSocketReader,
120122
peer_connection: Arc<Mutex<Option<RTCPeerConnection>>>,
121123

124+
// ICE state helpers
125+
remote_desc_set: Arc<Mutex<bool>>,
126+
pending_remote_candidates: Arc<Mutex<Vec<RTCIceCandidateInit>>>,
127+
122128
// Valid only after full connection
123129
video_track: Arc<Mutex<Option<Arc<TrackLocalStaticSample>>>>,
124130
data_channel: Arc<Mutex<Option<Arc<RTCDataChannel>>>>,
@@ -138,6 +144,8 @@ impl WebRtcLink {
138144
ws_write: Arc::new(Mutex::new(None)),
139145
ws_read: Arc::new(Mutex::new(None)),
140146
peer_connection: Arc::new(Mutex::new(None)),
147+
remote_desc_set: Arc::new(Mutex::new(false)),
148+
pending_remote_candidates: Arc::new(Mutex::new(Vec::new())),
141149
video_track: Arc::new(Mutex::new(None)),
142150
data_channel: Arc::new(Mutex::new(None)),
143151
remote_id: Arc::new(Mutex::new(None)),
@@ -183,12 +191,10 @@ impl WebRtcLink {
183191

184192
debug!("Adding default codecs to WebRTC offer");
185193
let mut media_engine = MediaEngine::default();
186-
media_engine
187-
.register_default_codecs()
188-
.map_err(|e| {
189-
error!("❌ Register_default_codecs failed: {e}");
190-
WebRtcLinkError::AddDefaultCodecFailed
191-
})?;
194+
media_engine.register_default_codecs().map_err(|e| {
195+
error!("❌ Register_default_codecs failed: {e}");
196+
WebRtcLinkError::AddDefaultCodecFailed
197+
})?;
192198
let api = APIBuilder::new().with_media_engine(media_engine).build();
193199

194200
debug!("Adding STUN servers to WebRTC offer");
@@ -201,13 +207,10 @@ impl WebRtcLink {
201207
};
202208

203209
debug!("Creating peer connection");
204-
let peer_connection = api
205-
.new_peer_connection(config)
206-
.await
207-
.map_err(|e| {
208-
error!("❌ New_peer_connection failed: {e}");
209-
WebRtcLinkError::PeerConnectionCreationFailed
210-
})?;
210+
let peer_connection = api.new_peer_connection(config).await.map_err(|e| {
211+
error!("❌ New_peer_connection failed: {e}");
212+
WebRtcLinkError::PeerConnectionCreationFailed
213+
})?;
211214

212215
info!("✅ PeerConnection created for robot_id={}", self.local_id);
213216

@@ -225,9 +228,7 @@ impl WebRtcLink {
225228
let robot_id = robot_id.clone();
226229

227230
Box::pin(async move {
228-
if let Some(candidate) = c
229-
&& let Some(ws_write) = &mut *write_clone.lock().await
230-
{
231+
if let Some(candidate) = c && let Some(ws_write) = &mut *write_clone.lock().await {
231232
match candidate.to_json() {
232233
Ok(json_candidate) => {
233234
let maybe_to = remote_id.lock().await.clone();
@@ -321,7 +322,6 @@ impl WebRtcLink {
321322
Ok(())
322323
}
323324

324-
325325
pub async fn try_register(&mut self) -> Result<(), WebRtcLinkError> {
326326
let current_status = *self.status.lock().await;
327327
info!(
@@ -346,8 +346,8 @@ impl WebRtcLink {
346346
};
347347

348348
if let Some(ws_write) = &mut *self.ws_write.lock().await {
349-
let encoded = serde_json::to_string(&register_msg)
350-
.map_err(|_| WebRtcLinkError::EncodeMessageFailure)?;
349+
let encoded =
350+
serde_json::to_string(&register_msg).map_err(|_| WebRtcLinkError::EncodeMessageFailure)?;
351351
debug!("Sending register message: {encoded}");
352352

353353
ws_write
@@ -370,7 +370,6 @@ impl WebRtcLink {
370370
Ok(())
371371
}
372372

373-
374373
pub async fn write_frame(&self, frame: Bytes) -> Result<(), WebRtcLinkError> {
375374
if let Some(video) = self.video_track.lock().await.as_ref() {
376375
video
@@ -398,6 +397,8 @@ impl WebRtcLink {
398397
let read_clone = Arc::clone(&self.ws_read);
399398
let video_track_clone = Arc::clone(&self.video_track);
400399
let peer_connection_clone = Arc::clone(&self.peer_connection);
400+
let remote_desc_set_clone = Arc::clone(&self.remote_desc_set);
401+
let pending_remote_candidates_clone = Arc::clone(&self.pending_remote_candidates);
401402
let robot_id = self.local_id.clone();
402403

403404
info!("Starting listen loop for robot_id={}", robot_id);
@@ -454,7 +455,8 @@ impl WebRtcLink {
454455
warn!("Offer received without 'from', remote_id not set");
455456
}
456457

457-
let video_track = add_video_track(&peer_connection_clone).await?;
458+
let video_track =
459+
add_video_track(&peer_connection_clone).await?;
458460
video_track_clone.lock().await.replace(video_track);
459461

460462
if let Some(pc) = peer_connection_clone.lock().await.as_mut() {
@@ -465,6 +467,34 @@ impl WebRtcLink {
465467
WebRtcLinkError::RemoteDescriptionFailed
466468
})?;
467469

470+
// Mark remote description as set
471+
{
472+
let mut rd =
473+
remote_desc_set_clone.lock().await;
474+
*rd = true;
475+
}
476+
477+
// Flush any queued remote candidates
478+
{
479+
let mut pending =
480+
pending_remote_candidates_clone.lock().await;
481+
if !pending.is_empty() {
482+
info!(
483+
"Flushing {} pending ICE candidates",
484+
pending.len()
485+
);
486+
for cand in pending.drain(..) {
487+
if let Err(e) =
488+
pc.add_ice_candidate(cand).await
489+
{
490+
error!(
491+
"Failed to add queued ICE candidate: {e}"
492+
);
493+
}
494+
}
495+
}
496+
}
497+
468498
info!("Remote description set, creating answer");
469499

470500
let answer = pc
@@ -526,16 +556,33 @@ impl WebRtcLink {
526556
WebRtcLinkError::ParseMessageFailure
527557
})?;
528558

529-
if let Some(pc) = peer_connection_clone.lock().await.as_mut() {
559+
let remote_desc_is_set =
560+
*remote_desc_set_clone.lock().await;
561+
562+
if !remote_desc_is_set {
563+
info!(
564+
"Remote description not set yet, buffering ICE candidate"
565+
);
566+
pending_remote_candidates_clone
567+
.lock()
568+
.await
569+
.push(candidate);
570+
} else if let Some(pc) =
571+
peer_connection_clone.lock().await.as_mut()
572+
{
530573
info!("Adding remote ICE candidate");
531574
pc.add_ice_candidate(candidate)
532575
.await
533576
.map_err(|e| {
534-
error!("add_ice_candidate failed: {e}");
577+
error!(
578+
"add_ice_candidate failed: {e}"
579+
);
535580
WebRtcLinkError::AddIceCandidateFailed
536581
})?;
537582
} else {
538-
error!("PeerConnection is None in candidate handler");
583+
error!(
584+
"PeerConnection is None in candidate handler"
585+
);
539586
}
540587
} else {
541588
warn!("Candidate message with no candidate field");
@@ -560,7 +607,6 @@ impl WebRtcLink {
560607
Ok::<(), WebRtcLinkError>(())
561608
});
562609
}
563-
564610
}
565611

566612
#[derive(Serialize, Deserialize, Debug)]

0 commit comments

Comments
 (0)